• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19996

24 Sep 2025 12:08AM UTC coverage: 88.575% (+0.03%) from 88.543%
#19996

push

github

web-flow
Implement otel retry metrics (#12064)

implements [A96](https://github.com/grpc/proposal/pull/488/files)

34731 of 39211 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.57
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableList;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.errorprone.annotations.concurrent.GuardedBy;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientStreamTracer;
37
import io.grpc.ClientStreamTracer.StreamInfo;
38
import io.grpc.Deadline;
39
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
40
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
41
import io.grpc.Metadata;
42
import io.grpc.MethodDescriptor;
43
import io.grpc.ServerStreamTracer;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StreamTracer;
47
import io.opentelemetry.api.common.AttributesBuilder;
48
import java.util.ArrayList;
49
import java.util.Collection;
50
import java.util.Collections;
51
import java.util.List;
52
import java.util.concurrent.TimeUnit;
53
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
54
import java.util.concurrent.atomic.AtomicLong;
55
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58
import javax.annotation.Nullable;
59

60
/**
 * Provides factories for {@link StreamTracer}s that record metrics to OpenTelemetry.
 *
 * <p>On the client side, a factory is created for each call, and the factory creates a stream
 * tracer for each attempt. If no stream has been created by the time the call ends, a tracer is
 * still created. The tracer reports the per-attempt stats, while the factory reports the stats of
 * the overall RPC, such as the call duration and the retry, hedge, and transparent-retry counts.
 *
 * <p>On the server side, there is only one ServerStream per ServerCall, and the ServerStream
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
 * it is that tracer that reports the summary to OpenTelemetry.
 */
72
final class OpenTelemetryMetricsModule {
73
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
74
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
75
      ImmutableSet.of(
1✔
76
          "grpc.client.attempt.started",
77
          "grpc.client.attempt.duration",
78
          "grpc.client.attempt.sent_total_compressed_message_size",
79
          "grpc.client.attempt.rcvd_total_compressed_message_size",
80
          "grpc.client.call.duration",
81
          "grpc.server.call.started",
82
          "grpc.server.call.duration",
83
          "grpc.server.call.sent_total_compressed_message_size",
84
          "grpc.server.call.rcvd_total_compressed_message_size");
85

86
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
87
  // fractional seconds.
88
  private static final double SECONDS_PER_NANO = 1e-9;
89

90
  private final OpenTelemetryMetricsResource resource;
91
  private final Supplier<Stopwatch> stopwatchSupplier;
92
  private final boolean localityEnabled;
93
  private final boolean backendServiceEnabled;
94
  private final ImmutableList<OpenTelemetryPlugin> plugins;
95

96
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
97
      OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
98
      List<OpenTelemetryPlugin> plugins) {
1✔
99
    this.resource = checkNotNull(resource, "resource");
1✔
100
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
101
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
102
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
103
    this.plugins = ImmutableList.copyOf(plugins);
1✔
104
  }
1✔
105

106
  /**
107
   * Returns the server tracer factory.
108
   */
109
  ServerStreamTracer.Factory getServerTracerFactory() {
110
    return new ServerTracerFactory();
1✔
111
  }
112

113
  /**
114
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
115
   */
116
  ClientInterceptor getClientInterceptor(String target) {
117
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
118
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
119
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
120
      if (plugin.enablePluginForChannel(target)) {
1✔
121
        pluginBuilder.add(plugin);
1✔
122
      }
123
    }
1✔
124
    return new MetricsClientInterceptor(target, pluginBuilder.build());
1✔
125
  }
126

127
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
128
    return isGeneratedMethod ? fullMethodName : "other";
1✔
129
  }
130

131
  private static final class ClientTracer extends ClientStreamTracer {
132
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
133
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
134

135
    /*
136
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
137
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
138
     * (potentially racy) direct updates of the volatile variables.
139
     */
140
    static {
141
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
142
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
143
      try {
144
        tmpOutboundWireSizeUpdater =
1✔
145
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
146
        tmpInboundWireSizeUpdater =
1✔
147
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
148
      } catch (Throwable t) {
×
149
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
150
        tmpOutboundWireSizeUpdater = null;
×
151
        tmpInboundWireSizeUpdater = null;
×
152
      }
1✔
153
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
154
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
155
    }
1✔
156

157
    final Stopwatch stopwatch;
158
    final CallAttemptsTracerFactory attemptsState;
159
    final OpenTelemetryMetricsModule module;
160
    final StreamInfo info;
161
    final String target;
162
    final String fullMethodName;
163
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
164
    volatile long outboundWireSize;
165
    volatile long inboundWireSize;
166
    volatile String locality;
167
    volatile String backendService;
168
    long attemptNanos;
169
    Code statusCode;
170

171
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
172
        StreamInfo info, String target, String fullMethodName,
173
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
174
      this.attemptsState = attemptsState;
1✔
175
      this.module = module;
1✔
176
      this.info = info;
1✔
177
      this.target = target;
1✔
178
      this.fullMethodName = fullMethodName;
1✔
179
      this.streamPlugins = streamPlugins;
1✔
180
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
181
    }
1✔
182

183
    @Override
184
    public void inboundHeaders(Metadata headers) {
185
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
186
        plugin.inboundHeaders(headers);
1✔
187
      }
1✔
188
    }
1✔
189

190
    @Override
191
    @SuppressWarnings("NonAtomicVolatileUpdate")
192
    public void outboundWireSize(long bytes) {
193
      if (outboundWireSizeUpdater != null) {
1✔
194
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
195
      } else {
196
        outboundWireSize += bytes;
×
197
      }
198
    }
1✔
199

200
    @Override
201
    @SuppressWarnings("NonAtomicVolatileUpdate")
202
    public void inboundWireSize(long bytes) {
203
      if (inboundWireSizeUpdater != null) {
1✔
204
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
205
      } else {
206
        inboundWireSize += bytes;
×
207
      }
208
    }
1✔
209

210
    @Override
211
    public void addOptionalLabel(String key, String value) {
212
      if ("grpc.lb.locality".equals(key)) {
1✔
213
        locality = value;
1✔
214
      }
215
      if ("grpc.lb.backend_service".equals(key)) {
1✔
216
        backendService = value;
1✔
217
      }
218
    }
1✔
219

220
    @Override
221
    public void inboundTrailers(Metadata trailers) {
222
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
223
        plugin.inboundTrailers(trailers);
1✔
224
      }
1✔
225
    }
1✔
226

227
    @Override
228
    public void streamClosed(Status status) {
229
      stopwatch.stop();
1✔
230
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
231
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
232
      statusCode = status.getCode();
1✔
233
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
234
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
235
        // description. Since our timer may be delayed in firing, we double-check the deadline and
236
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
237
        if (deadline.isExpired()) {
1✔
238
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
239
        }
240
      }
241
      attemptsState.attemptEnded();
1✔
242
      recordFinishedAttempt();
1✔
243
    }
1✔
244

245
    void recordFinishedAttempt() {
246
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
247
          .put(METHOD_KEY, fullMethodName)
1✔
248
          .put(TARGET_KEY, target)
1✔
249
          .put(STATUS_KEY, statusCode.toString());
1✔
250
      if (module.localityEnabled) {
1✔
251
        String savedLocality = locality;
1✔
252
        if (savedLocality == null) {
1✔
253
          savedLocality = "";
1✔
254
        }
255
        builder.put(LOCALITY_KEY, savedLocality);
1✔
256
      }
257
      if (module.backendServiceEnabled) {
1✔
258
        String savedBackendService = backendService;
1✔
259
        if (savedBackendService == null) {
1✔
260
          savedBackendService = "";
1✔
261
        }
262
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
263
      }
264
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
265
        plugin.addLabels(builder);
1✔
266
      }
1✔
267
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
268

269
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
270
        module.resource.clientAttemptDurationCounter()
1✔
271
            .record(attemptNanos * SECONDS_PER_NANO, attribute);
1✔
272
      }
273
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
274
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
275
            .record(outboundWireSize, attribute);
1✔
276
      }
277
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
278
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
279
            .record(inboundWireSize, attribute);
1✔
280
      }
281
    }
1✔
282
  }
283

284
  @VisibleForTesting
285
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
286
    private final OpenTelemetryMetricsModule module;
287
    private final String target;
288
    private final Stopwatch attemptDelayStopwatch;
289
    private final Stopwatch callStopWatch;
290
    @GuardedBy("lock")
291
    private boolean callEnded;
292
    private final String fullMethodName;
293
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
294
    private Status status;
295
    private long retryDelayNanos;
296
    private long callLatencyNanos;
297
    private final Object lock = new Object();
1✔
298
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
299
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
300
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
301
    @GuardedBy("lock")
302
    private int activeStreams;
303
    @GuardedBy("lock")
304
    private boolean finishedCallToBeRecorded;
305

306
    CallAttemptsTracerFactory(
307
        OpenTelemetryMetricsModule module,
308
        String target,
309
        String fullMethodName,
310
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
311
      this.module = checkNotNull(module, "module");
1✔
312
      this.target = checkNotNull(target, "target");
1✔
313
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
314
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
315
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
316
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
317

318
      io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
1✔
319
          METHOD_KEY, fullMethodName,
320
          TARGET_KEY, target);
321

322
      // Record here in case mewClientStreamTracer() would never be called.
323
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
324
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
325
      }
326
    }
1✔
327

328
    @Override
329
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
330
      synchronized (lock) {
1✔
331
        if (finishedCallToBeRecorded) {
1✔
332
          // This can be the case when the call is cancelled but a retry attempt is created.
333
          return new ClientStreamTracer() {};
×
334
        }
335
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
336
          attemptDelayStopwatch.stop();
1✔
337
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
338
        }
339
      }
1✔
340
      // Skip recording for the first time, since it is already recorded in
341
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
342
      // attempt, as first attempt cannot be a transparent retry.
343
      if (attemptsPerCall.get() > 0) {
1✔
344
        io.opentelemetry.api.common.Attributes attribute =
1✔
345
            io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
346
                TARGET_KEY, target);
347
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
348
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
349
        }
350
      }
351
      if (info.isTransparentRetry()) {
1✔
352
        transparentRetriesPerCall.incrementAndGet();
1✔
353
      } else if (info.isHedging()) {
1✔
354
        hedgedAttemptsPerCall.incrementAndGet();
1✔
355
      } else {
356
        attemptsPerCall.incrementAndGet();
1✔
357
      }
358
      return newClientTracer(info);
1✔
359
    }
360

361
    private ClientTracer newClientTracer(StreamInfo info) {
362
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
363
      if (!callPlugins.isEmpty()) {
1✔
364
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
365
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
366
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
367
        }
1✔
368
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
369
      }
370
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
371
    }
372

373
    // Called whenever each attempt is ended.
374
    void attemptEnded() {
375
      boolean shouldRecordFinishedCall = false;
1✔
376
      synchronized (lock) {
1✔
377
        if (--activeStreams == 0) {
1✔
378
          attemptDelayStopwatch.start();
1✔
379
          if (callEnded && !finishedCallToBeRecorded) {
1✔
380
            shouldRecordFinishedCall = true;
×
381
            finishedCallToBeRecorded = true;
×
382
          }
383
        }
384
      }
1✔
385
      if (shouldRecordFinishedCall) {
1✔
386
        recordFinishedCall();
×
387
      }
388
    }
1✔
389

390
    void callEnded(Status status) {
391
      callStopWatch.stop();
1✔
392
      this.status = status;
1✔
393
      boolean shouldRecordFinishedCall = false;
1✔
394
      synchronized (lock) {
1✔
395
        if (callEnded) {
1✔
396
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
397
          return;
×
398
        }
399
        callEnded = true;
1✔
400
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
401
          shouldRecordFinishedCall = true;
1✔
402
          finishedCallToBeRecorded = true;
1✔
403
        }
404
      }
1✔
405
      if (shouldRecordFinishedCall) {
1✔
406
        recordFinishedCall();
1✔
407
      }
408
    }
1✔
409

410
    void recordFinishedCall() {
411
      if (attemptsPerCall.get() == 0) {
1✔
412
        ClientTracer tracer = newClientTracer(null);
1✔
413
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
414
        tracer.statusCode = status.getCode();
1✔
415
        tracer.recordFinishedAttempt();
1✔
416
      }
417
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
418

419
      // Base attributes
420
      io.opentelemetry.api.common.Attributes baseAttributes =
1✔
421
          io.opentelemetry.api.common.Attributes.of(
1✔
422
              METHOD_KEY, fullMethodName,
423
              TARGET_KEY, target
424
          );
425

426
      // Duration
427
      if (module.resource.clientCallDurationCounter() != null) {
1✔
428
        module.resource.clientCallDurationCounter().record(
1✔
429
            callLatencyNanos * SECONDS_PER_NANO,
430
            baseAttributes.toBuilder()
1✔
431
                .put(STATUS_KEY, status.getCode().toString())
1✔
432
                .build()
1✔
433
        );
434
      }
435

436
      // Retry counts
437
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
438
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
439
        if (retriesPerCall > 0) {
1✔
440
          module.resource.clientCallRetriesCounter().record(retriesPerCall, baseAttributes);
1✔
441
        }
442
      }
443

444
      // Hedge counts
445
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
446
        long hedges = hedgedAttemptsPerCall.get();
1✔
447
        if (hedges > 0) {
1✔
448
          module.resource.clientCallHedgesCounter()
1✔
449
              .record(hedges, baseAttributes);
1✔
450
        }
451
      }
452

453
      // Transparent Retry counts
454
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
455
        long transparentRetries = transparentRetriesPerCall.get();
1✔
456
        if (transparentRetries > 0) {
1✔
457
          module.resource.clientCallTransparentRetriesCounter().record(
1✔
458
              transparentRetries, baseAttributes);
459
        }
460
      }
461

462
      // Retry delay
463
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
464
        module.resource.clientCallRetryDelayCounter().record(
1✔
465
            retryDelayNanos * SECONDS_PER_NANO,
466
            baseAttributes
467
        );
468
      }
469
    }
1✔
470
  }
471

472
  private static final class ServerTracer extends ServerStreamTracer {
473
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
474
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
475
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
476

477
    /*
478
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
479
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
480
     * (potentially racy) direct updates of the volatile variables.
481
     */
482
    static {
483
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
484
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
485
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
486
      try {
487
        tmpStreamClosedUpdater =
1✔
488
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
489
        tmpOutboundWireSizeUpdater =
1✔
490
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
491
        tmpInboundWireSizeUpdater =
1✔
492
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
493
      } catch (Throwable t) {
×
494
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
495
        tmpStreamClosedUpdater = null;
×
496
        tmpOutboundWireSizeUpdater = null;
×
497
        tmpInboundWireSizeUpdater = null;
×
498
      }
1✔
499
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
500
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
501
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
502
    }
1✔
503

504
    private final OpenTelemetryMetricsModule module;
505
    private final String fullMethodName;
506
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
507
    private volatile boolean isGeneratedMethod;
508
    private volatile int streamClosed;
509
    private final Stopwatch stopwatch;
510
    private volatile long outboundWireSize;
511
    private volatile long inboundWireSize;
512

513
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
514
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
515
      this.module = checkNotNull(module, "module");
1✔
516
      this.fullMethodName = fullMethodName;
1✔
517
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
518
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
519
    }
1✔
520

521
    @Override
522
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
523
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
524
      // which is true for all generated methods. Otherwise, programmatically
525
      // created methods result in high cardinality metrics.
526
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
527
      isGeneratedMethod = isSampledToLocalTracing;
1✔
528
      io.opentelemetry.api.common.Attributes attribute =
1✔
529
          io.opentelemetry.api.common.Attributes.of(
1✔
530
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
531

532
      if (module.resource.serverCallCountCounter() != null) {
1✔
533
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
534
      }
535
    }
1✔
536

537
    @Override
538
    @SuppressWarnings("NonAtomicVolatileUpdate")
539
    public void outboundWireSize(long bytes) {
540
      if (outboundWireSizeUpdater != null) {
1✔
541
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
542
      } else {
543
        outboundWireSize += bytes;
×
544
      }
545
    }
1✔
546

547
    @Override
548
    @SuppressWarnings("NonAtomicVolatileUpdate")
549
    public void inboundWireSize(long bytes) {
550
      if (inboundWireSizeUpdater != null) {
1✔
551
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
552
      } else {
553
        inboundWireSize += bytes;
×
554
      }
555
    }
1✔
556

557
    /**
558
     * Record a finished stream and mark the current time as the end time.
559
     *
560
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
561
     * is a no-op.
562
     */
563
    @Override
564
    public void streamClosed(Status status) {
565
      if (streamClosedUpdater != null) {
1✔
566
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
567
          return;
×
568
        }
569
      } else {
570
        if (streamClosed != 0) {
×
571
          return;
×
572
        }
573
        streamClosed = 1;
×
574
      }
575
      stopwatch.stop();
1✔
576
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
577
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
578
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
579
          .put(STATUS_KEY, status.getCode().toString());
1✔
580
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
581
        plugin.addLabels(builder);
1✔
582
      }
1✔
583
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
584

585
      if (module.resource.serverCallDurationCounter() != null) {
1✔
586
        module.resource.serverCallDurationCounter()
1✔
587
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
1✔
588
      }
589
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
590
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
591
            .record(outboundWireSize, attributes);
1✔
592
      }
593
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
594
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
595
            .record(inboundWireSize, attributes);
1✔
596
      }
597
    }
1✔
598
  }
599

600
  @VisibleForTesting
601
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
602
    @Override
603
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
604
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
605
      if (plugins.isEmpty()) {
1✔
606
        streamPlugins = Collections.emptyList();
1✔
607
      } else {
608
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
609
            new ArrayList<>(plugins.size());
1✔
610
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
611
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
612
        }
1✔
613
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
614
      }
615
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
616
    }
617
  }
618

619
  @VisibleForTesting
620
  final class MetricsClientInterceptor implements ClientInterceptor {
621
    private final String target;
622
    private final ImmutableList<OpenTelemetryPlugin> plugins;
623

624
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
625
      this.target = checkNotNull(target, "target");
1✔
626
      this.plugins = checkNotNull(plugins, "plugins");
1✔
627
    }
1✔
628

629
    @Override
630
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
631
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
632
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
633
      if (plugins.isEmpty()) {
1✔
634
        callPlugins = Collections.emptyList();
1✔
635
      } else {
636
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
637
            new ArrayList<>(plugins.size());
1✔
638
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
639
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
640
        }
1✔
641
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
642
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
643
          callOptions = plugin.filterCallOptions(callOptions);
1✔
644
        }
1✔
645
      }
646
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
647
      // which is true for all generated methods. Otherwise, programatically
648
      // created methods result in high cardinality metrics.
649
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
650
          OpenTelemetryMetricsModule.this, target,
651
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
652
          callPlugins);
653
      ClientCall<ReqT, RespT> call =
1✔
654
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
655
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
656
        @Override
657
        public void start(Listener<RespT> responseListener, Metadata headers) {
658
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
659
            plugin.addMetadata(headers);
1✔
660
          }
1✔
661
          delegate().start(
1✔
662
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
663
                @Override
664
                public void onClose(Status status, Metadata trailers) {
665
                  tracerFactory.callEnded(status);
1✔
666
                  super.onClose(status, trailers);
1✔
667
                }
1✔
668
              },
669
              headers);
670
        }
1✔
671
      };
672
    }
673
  }
674
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc