• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20085

14 Nov 2025 11:20PM UTC coverage: 88.511% (-0.05%) from 88.561%
#20085

push

github

ejona86
opentelemetry: propagate baggage to metrics for custom attributes, helps with b/406058193 (#12389)

34999 of 39542 relevant lines covered (88.51%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.79
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
25
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.collect.ImmutableList;
31
import com.google.common.collect.ImmutableSet;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.CallOptions;
34
import io.grpc.Channel;
35
import io.grpc.ClientCall;
36
import io.grpc.ClientInterceptor;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientStreamTracer.StreamInfo;
39
import io.grpc.Deadline;
40
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
41
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.ServerStreamTracer;
45
import io.grpc.Status;
46
import io.grpc.Status.Code;
47
import io.grpc.StreamTracer;
48
import io.opentelemetry.api.baggage.Baggage;
49
import io.opentelemetry.api.common.AttributesBuilder;
50
import io.opentelemetry.context.Context;
51
import java.util.ArrayList;
52
import java.util.Collection;
53
import java.util.Collections;
54
import java.util.List;
55
import java.util.concurrent.TimeUnit;
56
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
57
import java.util.concurrent.atomic.AtomicLong;
58
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
59
import java.util.logging.Level;
60
import java.util.logging.Logger;
61
import javax.annotation.Nullable;
62

63
/**
64
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
65
 *
66
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
67
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
68
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
69
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
70
 *
71
 * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
72
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
73
 * it's the tracer that reports the summary to OpenTelemetry.
74
 */
75
final class OpenTelemetryMetricsModule {
76
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
77
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
78
      ImmutableSet.of(
1✔
79
          "grpc.client.attempt.started",
80
          "grpc.client.attempt.duration",
81
          "grpc.client.attempt.sent_total_compressed_message_size",
82
          "grpc.client.attempt.rcvd_total_compressed_message_size",
83
          "grpc.client.call.duration",
84
          "grpc.server.call.started",
85
          "grpc.server.call.duration",
86
          "grpc.server.call.sent_total_compressed_message_size",
87
          "grpc.server.call.rcvd_total_compressed_message_size");
88

89
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
90
  // fractional seconds.
91
  private static final double SECONDS_PER_NANO = 1e-9;
92

93
  private final OpenTelemetryMetricsResource resource;
94
  private final Supplier<Stopwatch> stopwatchSupplier;
95
  private final boolean localityEnabled;
96
  private final boolean backendServiceEnabled;
97
  private final ImmutableList<OpenTelemetryPlugin> plugins;
98

99
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
100
                             OpenTelemetryMetricsResource resource,
101
                             Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
1✔
102
    this.resource = checkNotNull(resource, "resource");
1✔
103
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
104
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
105
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
106
    this.plugins = ImmutableList.copyOf(plugins);
1✔
107
  }
1✔
108

109
  /**
110
   * Returns the server tracer factory.
111
   */
112
  ServerStreamTracer.Factory getServerTracerFactory() {
113
    return new ServerTracerFactory();
1✔
114
  }
115

116
  /**
117
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
118
   */
119
  ClientInterceptor getClientInterceptor(String target) {
120
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
121
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
122
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
123
      if (plugin.enablePluginForChannel(target)) {
1✔
124
        pluginBuilder.add(plugin);
1✔
125
      }
126
    }
1✔
127
    return new MetricsClientInterceptor(target, pluginBuilder.build());
1✔
128
  }
129

130
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
131
    return isGeneratedMethod ? fullMethodName : "other";
1✔
132
  }
133

134
  private static Context otelContextWithBaggage() {
135
    Baggage baggage = BAGGAGE_KEY.get();
1✔
136
    if (baggage == null) {
1✔
137
      return Context.current();
1✔
138
    }
139
    return Context.current().with(baggage);
1✔
140
  }
141

142
  private static final class ClientTracer extends ClientStreamTracer {
143
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
144
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
145

146
    /*
147
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
148
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
149
     * (potentially racy) direct updates of the volatile variables.
150
     */
151
    static {
152
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
153
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
154
      try {
155
        tmpOutboundWireSizeUpdater =
1✔
156
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
157
        tmpInboundWireSizeUpdater =
1✔
158
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
159
      } catch (Throwable t) {
×
160
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
161
        tmpOutboundWireSizeUpdater = null;
×
162
        tmpInboundWireSizeUpdater = null;
×
163
      }
1✔
164
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
165
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
166
    }
1✔
167

168
    final Stopwatch stopwatch;
169
    final CallAttemptsTracerFactory attemptsState;
170
    final OpenTelemetryMetricsModule module;
171
    final StreamInfo info;
172
    final String target;
173
    final String fullMethodName;
174
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
175
    volatile long outboundWireSize;
176
    volatile long inboundWireSize;
177
    volatile String locality;
178
    volatile String backendService;
179
    long attemptNanos;
180
    Code statusCode;
181

182
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
183
        StreamInfo info, String target, String fullMethodName,
184
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
185
      this.attemptsState = attemptsState;
1✔
186
      this.module = module;
1✔
187
      this.info = info;
1✔
188
      this.target = target;
1✔
189
      this.fullMethodName = fullMethodName;
1✔
190
      this.streamPlugins = streamPlugins;
1✔
191
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
192
    }
1✔
193

194
    @Override
195
    public void inboundHeaders(Metadata headers) {
196
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
197
        plugin.inboundHeaders(headers);
1✔
198
      }
1✔
199
    }
1✔
200

201
    @Override
202
    @SuppressWarnings("NonAtomicVolatileUpdate")
203
    public void outboundWireSize(long bytes) {
204
      if (outboundWireSizeUpdater != null) {
1✔
205
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
206
      } else {
207
        outboundWireSize += bytes;
×
208
      }
209
    }
1✔
210

211
    @Override
212
    @SuppressWarnings("NonAtomicVolatileUpdate")
213
    public void inboundWireSize(long bytes) {
214
      if (inboundWireSizeUpdater != null) {
1✔
215
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
216
      } else {
217
        inboundWireSize += bytes;
×
218
      }
219
    }
1✔
220

221
    @Override
222
    public void addOptionalLabel(String key, String value) {
223
      if ("grpc.lb.locality".equals(key)) {
1✔
224
        locality = value;
1✔
225
      }
226
      if ("grpc.lb.backend_service".equals(key)) {
1✔
227
        backendService = value;
1✔
228
      }
229
    }
1✔
230

231
    @Override
232
    public void inboundTrailers(Metadata trailers) {
233
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
234
        plugin.inboundTrailers(trailers);
1✔
235
      }
1✔
236
    }
1✔
237

238
    @Override
239
    public void streamClosed(Status status) {
240
      stopwatch.stop();
1✔
241
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
242
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
243
      statusCode = status.getCode();
1✔
244
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
245
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
246
        // description. Since our timer may be delayed in firing, we double-check the deadline and
247
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
248
        if (deadline.isExpired()) {
1✔
249
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
250
        }
251
      }
252
      attemptsState.attemptEnded();
1✔
253
      recordFinishedAttempt();
1✔
254
    }
1✔
255

256
    void recordFinishedAttempt() {
257
      Context otelContext = otelContextWithBaggage();
1✔
258
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
259
          .put(METHOD_KEY, fullMethodName)
1✔
260
          .put(TARGET_KEY, target)
1✔
261
          .put(STATUS_KEY, statusCode.toString());
1✔
262
      if (module.localityEnabled) {
1✔
263
        String savedLocality = locality;
1✔
264
        if (savedLocality == null) {
1✔
265
          savedLocality = "";
1✔
266
        }
267
        builder.put(LOCALITY_KEY, savedLocality);
1✔
268
      }
269
      if (module.backendServiceEnabled) {
1✔
270
        String savedBackendService = backendService;
1✔
271
        if (savedBackendService == null) {
1✔
272
          savedBackendService = "";
1✔
273
        }
274
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
275
      }
276
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
277
        plugin.addLabels(builder);
1✔
278
      }
1✔
279
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
280

281
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
282
        module.resource.clientAttemptDurationCounter()
1✔
283
            .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
1✔
284
      }
285
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
286
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
287
            .record(outboundWireSize, attribute, otelContext);
1✔
288
      }
289
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
290
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
291
            .record(inboundWireSize, attribute, otelContext);
1✔
292
      }
293
    }
1✔
294
  }
295

296
  @VisibleForTesting
297
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
298
    private final OpenTelemetryMetricsModule module;
299
    private final String target;
300
    private final Stopwatch attemptDelayStopwatch;
301
    private final Stopwatch callStopWatch;
302
    @GuardedBy("lock")
303
    private boolean callEnded;
304
    private final String fullMethodName;
305
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
306
    private Status status;
307
    private long retryDelayNanos;
308
    private long callLatencyNanos;
309
    private final Object lock = new Object();
1✔
310
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
311
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
312
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
313
    @GuardedBy("lock")
314
    private int activeStreams;
315
    @GuardedBy("lock")
316
    private boolean finishedCallToBeRecorded;
317

318
    CallAttemptsTracerFactory(
319
        OpenTelemetryMetricsModule module,
320
        String target,
321
        String fullMethodName,
322
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
323
      this.module = checkNotNull(module, "module");
1✔
324
      this.target = checkNotNull(target, "target");
1✔
325
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
326
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
327
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
328
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
329

330
      io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
1✔
331
          METHOD_KEY, fullMethodName,
332
          TARGET_KEY, target);
333

334
      // Record here in case mewClientStreamTracer() would never be called.
335
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
336
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
337
      }
338
    }
1✔
339

340
    @Override
341
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
342
      synchronized (lock) {
1✔
343
        if (finishedCallToBeRecorded) {
1✔
344
          // This can be the case when the call is cancelled but a retry attempt is created.
345
          return new ClientStreamTracer() {};
×
346
        }
347
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
348
          attemptDelayStopwatch.stop();
1✔
349
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
350
        }
351
      }
1✔
352
      // Skip recording for the first time, since it is already recorded in
353
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
354
      // attempt, as first attempt cannot be a transparent retry.
355
      if (attemptsPerCall.get() > 0) {
1✔
356
        io.opentelemetry.api.common.Attributes attribute =
1✔
357
            io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
358
                TARGET_KEY, target);
359
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
360
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
361
        }
362
      }
363
      if (info.isTransparentRetry()) {
1✔
364
        transparentRetriesPerCall.incrementAndGet();
1✔
365
      } else if (info.isHedging()) {
1✔
366
        hedgedAttemptsPerCall.incrementAndGet();
1✔
367
      } else {
368
        attemptsPerCall.incrementAndGet();
1✔
369
      }
370
      return newClientTracer(info);
1✔
371
    }
372

373
    private ClientTracer newClientTracer(StreamInfo info) {
374
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
375
      if (!callPlugins.isEmpty()) {
1✔
376
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
377
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
378
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
379
        }
1✔
380
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
381
      }
382
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
383
    }
384

385
    // Called whenever each attempt is ended.
386
    void attemptEnded() {
387
      boolean shouldRecordFinishedCall = false;
1✔
388
      synchronized (lock) {
1✔
389
        if (--activeStreams == 0) {
1✔
390
          attemptDelayStopwatch.start();
1✔
391
          if (callEnded && !finishedCallToBeRecorded) {
1✔
392
            shouldRecordFinishedCall = true;
×
393
            finishedCallToBeRecorded = true;
×
394
          }
395
        }
396
      }
1✔
397
      if (shouldRecordFinishedCall) {
1✔
398
        recordFinishedCall();
×
399
      }
400
    }
1✔
401

402
    void callEnded(Status status) {
403
      callStopWatch.stop();
1✔
404
      this.status = status;
1✔
405
      boolean shouldRecordFinishedCall = false;
1✔
406
      synchronized (lock) {
1✔
407
        if (callEnded) {
1✔
408
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
409
          return;
×
410
        }
411
        callEnded = true;
1✔
412
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
413
          shouldRecordFinishedCall = true;
1✔
414
          finishedCallToBeRecorded = true;
1✔
415
        }
416
      }
1✔
417
      if (shouldRecordFinishedCall) {
1✔
418
        recordFinishedCall();
1✔
419
      }
420
    }
1✔
421

422
    void recordFinishedCall() {
423
      Context otelContext = otelContextWithBaggage();
1✔
424
      if (attemptsPerCall.get() == 0) {
1✔
425
        ClientTracer tracer = newClientTracer(null);
1✔
426
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
427
        tracer.statusCode = status.getCode();
1✔
428
        tracer.recordFinishedAttempt();
1✔
429
      }
430
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
431

432
      // Base attributes
433
      io.opentelemetry.api.common.Attributes baseAttributes =
1✔
434
          io.opentelemetry.api.common.Attributes.of(
1✔
435
              METHOD_KEY, fullMethodName,
436
              TARGET_KEY, target
437
          );
438

439
      // Duration
440
      if (module.resource.clientCallDurationCounter() != null) {
1✔
441
        module.resource.clientCallDurationCounter().record(
1✔
442
            callLatencyNanos * SECONDS_PER_NANO,
443
            baseAttributes.toBuilder()
1✔
444
                .put(STATUS_KEY, status.getCode().toString())
1✔
445
                .build(),
1✔
446
            otelContext
447
        );
448
      }
449

450
      // Retry counts
451
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
452
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
453
        if (retriesPerCall > 0) {
1✔
454
          module.resource.clientCallRetriesCounter()
1✔
455
              .record(retriesPerCall, baseAttributes, otelContext);
1✔
456
        }
457
      }
458

459
      // Hedge counts
460
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
461
        long hedges = hedgedAttemptsPerCall.get();
1✔
462
        if (hedges > 0) {
1✔
463
          module.resource.clientCallHedgesCounter()
1✔
464
              .record(hedges, baseAttributes, otelContext);
1✔
465
        }
466
      }
467

468
      // Transparent Retry counts
469
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
470
        long transparentRetries = transparentRetriesPerCall.get();
1✔
471
        if (transparentRetries > 0) {
1✔
472
          module.resource.clientCallTransparentRetriesCounter()
1✔
473
              .record(transparentRetries, baseAttributes, otelContext);
1✔
474
        }
475
      }
476

477
      // Retry delay
478
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
479
        module.resource.clientCallRetryDelayCounter().record(
1✔
480
            retryDelayNanos * SECONDS_PER_NANO,
481
            baseAttributes,
482
            otelContext
483
        );
484
      }
485
    }
1✔
486
  }
487

488
  private static final class ServerTracer extends ServerStreamTracer {
489
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
490
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
491
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
492

493
    /*
494
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
495
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
496
     * (potentially racy) direct updates of the volatile variables.
497
     */
498
    static {
499
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
500
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
501
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
502
      try {
503
        tmpStreamClosedUpdater =
1✔
504
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
505
        tmpOutboundWireSizeUpdater =
1✔
506
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
507
        tmpInboundWireSizeUpdater =
1✔
508
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
509
      } catch (Throwable t) {
×
510
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
511
        tmpStreamClosedUpdater = null;
×
512
        tmpOutboundWireSizeUpdater = null;
×
513
        tmpInboundWireSizeUpdater = null;
×
514
      }
1✔
515
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
516
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
517
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
518
    }
1✔
519

520
    private final OpenTelemetryMetricsModule module;
521
    private final String fullMethodName;
522
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
523
    private volatile boolean isGeneratedMethod;
524
    private volatile int streamClosed;
525
    private final Stopwatch stopwatch;
526
    private volatile long outboundWireSize;
527
    private volatile long inboundWireSize;
528

529
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
530
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
531
      this.module = checkNotNull(module, "module");
1✔
532
      this.fullMethodName = fullMethodName;
1✔
533
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
534
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
535
    }
1✔
536

537
    @Override
538
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
539
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
540
      // which is true for all generated methods. Otherwise, programmatically
541
      // created methods result in high cardinality metrics.
542
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
543
      isGeneratedMethod = isSampledToLocalTracing;
1✔
544
      io.opentelemetry.api.common.Attributes attribute =
1✔
545
          io.opentelemetry.api.common.Attributes.of(
1✔
546
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
547

548
      if (module.resource.serverCallCountCounter() != null) {
1✔
549
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
550
      }
551
    }
1✔
552

553
    @Override
554
    @SuppressWarnings("NonAtomicVolatileUpdate")
555
    public void outboundWireSize(long bytes) {
556
      if (outboundWireSizeUpdater != null) {
1✔
557
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
558
      } else {
559
        outboundWireSize += bytes;
×
560
      }
561
    }
1✔
562

563
    @Override
564
    @SuppressWarnings("NonAtomicVolatileUpdate")
565
    public void inboundWireSize(long bytes) {
566
      if (inboundWireSizeUpdater != null) {
1✔
567
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
568
      } else {
569
        inboundWireSize += bytes;
×
570
      }
571
    }
1✔
572

573
    /**
574
     * Record a finished stream and mark the current time as the end time.
575
     *
576
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
577
     * is a no-op.
578
     */
579
    @Override
580
    public void streamClosed(Status status) {
581
      Context otelContext = otelContextWithBaggage();
1✔
582
      if (streamClosedUpdater != null) {
1✔
583
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
584
          return;
×
585
        }
586
      } else {
587
        if (streamClosed != 0) {
×
588
          return;
×
589
        }
590
        streamClosed = 1;
×
591
      }
592
      stopwatch.stop();
1✔
593
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
594
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
595
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
596
          .put(STATUS_KEY, status.getCode().toString());
1✔
597
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
598
        plugin.addLabels(builder);
1✔
599
      }
1✔
600
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
601

602
      if (module.resource.serverCallDurationCounter() != null) {
1✔
603
        module.resource.serverCallDurationCounter()
1✔
604
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
1✔
605
      }
606
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
607
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
608
            .record(outboundWireSize, attributes, otelContext);
1✔
609
      }
610
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
611
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
612
            .record(inboundWireSize, attributes, otelContext);
1✔
613
      }
614
    }
1✔
615
  }
616

617
  @VisibleForTesting
618
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
619
    @Override
620
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
621
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
622
      if (plugins.isEmpty()) {
1✔
623
        streamPlugins = Collections.emptyList();
1✔
624
      } else {
625
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
626
            new ArrayList<>(plugins.size());
1✔
627
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
628
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
629
        }
1✔
630
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
631
      }
632
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
633
    }
634
  }
635

636
  @VisibleForTesting
637
  final class MetricsClientInterceptor implements ClientInterceptor {
638
    private final String target;
639
    private final ImmutableList<OpenTelemetryPlugin> plugins;
640

641
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
642
      this.target = checkNotNull(target, "target");
1✔
643
      this.plugins = checkNotNull(plugins, "plugins");
1✔
644
    }
1✔
645

646
    @Override
647
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
648
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
649
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
650
      if (plugins.isEmpty()) {
1✔
651
        callPlugins = Collections.emptyList();
1✔
652
      } else {
653
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
654
            new ArrayList<>(plugins.size());
1✔
655
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
656
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
657
        }
1✔
658
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
659
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
660
          callOptions = plugin.filterCallOptions(callOptions);
1✔
661
        }
1✔
662
      }
663
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
664
      // which is true for all generated methods. Otherwise, programatically
665
      // created methods result in high cardinality metrics.
666
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
667
          OpenTelemetryMetricsModule.this, target,
668
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
669
          callPlugins);
670
      ClientCall<ReqT, RespT> call =
1✔
671
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
672
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
673
        @Override
674
        public void start(Listener<RespT> responseListener, Metadata headers) {
675
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
676
            plugin.addMetadata(headers);
1✔
677
          }
1✔
678
          delegate().start(
1✔
679
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
680
                @Override
681
                public void onClose(Status status, Metadata trailers) {
682
                  tracerFactory.callEnded(status);
1✔
683
                  super.onClose(status, trailers);
1✔
684
                }
1✔
685
              },
686
              headers);
687
        }
1✔
688
      };
689
    }
690
  }
691
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc