• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19785

21 Apr 2025 01:17PM UTC coverage: 88.591% (-0.008%) from 88.599%
#19785

push

github

web-flow
Implement grpc.lb.backend_service optional label

This completes gRFC A89. 7162d2d66 and fc86084df had already implemented
the LB plumbing for the optional label on RPC metrics. This observes the
value in OpenTelemetry and adds it to WRR metrics as well.

https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md

34747 of 39222 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.97
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableList;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.errorprone.annotations.concurrent.GuardedBy;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientStreamTracer;
37
import io.grpc.ClientStreamTracer.StreamInfo;
38
import io.grpc.Deadline;
39
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
40
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
41
import io.grpc.Metadata;
42
import io.grpc.MethodDescriptor;
43
import io.grpc.ServerStreamTracer;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StreamTracer;
47
import io.opentelemetry.api.common.AttributesBuilder;
48
import java.util.ArrayList;
49
import java.util.Collection;
50
import java.util.Collections;
51
import java.util.List;
52
import java.util.concurrent.TimeUnit;
53
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
54
import java.util.concurrent.atomic.AtomicLong;
55
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56
import java.util.logging.Level;
57
import java.util.logging.Logger;
58
import javax.annotation.Nullable;
59

60
/**
61
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
62
 *
63
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
64
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
65
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
66
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
67
 *
68
 * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
69
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
70
 * it's the tracer that reports the summary to OpenTelemetry.
71
 */
72
final class OpenTelemetryMetricsModule {
73
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
74
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
75
      ImmutableSet.of(
1✔
76
          "grpc.client.attempt.started",
77
          "grpc.client.attempt.duration",
78
          "grpc.client.attempt.sent_total_compressed_message_size",
79
          "grpc.client.attempt.rcvd_total_compressed_message_size",
80
          "grpc.client.call.duration",
81
          "grpc.server.call.started",
82
          "grpc.server.call.duration",
83
          "grpc.server.call.sent_total_compressed_message_size",
84
          "grpc.server.call.rcvd_total_compressed_message_size");
85

86
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
87
  // fractional seconds.
88
  private static final double SECONDS_PER_NANO = 1e-9;
89

90
  private final OpenTelemetryMetricsResource resource;
91
  private final Supplier<Stopwatch> stopwatchSupplier;
92
  private final boolean localityEnabled;
93
  private final boolean backendServiceEnabled;
94
  private final ImmutableList<OpenTelemetryPlugin> plugins;
95

96
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
97
      OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
98
      List<OpenTelemetryPlugin> plugins) {
1✔
99
    this.resource = checkNotNull(resource, "resource");
1✔
100
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
101
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
102
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
103
    this.plugins = ImmutableList.copyOf(plugins);
1✔
104
  }
1✔
105

106
  /**
107
   * Returns the server tracer factory.
108
   */
109
  ServerStreamTracer.Factory getServerTracerFactory() {
110
    return new ServerTracerFactory();
1✔
111
  }
112

113
  /**
114
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
115
   */
116
  ClientInterceptor getClientInterceptor(String target) {
117
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
118
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
119
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
120
      if (plugin.enablePluginForChannel(target)) {
1✔
121
        pluginBuilder.add(plugin);
1✔
122
      }
123
    }
1✔
124
    return new MetricsClientInterceptor(target, pluginBuilder.build());
1✔
125
  }
126

127
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
128
    return isGeneratedMethod ? fullMethodName : "other";
1✔
129
  }
130

131
  private static final class ClientTracer extends ClientStreamTracer {
132
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
133
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
134

135
    /*
136
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
137
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
138
     * (potentially racy) direct updates of the volatile variables.
139
     */
140
    static {
141
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
142
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
143
      try {
144
        tmpOutboundWireSizeUpdater =
1✔
145
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
146
        tmpInboundWireSizeUpdater =
1✔
147
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
148
      } catch (Throwable t) {
×
149
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
150
        tmpOutboundWireSizeUpdater = null;
×
151
        tmpInboundWireSizeUpdater = null;
×
152
      }
1✔
153
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
154
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
155
    }
1✔
156

157
    final Stopwatch stopwatch;
158
    final CallAttemptsTracerFactory attemptsState;
159
    final OpenTelemetryMetricsModule module;
160
    final StreamInfo info;
161
    final String target;
162
    final String fullMethodName;
163
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
164
    volatile long outboundWireSize;
165
    volatile long inboundWireSize;
166
    volatile String locality;
167
    volatile String backendService;
168
    long attemptNanos;
169
    Code statusCode;
170

171
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
172
        StreamInfo info, String target, String fullMethodName,
173
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
174
      this.attemptsState = attemptsState;
1✔
175
      this.module = module;
1✔
176
      this.info = info;
1✔
177
      this.target = target;
1✔
178
      this.fullMethodName = fullMethodName;
1✔
179
      this.streamPlugins = streamPlugins;
1✔
180
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
181
    }
1✔
182

183
    @Override
184
    public void inboundHeaders(Metadata headers) {
185
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
186
        plugin.inboundHeaders(headers);
1✔
187
      }
1✔
188
    }
1✔
189

190
    @Override
191
    @SuppressWarnings("NonAtomicVolatileUpdate")
192
    public void outboundWireSize(long bytes) {
193
      if (outboundWireSizeUpdater != null) {
1✔
194
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
195
      } else {
196
        outboundWireSize += bytes;
×
197
      }
198
    }
1✔
199

200
    @Override
201
    @SuppressWarnings("NonAtomicVolatileUpdate")
202
    public void inboundWireSize(long bytes) {
203
      if (inboundWireSizeUpdater != null) {
1✔
204
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
205
      } else {
206
        inboundWireSize += bytes;
×
207
      }
208
    }
1✔
209

210
    @Override
211
    public void addOptionalLabel(String key, String value) {
212
      if ("grpc.lb.locality".equals(key)) {
1✔
213
        locality = value;
1✔
214
      }
215
      if ("grpc.lb.backend_service".equals(key)) {
1✔
216
        backendService = value;
1✔
217
      }
218
    }
1✔
219

220
    @Override
221
    public void inboundTrailers(Metadata trailers) {
222
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
223
        plugin.inboundTrailers(trailers);
1✔
224
      }
1✔
225
    }
1✔
226

227
    @Override
228
    public void streamClosed(Status status) {
229
      stopwatch.stop();
1✔
230
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
231
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
232
      statusCode = status.getCode();
1✔
233
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
234
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
235
        // description. Since our timer may be delayed in firing, we double-check the deadline and
236
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
237
        if (deadline.isExpired()) {
1✔
238
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
239
        }
240
      }
241
      attemptsState.attemptEnded();
1✔
242
      recordFinishedAttempt();
1✔
243
    }
1✔
244

245
    void recordFinishedAttempt() {
246
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
247
          .put(METHOD_KEY, fullMethodName)
1✔
248
          .put(TARGET_KEY, target)
1✔
249
          .put(STATUS_KEY, statusCode.toString());
1✔
250
      if (module.localityEnabled) {
1✔
251
        String savedLocality = locality;
1✔
252
        if (savedLocality == null) {
1✔
253
          savedLocality = "";
1✔
254
        }
255
        builder.put(LOCALITY_KEY, savedLocality);
1✔
256
      }
257
      if (module.backendServiceEnabled) {
1✔
258
        String savedBackendService = backendService;
1✔
259
        if (savedBackendService == null) {
1✔
260
          savedBackendService = "";
1✔
261
        }
262
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
263
      }
264
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
265
        plugin.addLabels(builder);
1✔
266
      }
1✔
267
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
268

269
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
270
        module.resource.clientAttemptDurationCounter()
1✔
271
            .record(attemptNanos * SECONDS_PER_NANO, attribute);
1✔
272
      }
273
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
274
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
275
            .record(outboundWireSize, attribute);
1✔
276
      }
277
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
278
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
279
            .record(inboundWireSize, attribute);
1✔
280
      }
281
    }
1✔
282
  }
283

284
  @VisibleForTesting
285
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
286
    private final OpenTelemetryMetricsModule module;
287
    private final String target;
288
    private final Stopwatch attemptStopwatch;
289
    private final Stopwatch callStopWatch;
290
    @GuardedBy("lock")
291
    private boolean callEnded;
292
    private final String fullMethodName;
293
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
294
    private Status status;
295
    private long callLatencyNanos;
296
    private final Object lock = new Object();
1✔
297
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
298
    @GuardedBy("lock")
299
    private int activeStreams;
300
    @GuardedBy("lock")
301
    private boolean finishedCallToBeRecorded;
302

303
    CallAttemptsTracerFactory(
304
        OpenTelemetryMetricsModule module,
305
        String target,
306
        String fullMethodName,
307
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
308
      this.module = checkNotNull(module, "module");
1✔
309
      this.target = checkNotNull(target, "target");
1✔
310
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
311
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
312
      this.attemptStopwatch = module.stopwatchSupplier.get();
1✔
313
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
314

315
      io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
1✔
316
          METHOD_KEY, fullMethodName,
317
          TARGET_KEY, target);
318

319
      // Record here in case mewClientStreamTracer() would never be called.
320
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
321
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
322
      }
323
    }
1✔
324

325
    @Override
326
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
327
      synchronized (lock) {
1✔
328
        if (finishedCallToBeRecorded) {
1✔
329
          // This can be the case when the call is cancelled but a retry attempt is created.
330
          return new ClientStreamTracer() {};
×
331
        }
332
        if (++activeStreams == 1 && attemptStopwatch.isRunning()) {
1✔
333
          attemptStopwatch.stop();
1✔
334
        }
335
      }
1✔
336
      // Skip recording for the first time, since it is already recorded in
337
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
338
      // attempt, as first attempt cannot be a transparent retry.
339
      if (attemptsPerCall.get() > 0) {
1✔
340
        io.opentelemetry.api.common.Attributes attribute =
1✔
341
            io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
342
                TARGET_KEY, target);
343
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
344
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
345
        }
346
      }
347
      if (!info.isTransparentRetry()) {
1✔
348
        attemptsPerCall.incrementAndGet();
1✔
349
      }
350
      return newClientTracer(info);
1✔
351
    }
352

353
    private ClientTracer newClientTracer(StreamInfo info) {
354
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
355
      if (!callPlugins.isEmpty()) {
1✔
356
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
357
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
358
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
359
        }
1✔
360
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
361
      }
362
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
363
    }
364

365
    // Called whenever each attempt is ended.
366
    void attemptEnded() {
367
      boolean shouldRecordFinishedCall = false;
1✔
368
      synchronized (lock) {
1✔
369
        if (--activeStreams == 0) {
1✔
370
          attemptStopwatch.start();
1✔
371
          if (callEnded && !finishedCallToBeRecorded) {
1✔
372
            shouldRecordFinishedCall = true;
×
373
            finishedCallToBeRecorded = true;
×
374
          }
375
        }
376
      }
1✔
377
      if (shouldRecordFinishedCall) {
1✔
378
        recordFinishedCall();
×
379
      }
380
    }
1✔
381

382
    void callEnded(Status status) {
383
      callStopWatch.stop();
1✔
384
      this.status = status;
1✔
385
      boolean shouldRecordFinishedCall = false;
1✔
386
      synchronized (lock) {
1✔
387
        if (callEnded) {
1✔
388
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
389
          return;
×
390
        }
391
        callEnded = true;
1✔
392
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
393
          shouldRecordFinishedCall = true;
1✔
394
          finishedCallToBeRecorded = true;
1✔
395
        }
396
      }
1✔
397
      if (shouldRecordFinishedCall) {
1✔
398
        recordFinishedCall();
1✔
399
      }
400
    }
1✔
401

402
    void recordFinishedCall() {
403
      if (attemptsPerCall.get() == 0) {
1✔
404
        ClientTracer tracer = newClientTracer(null);
1✔
405
        tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
406
        tracer.statusCode = status.getCode();
1✔
407
        tracer.recordFinishedAttempt();
1✔
408
      }
409
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
410
      io.opentelemetry.api.common.Attributes attribute =
1✔
411
          io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
412
              TARGET_KEY, target,
413
              STATUS_KEY, status.getCode().toString());
1✔
414

415
      if (module.resource.clientCallDurationCounter() != null) {
1✔
416
        module.resource.clientCallDurationCounter()
1✔
417
            .record(callLatencyNanos * SECONDS_PER_NANO, attribute);
1✔
418
      }
419
    }
1✔
420
  }
421

422
  private static final class ServerTracer extends ServerStreamTracer {
423
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
424
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
425
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
426

427
    /*
428
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
429
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
430
     * (potentially racy) direct updates of the volatile variables.
431
     */
432
    static {
433
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
434
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
435
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
436
      try {
437
        tmpStreamClosedUpdater =
1✔
438
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
439
        tmpOutboundWireSizeUpdater =
1✔
440
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
441
        tmpInboundWireSizeUpdater =
1✔
442
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
443
      } catch (Throwable t) {
×
444
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
445
        tmpStreamClosedUpdater = null;
×
446
        tmpOutboundWireSizeUpdater = null;
×
447
        tmpInboundWireSizeUpdater = null;
×
448
      }
1✔
449
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
450
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
451
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
452
    }
1✔
453

454
    private final OpenTelemetryMetricsModule module;
455
    private final String fullMethodName;
456
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
457
    private volatile boolean isGeneratedMethod;
458
    private volatile int streamClosed;
459
    private final Stopwatch stopwatch;
460
    private volatile long outboundWireSize;
461
    private volatile long inboundWireSize;
462

463
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
464
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
465
      this.module = checkNotNull(module, "module");
1✔
466
      this.fullMethodName = fullMethodName;
1✔
467
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
468
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
469
    }
1✔
470

471
    @Override
472
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
473
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
474
      // which is true for all generated methods. Otherwise, programmatically
475
      // created methods result in high cardinality metrics.
476
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
477
      isGeneratedMethod = isSampledToLocalTracing;
1✔
478
      io.opentelemetry.api.common.Attributes attribute =
1✔
479
          io.opentelemetry.api.common.Attributes.of(
1✔
480
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
481

482
      if (module.resource.serverCallCountCounter() != null) {
1✔
483
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
484
      }
485
    }
1✔
486

487
    @Override
488
    @SuppressWarnings("NonAtomicVolatileUpdate")
489
    public void outboundWireSize(long bytes) {
490
      if (outboundWireSizeUpdater != null) {
1✔
491
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
492
      } else {
493
        outboundWireSize += bytes;
×
494
      }
495
    }
1✔
496

497
    @Override
498
    @SuppressWarnings("NonAtomicVolatileUpdate")
499
    public void inboundWireSize(long bytes) {
500
      if (inboundWireSizeUpdater != null) {
1✔
501
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
502
      } else {
503
        inboundWireSize += bytes;
×
504
      }
505
    }
1✔
506

507
    /**
508
     * Record a finished stream and mark the current time as the end time.
509
     *
510
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
511
     * is a no-op.
512
     */
513
    @Override
514
    public void streamClosed(Status status) {
515
      if (streamClosedUpdater != null) {
1✔
516
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
517
          return;
×
518
        }
519
      } else {
520
        if (streamClosed != 0) {
×
521
          return;
×
522
        }
523
        streamClosed = 1;
×
524
      }
525
      stopwatch.stop();
1✔
526
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
527
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
528
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
529
          .put(STATUS_KEY, status.getCode().toString());
1✔
530
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
531
        plugin.addLabels(builder);
1✔
532
      }
1✔
533
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
534

535
      if (module.resource.serverCallDurationCounter() != null) {
1✔
536
        module.resource.serverCallDurationCounter()
1✔
537
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
1✔
538
      }
539
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
540
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
541
            .record(outboundWireSize, attributes);
1✔
542
      }
543
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
544
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
545
            .record(inboundWireSize, attributes);
1✔
546
      }
547
    }
1✔
548
  }
549

550
  @VisibleForTesting
551
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
552
    @Override
553
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
554
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
555
      if (plugins.isEmpty()) {
1✔
556
        streamPlugins = Collections.emptyList();
1✔
557
      } else {
558
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
559
            new ArrayList<>(plugins.size());
1✔
560
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
561
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
562
        }
1✔
563
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
564
      }
565
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
566
    }
567
  }
568

569
  @VisibleForTesting
570
  final class MetricsClientInterceptor implements ClientInterceptor {
571
    private final String target;
572
    private final ImmutableList<OpenTelemetryPlugin> plugins;
573

574
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
575
      this.target = checkNotNull(target, "target");
1✔
576
      this.plugins = checkNotNull(plugins, "plugins");
1✔
577
    }
1✔
578

579
    @Override
580
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
581
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
582
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
583
      if (plugins.isEmpty()) {
1✔
584
        callPlugins = Collections.emptyList();
1✔
585
      } else {
586
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
587
            new ArrayList<>(plugins.size());
1✔
588
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
589
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
590
        }
1✔
591
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
592
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
593
          callOptions = plugin.filterCallOptions(callOptions);
1✔
594
        }
1✔
595
      }
596
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
597
      // which is true for all generated methods. Otherwise, programatically
598
      // created methods result in high cardinality metrics.
599
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
600
          OpenTelemetryMetricsModule.this, target,
601
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
602
          callPlugins);
603
      ClientCall<ReqT, RespT> call =
1✔
604
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
605
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
606
        @Override
607
        public void start(Listener<RespT> responseListener, Metadata headers) {
608
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
609
            plugin.addMetadata(headers);
1✔
610
          }
1✔
611
          delegate().start(
1✔
612
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
613
                @Override
614
                public void onClose(Status status, Metadata trailers) {
615
                  tracerFactory.callEnded(status);
1✔
616
                  super.onClose(status, trailers);
1✔
617
                }
1✔
618
              },
619
              headers);
620
        }
1✔
621
      };
622
    }
623
  }
624
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc