• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20272

07 May 2026 12:34PM UTC coverage: 88.836% (+0.008%) from 88.828%
#20272

push

github

web-flow
core,opentelemetry: Fix server metric labels on early close (#12774)

This addresses the server-side OpenTelemetry metric labeling bug from
#12117 where a generated method can be recorded as `grpc.method="other"`
if `streamClosed()` happens before `serverCallStarted()`.

### What changed

- add an internal `StatsTraceContext.ServerCallMethodListener` hook so
tracers can consume an already-resolved primary-registry
`MethodDescriptor`
- resolve the immutable internal primary registry on the transport path
and seed method classification before the async `MethodLookup` path runs
- keep fallback registry lookup on the existing async path
- update the OpenTelemetry server tracer to use the early-resolved
method classification for close metrics

### Why this shape

- avoids tracer-side `HandlerRegistry` lookup
- uses only the immutable internal primary registry for early
transport-path lookup
- keeps fallback registry lookup on the existing async path

### Tests

- primary generated method: early close preserves the generated method
name
- primary non-generated method: early close still records `other`
- fallback generated method: fallback lookup remains on the existing
async path and does not introduce early transport-path classification
- tracer-level regression: `serverCallMethodResolved()` +
`streamClosed()` records the generated method name without waiting for
`serverCallStarted()`

### Notes

- `ServerCallMethodListener` is an internal hook that carries the
resolved `MethodDescriptor`; tracers consume the resolved result instead
of performing registry lookup themselves
- `ServerImpl` uses `InternalHandlerRegistry` explicitly for the primary
registry to make it clear that the early transport- path lookup is
limited to the immutable internal primary registry
- this PR intentionally does not widen transport-path lookup to the
fallback registry

Ref #12117

36260 of 40817 relevant lines covered (88.84%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.49
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
25
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
26
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Stopwatch;
30
import com.google.common.base.Supplier;
31
import com.google.common.collect.ImmutableList;
32
import com.google.common.collect.ImmutableSet;
33
import com.google.errorprone.annotations.concurrent.GuardedBy;
34
import io.grpc.CallOptions;
35
import io.grpc.Channel;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientInterceptor;
38
import io.grpc.ClientStreamTracer;
39
import io.grpc.ClientStreamTracer.StreamInfo;
40
import io.grpc.Deadline;
41
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
42
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
43
import io.grpc.Grpc;
44
import io.grpc.Metadata;
45
import io.grpc.MethodDescriptor;
46
import io.grpc.ServerStreamTracer;
47
import io.grpc.Status;
48
import io.grpc.Status.Code;
49
import io.grpc.StreamTracer;
50
import io.grpc.internal.StatsTraceContext.ServerCallMethodListener;
51
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
52
import io.opentelemetry.api.baggage.Baggage;
53
import io.opentelemetry.api.common.AttributesBuilder;
54
import io.opentelemetry.context.Context;
55
import java.util.ArrayList;
56
import java.util.Collection;
57
import java.util.Collections;
58
import java.util.List;
59
import java.util.concurrent.TimeUnit;
60
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
61
import java.util.concurrent.atomic.AtomicLong;
62
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
63
import java.util.logging.Level;
64
import java.util.logging.Logger;
65
import javax.annotation.Nullable;
66

67
/**
68
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
69
 *
70
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
71
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
72
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
73
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
74
 *
75
 * <p>This module optionally applies a target attribute filter to limit the cardinality of
76
 * the {@code grpc.target} attribute in client-side metrics by mapping disallowed targets
77
 * to a stable placeholder value.
78
 *
79
 * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
80
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
81
 * it's the tracer that reports the summary to OpenTelemetry.
82
 */
83
final class OpenTelemetryMetricsModule {
84
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
85
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
86
      ImmutableSet.of(
1✔
87
          "grpc.client.attempt.started",
88
          "grpc.client.attempt.duration",
89
          "grpc.client.attempt.sent_total_compressed_message_size",
90
          "grpc.client.attempt.rcvd_total_compressed_message_size",
91
          "grpc.client.call.duration",
92
          "grpc.server.call.started",
93
          "grpc.server.call.duration",
94
          "grpc.server.call.sent_total_compressed_message_size",
95
          "grpc.server.call.rcvd_total_compressed_message_size");
96

97
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
98
  // fractional seconds.
99
  private static final double SECONDS_PER_NANO = 1e-9;
100

101
  private final OpenTelemetryMetricsResource resource;
102
  private final Supplier<Stopwatch> stopwatchSupplier;
103
  private final boolean localityEnabled;
104
  private final boolean backendServiceEnabled;
105
  private final boolean customLabelEnabled;
106
  private final ImmutableList<OpenTelemetryPlugin> plugins;
107
  @Nullable
108
  private final TargetFilter targetAttributeFilter;
109

110
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
111
                             OpenTelemetryMetricsResource resource,
112
                             Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
113
    this(stopwatchSupplier, resource, optionalLabels, plugins, null);
1✔
114
  }
1✔
115

116
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
117
      OpenTelemetryMetricsResource resource,
118
      Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
119
      @Nullable TargetFilter targetAttributeFilter) {
1✔
120
    this.resource = checkNotNull(resource, "resource");
1✔
121
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
122
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
123
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
124
    this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey());
1✔
125
    this.plugins = ImmutableList.copyOf(plugins);
1✔
126
    this.targetAttributeFilter = targetAttributeFilter;
1✔
127
  }
1✔
128

129
  @VisibleForTesting
130
  TargetFilter getTargetAttributeFilter() {
131
    return targetAttributeFilter;
1✔
132
  }
133

134
  /**
135
   * Returns the server tracer factory.
136
   */
137
  ServerStreamTracer.Factory getServerTracerFactory() {
138
    return new ServerTracerFactory();
1✔
139
  }
140

141
  /**
142
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
143
   */
144
  ClientInterceptor getClientInterceptor(String target) {
145
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
146
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
147
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
148
      if (plugin.enablePluginForChannel(target)) {
1✔
149
        pluginBuilder.add(plugin);
1✔
150
      }
151
    }
1✔
152
    String filteredTarget = recordTarget(target);
1✔
153
    return new MetricsClientInterceptor(filteredTarget, pluginBuilder.build());
1✔
154
  }
155

156
  String recordTarget(String target) {
157
    if (targetAttributeFilter == null || target == null) {
1✔
158
      return target;
1✔
159
    }
160
    return targetAttributeFilter.test(target) ? target : "other";
1✔
161
  }
162

163
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
164
    return isGeneratedMethod ? fullMethodName : "other";
1✔
165
  }
166

167
  private static final class ClientTracer extends ClientStreamTracer {
168
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
169
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
170

171
    /*
172
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
173
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
174
     * (potentially racy) direct updates of the volatile variables.
175
     */
176
    static {
177
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
178
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
179
      try {
180
        tmpOutboundWireSizeUpdater =
1✔
181
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
182
        tmpInboundWireSizeUpdater =
1✔
183
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
184
      } catch (Throwable t) {
×
185
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
186
        tmpOutboundWireSizeUpdater = null;
×
187
        tmpInboundWireSizeUpdater = null;
×
188
      }
1✔
189
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
190
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
191
    }
1✔
192

193
    final Stopwatch stopwatch;
194
    final CallAttemptsTracerFactory attemptsState;
195
    final OpenTelemetryMetricsModule module;
196
    final StreamInfo info;
197
    final String target;
198
    final String fullMethodName;
199
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
200
    volatile long outboundWireSize;
201
    volatile long inboundWireSize;
202
    volatile String locality;
203
    volatile String backendService;
204
    long attemptNanos;
205
    Code statusCode;
206

207
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
208
        StreamInfo info, String target, String fullMethodName,
209
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
210
      this.attemptsState = attemptsState;
1✔
211
      this.module = module;
1✔
212
      this.info = info;
1✔
213
      this.target = target;
1✔
214
      this.fullMethodName = fullMethodName;
1✔
215
      this.streamPlugins = streamPlugins;
1✔
216
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
217
    }
1✔
218

219
    @Override
220
    public void inboundHeaders(Metadata headers) {
221
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
222
        plugin.inboundHeaders(headers);
1✔
223
      }
1✔
224
    }
1✔
225

226
    @Override
227
    @SuppressWarnings("NonAtomicVolatileUpdate")
228
    public void outboundWireSize(long bytes) {
229
      if (outboundWireSizeUpdater != null) {
1✔
230
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
231
      } else {
232
        outboundWireSize += bytes;
×
233
      }
234
    }
1✔
235

236
    @Override
237
    @SuppressWarnings("NonAtomicVolatileUpdate")
238
    public void inboundWireSize(long bytes) {
239
      if (inboundWireSizeUpdater != null) {
1✔
240
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
241
      } else {
242
        inboundWireSize += bytes;
×
243
      }
244
    }
1✔
245

246
    @Override
247
    public void addOptionalLabel(String key, String value) {
248
      if ("grpc.lb.locality".equals(key)) {
1✔
249
        locality = value;
1✔
250
      }
251
      if ("grpc.lb.backend_service".equals(key)) {
1✔
252
        backendService = value;
1✔
253
      }
254
    }
1✔
255

256
    @Override
257
    public void inboundTrailers(Metadata trailers) {
258
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
259
        plugin.inboundTrailers(trailers);
1✔
260
      }
1✔
261
    }
1✔
262

263
    @Override
264
    public void streamClosed(Status status) {
265
      stopwatch.stop();
1✔
266
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
267
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
268
      statusCode = status.getCode();
1✔
269
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
270
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
271
        // description. Since our timer may be delayed in firing, we double-check the deadline and
272
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
273
        if (deadline.isExpired()) {
1✔
274
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
275
        }
276
      }
277
      attemptsState.attemptEnded(info.getCallOptions());
1✔
278
      recordFinishedAttempt();
1✔
279
    }
1✔
280

281
    void recordFinishedAttempt() {
282
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
283
          .put(METHOD_KEY, fullMethodName)
1✔
284
          .put(TARGET_KEY, target)
1✔
285
          .put(STATUS_KEY, statusCode.toString());
1✔
286
      if (module.localityEnabled) {
1✔
287
        String savedLocality = locality;
1✔
288
        if (savedLocality == null) {
1✔
289
          savedLocality = "";
1✔
290
        }
291
        builder.put(LOCALITY_KEY, savedLocality);
1✔
292
      }
293
      if (module.backendServiceEnabled) {
1✔
294
        String savedBackendService = backendService;
1✔
295
        if (savedBackendService == null) {
1✔
296
          savedBackendService = "";
1✔
297
        }
298
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
299
      }
300
      if (module.customLabelEnabled) {
1✔
301
        builder.put(
1✔
302
            CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
303
      }
304
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
305
        plugin.addLabels(builder);
1✔
306
      }
1✔
307
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
308

309
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
310
        module.resource.clientAttemptDurationCounter()
1✔
311
            .record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext);
1✔
312
      }
313
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
314
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
315
            .record(outboundWireSize, attribute, attemptsState.otelContext);
1✔
316
      }
317
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
318
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
319
            .record(inboundWireSize, attribute, attemptsState.otelContext);
1✔
320
      }
321
    }
1✔
322
  }
323

324
  @VisibleForTesting
325
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
326
    private final OpenTelemetryMetricsModule module;
327
    private final String target;
328
    private final Stopwatch attemptDelayStopwatch;
329
    private final Stopwatch callStopWatch;
330
    @GuardedBy("lock")
331
    private boolean callEnded;
332
    private final String fullMethodName;
333
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
334
    private final Context otelContext;
335
    private Status status;
336
    private long retryDelayNanos;
337
    private long callLatencyNanos;
338
    private final Object lock = new Object();
1✔
339
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
340
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
341
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
342
    @GuardedBy("lock")
343
    private int activeStreams;
344
    @GuardedBy("lock")
345
    private boolean finishedCallToBeRecorded;
346

347
    CallAttemptsTracerFactory(
348
        OpenTelemetryMetricsModule module,
349
        String target,
350
        CallOptions callOptions,
351
        String fullMethodName,
352
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins, Context otelContext) {
1✔
353
      this.module = checkNotNull(module, "module");
1✔
354
      this.target = checkNotNull(target, "target");
1✔
355
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
356
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
357
      this.otelContext = checkNotNull(otelContext, "otelContext");
1✔
358
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
359
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
360

361
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
362
          .put(METHOD_KEY, fullMethodName)
1✔
363
          .put(TARGET_KEY, target);
1✔
364
      if (module.customLabelEnabled) {
1✔
365
        builder.put(
1✔
366
            CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
367
      }
368
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
369

370
      // Record here in case mewClientStreamTracer() would never be called.
371
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
372
        module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
1✔
373
      }
374
    }
1✔
375

376
    @Override
377
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
378
      synchronized (lock) {
1✔
379
        if (finishedCallToBeRecorded) {
1✔
380
          // This can be the case when the call is cancelled but a retry attempt is created.
381
          return new ClientStreamTracer() {};
×
382
        }
383
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
384
          attemptDelayStopwatch.stop();
1✔
385
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
386
        }
387
      }
1✔
388
      // Skip recording for the first time, since it is already recorded in
389
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
390
      // attempt, as first attempt cannot be a transparent retry.
391
      if (attemptsPerCall.get() > 0) {
1✔
392
        AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
393
            .put(METHOD_KEY, fullMethodName)
1✔
394
            .put(TARGET_KEY, target);
1✔
395
        if (module.customLabelEnabled) {
1✔
396
          builder.put(
1✔
397
              CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
398
        }
399
        io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
400
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
401
          module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
1✔
402
        }
403
      }
404
      if (info.isTransparentRetry()) {
1✔
405
        transparentRetriesPerCall.incrementAndGet();
1✔
406
      } else if (info.isHedging()) {
1✔
407
        hedgedAttemptsPerCall.incrementAndGet();
1✔
408
      } else {
409
        attemptsPerCall.incrementAndGet();
1✔
410
      }
411
      return newClientTracer(info);
1✔
412
    }
413

414
    private ClientTracer newClientTracer(StreamInfo info) {
415
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
416
      if (!callPlugins.isEmpty()) {
1✔
417
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
418
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
419
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
420
        }
1✔
421
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
422
      }
423
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
424
    }
425

426
    // Called whenever each attempt is ended.
427
    void attemptEnded(CallOptions callOptions) {
428
      boolean shouldRecordFinishedCall = false;
1✔
429
      synchronized (lock) {
1✔
430
        if (--activeStreams == 0) {
1✔
431
          attemptDelayStopwatch.start();
1✔
432
          if (callEnded && !finishedCallToBeRecorded) {
1✔
433
            shouldRecordFinishedCall = true;
×
434
            finishedCallToBeRecorded = true;
×
435
          }
436
        }
437
      }
1✔
438
      if (shouldRecordFinishedCall) {
1✔
439
        recordFinishedCall(callOptions);
×
440
      }
441
    }
1✔
442

443
    void callEnded(Status status, CallOptions callOptions) {
444
      callStopWatch.stop();
1✔
445
      this.status = status;
1✔
446
      boolean shouldRecordFinishedCall = false;
1✔
447
      synchronized (lock) {
1✔
448
        if (callEnded) {
1✔
449
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
450
          return;
×
451
        }
452
        callEnded = true;
1✔
453
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
454
          shouldRecordFinishedCall = true;
1✔
455
          finishedCallToBeRecorded = true;
1✔
456
        }
457
      }
1✔
458
      if (shouldRecordFinishedCall) {
1✔
459
        recordFinishedCall(callOptions);
1✔
460
      }
461
    }
1✔
462

463
    void recordFinishedCall(CallOptions callOptions) {
464
      if (attemptsPerCall.get() == 0) {
1✔
465
        ClientTracer tracer = newClientTracer(null);
1✔
466
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
467
        tracer.statusCode = status.getCode();
1✔
468
        tracer.recordFinishedAttempt();
1✔
469
      }
470
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
471

472
      // Base attributes
473
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
474
          .put(METHOD_KEY, fullMethodName)
1✔
475
          .put(TARGET_KEY, target);
1✔
476
      if (module.customLabelEnabled) {
1✔
477
        builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
478
      }
479
      io.opentelemetry.api.common.Attributes baseAttributes = builder.build();
1✔
480

481
      // Duration
482
      if (module.resource.clientCallDurationCounter() != null) {
1✔
483
        module.resource.clientCallDurationCounter().record(
1✔
484
            callLatencyNanos * SECONDS_PER_NANO,
485
            baseAttributes.toBuilder()
1✔
486
                .put(STATUS_KEY, status.getCode().toString())
1✔
487
                .build(),
1✔
488
            otelContext
489
        );
490
      }
491

492
      // Retry counts
493
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
494
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
495
        if (retriesPerCall > 0) {
1✔
496
          module.resource.clientCallRetriesCounter()
1✔
497
              .record(retriesPerCall, baseAttributes, otelContext);
1✔
498
        }
499
      }
500

501
      // Hedge counts
502
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
503
        long hedges = hedgedAttemptsPerCall.get();
1✔
504
        if (hedges > 0) {
1✔
505
          module.resource.clientCallHedgesCounter()
1✔
506
              .record(hedges, baseAttributes, otelContext);
1✔
507
        }
508
      }
509

510
      // Transparent Retry counts
511
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
512
        long transparentRetries = transparentRetriesPerCall.get();
1✔
513
        if (transparentRetries > 0) {
1✔
514
          module.resource.clientCallTransparentRetriesCounter()
1✔
515
              .record(transparentRetries, baseAttributes, otelContext);
1✔
516
        }
517
      }
518

519
      // Retry delay
520
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
521
        module.resource.clientCallRetryDelayCounter().record(
1✔
522
            retryDelayNanos * SECONDS_PER_NANO,
523
            baseAttributes,
524
            otelContext
525
        );
526
      }
527
    }
1✔
528
  }
529

530
  private static final class ServerTracer extends ServerStreamTracer
531
      implements ServerCallMethodListener {
532
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
533
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
534
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
535

536
    /*
537
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
538
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
539
     * (potentially racy) direct updates of the volatile variables.
540
     */
541
    static {
542
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
543
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
544
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
545
      try {
546
        tmpStreamClosedUpdater =
1✔
547
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
548
        tmpOutboundWireSizeUpdater =
1✔
549
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
550
        tmpInboundWireSizeUpdater =
1✔
551
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
552
      } catch (Throwable t) {
×
553
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
554
        tmpStreamClosedUpdater = null;
×
555
        tmpOutboundWireSizeUpdater = null;
×
556
        tmpInboundWireSizeUpdater = null;
×
557
      }
1✔
558
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
559
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
560
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
561
    }
1✔
562

563
    private final OpenTelemetryMetricsModule module;
564
    private final String fullMethodName;
565
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
566
    private Context otelContext = Context.root();
1✔
567
    private volatile boolean isGeneratedMethod;
568
    private volatile int streamClosed;
569
    private final Stopwatch stopwatch;
570
    private volatile long outboundWireSize;
571
    private volatile long inboundWireSize;
572

573
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
574
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
575
      this.module = checkNotNull(module, "module");
1✔
576
      this.fullMethodName = fullMethodName;
1✔
577
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
578
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
579
    }
1✔
580

581
    @Override
582
    public io.grpc.Context filterContext(io.grpc.Context context) {
583
      Baggage baggage = BAGGAGE_KEY.get(context);
1✔
584
      if (baggage != null) {
1✔
585
        otelContext = Context.current().with(baggage);
1✔
586
      } else {
587
        otelContext = Context.current();
1✔
588
      }
589
      return context;
1✔
590
    }
591

592
    @Override
593
    public void serverCallMethodResolved(MethodDescriptor<?, ?> method) {
594
      isGeneratedMethod = method.isSampledToLocalTracing();
1✔
595
    }
1✔
596

597
    @Override
598
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
599
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
600
      // which is true for all generated methods. Otherwise, programmatically
601
      // created methods result in high cardinality metrics.
602
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
603
      isGeneratedMethod = isSampledToLocalTracing;
1✔
604

605
      io.opentelemetry.api.common.Attributes attribute =
1✔
606
          io.opentelemetry.api.common.Attributes.of(
1✔
607
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
608

609
      if (module.resource.serverCallCountCounter() != null) {
1✔
610
        module.resource.serverCallCountCounter().add(1, attribute, otelContext);
1✔
611
      }
612
    }
1✔
613

614
    @Override
615
    @SuppressWarnings("NonAtomicVolatileUpdate")
616
    public void outboundWireSize(long bytes) {
617
      if (outboundWireSizeUpdater != null) {
1✔
618
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
619
      } else {
620
        outboundWireSize += bytes;
×
621
      }
622
    }
1✔
623

624
    @Override
625
    @SuppressWarnings("NonAtomicVolatileUpdate")
626
    public void inboundWireSize(long bytes) {
627
      if (inboundWireSizeUpdater != null) {
1✔
628
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
629
      } else {
630
        inboundWireSize += bytes;
×
631
      }
632
    }
1✔
633

634
    /**
635
     * Record a finished stream and mark the current time as the end time.
636
     *
637
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
638
     * is a no-op.
639
     */
640
    @Override
641
    public void streamClosed(Status status) {
642
      if (streamClosedUpdater != null) {
1✔
643
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
644
          return;
×
645
        }
646
      } else {
647
        if (streamClosed != 0) {
×
648
          return;
×
649
        }
650
        streamClosed = 1;
×
651
      }
652
      stopwatch.stop();
1✔
653
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
654
      recordClosedStream(
1✔
655
          status,
656
          elapsedTimeNanos,
657
          outboundWireSize,
658
          inboundWireSize,
659
          isGeneratedMethod);
660
    }
1✔
661

662
    private void recordClosedStream(
663
        Status status,
664
        long elapsedTimeNanos,
665
        long closedOutboundWireSize,
666
        long closedInboundWireSize,
667
        boolean generatedMethod) {
668
      AttributesBuilder builder =
669
          io.opentelemetry.api.common.Attributes.builder()
1✔
670
              .put(METHOD_KEY, recordMethodName(fullMethodName, generatedMethod))
1✔
671
              .put(STATUS_KEY, status.getCode().toString());
1✔
672
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
673
        plugin.addLabels(builder);
1✔
674
      }
1✔
675
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
676

677
      if (module.resource.serverCallDurationCounter() != null) {
1✔
678
        module.resource.serverCallDurationCounter()
1✔
679
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
1✔
680
      }
681
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
682
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
683
            .record(closedOutboundWireSize, attributes, otelContext);
1✔
684
      }
685
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
686
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
687
            .record(closedInboundWireSize, attributes, otelContext);
1✔
688
      }
689
    }
1✔
690
  }
691

692
  @VisibleForTesting
693
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
694
    @Override
695
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
696
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
697
      if (plugins.isEmpty()) {
1✔
698
        streamPlugins = Collections.emptyList();
1✔
699
      } else {
700
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
701
            new ArrayList<>(plugins.size());
1✔
702
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
703
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
704
        }
1✔
705
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
706
      }
707
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
1✔
708
          streamPlugins);
709
    }
710
  }
711

712
  @VisibleForTesting
713
  final class MetricsClientInterceptor implements ClientInterceptor {
714
    private final String target;
715
    private final ImmutableList<OpenTelemetryPlugin> plugins;
716

717
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
718
      this.target = checkNotNull(target, "target");
1✔
719
      this.plugins = checkNotNull(plugins, "plugins");
1✔
720
    }
1✔
721

722
    @Override
723
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
724
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
725
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
726
      if (plugins.isEmpty()) {
1✔
727
        callPlugins = Collections.emptyList();
1✔
728
      } else {
729
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
730
            new ArrayList<>(plugins.size());
1✔
731
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
732
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
733
        }
1✔
734
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
735
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
736
          callOptions = plugin.filterCallOptions(callOptions);
1✔
737
        }
1✔
738
      }
739
      final CallOptions finalCallOptions = callOptions;
1✔
740
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
741
      // which is true for all generated methods. Otherwise, programatically
742
      // created methods result in high cardinality metrics.
743
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
744
          OpenTelemetryMetricsModule.this, target, callOptions,
745
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
746
          callPlugins, Context.current());
1✔
747
      ClientCall<ReqT, RespT> call =
1✔
748
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
749
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
750
        @Override
751
        public void start(Listener<RespT> responseListener, Metadata headers) {
752
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
753
            plugin.addMetadata(headers);
1✔
754
          }
1✔
755
          delegate().start(
1✔
756
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
757
                @Override
758
                public void onClose(Status status, Metadata trailers) {
759
                  tracerFactory.callEnded(status, finalCallOptions);
1✔
760
                  super.onClose(status, trailers);
1✔
761
                }
1✔
762
              },
763
              headers);
764
        }
1✔
765
      };
766
    }
767
  }
768
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc