• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19255

29 May 2024 09:40PM UTC coverage: 88.298% (-0.1%) from 88.425%
#19255

push

github

ejona86
Create gcp-csm-observability

32038 of 36284 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.ImmutableSet;
30
import io.grpc.CallOptions;
31
import io.grpc.Channel;
32
import io.grpc.ClientCall;
33
import io.grpc.ClientInterceptor;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.ClientStreamTracer.StreamInfo;
36
import io.grpc.Deadline;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.ServerStreamTracer;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.StreamTracer;
45
import io.opentelemetry.api.common.AttributesBuilder;
46
import java.util.ArrayList;
47
import java.util.Collection;
48
import java.util.Collections;
49
import java.util.List;
50
import java.util.concurrent.TimeUnit;
51
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
52
import java.util.concurrent.atomic.AtomicLong;
53
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
54
import java.util.logging.Level;
55
import java.util.logging.Logger;
56
import javax.annotation.Nullable;
57
import javax.annotation.concurrent.GuardedBy;
58

59
/**
60
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
61
 *
62
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
63
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
64
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
65
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
66
 *
67
 * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
68
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
69
 * it's the tracer that reports the summary to OpenTelemetry.
70
 */
71
final class OpenTelemetryMetricsModule {
72
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
73
  private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
74
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
75
      ImmutableSet.of(
1✔
76
          "grpc.client.attempt.started",
77
          "grpc.client.attempt.duration",
78
          "grpc.client.attempt.sent_total_compressed_message_size",
79
          "grpc.client.attempt.rcvd_total_compressed_message_size",
80
          "grpc.client.call.duration",
81
          "grpc.server.call.started",
82
          "grpc.server.call.duration",
83
          "grpc.server.call.sent_total_compressed_message_size",
84
          "grpc.server.call.rcvd_total_compressed_message_size");
85

86
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
87
  // fractional seconds.
88
  private static final double SECONDS_PER_NANO = 1e-9;
89

90
  private final OpenTelemetryMetricsResource resource;
91
  private final Supplier<Stopwatch> stopwatchSupplier;
92
  private final boolean localityEnabled;
93
  private final ImmutableList<OpenTelemetryPlugin> plugins;
94

95
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
96
      OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
97
      List<OpenTelemetryPlugin> plugins) {
1✔
98
    this.resource = checkNotNull(resource, "resource");
1✔
99
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
100
    this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
1✔
101
    this.plugins = ImmutableList.copyOf(plugins);
1✔
102
  }
1✔
103

104
  /**
105
   * Returns the server tracer factory.
106
   */
107
  ServerStreamTracer.Factory getServerTracerFactory() {
108
    return new ServerTracerFactory();
1✔
109
  }
110

111
  /**
112
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
113
   */
114
  ClientInterceptor getClientInterceptor(String target) {
115
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
116
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
117
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
118
      if (plugin.enablePluginForChannel(target)) {
×
119
        pluginBuilder.add(plugin);
×
120
      }
121
    }
×
122
    return new MetricsClientInterceptor(target, pluginBuilder.build());
1✔
123
  }
124

125
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
126
    return isGeneratedMethod ? fullMethodName : "other";
1✔
127
  }
128

129
  private static final class ClientTracer extends ClientStreamTracer {
130
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
131
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
132

133
    /*
134
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
135
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
136
     * (potentially racy) direct updates of the volatile variables.
137
     */
138
    static {
139
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
140
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
141
      try {
142
        tmpOutboundWireSizeUpdater =
1✔
143
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
144
        tmpInboundWireSizeUpdater =
1✔
145
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
146
      } catch (Throwable t) {
×
147
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
148
        tmpOutboundWireSizeUpdater = null;
×
149
        tmpInboundWireSizeUpdater = null;
×
150
      }
1✔
151
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
152
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
153
    }
1✔
154

155
    final Stopwatch stopwatch;
156
    final CallAttemptsTracerFactory attemptsState;
157
    final OpenTelemetryMetricsModule module;
158
    final StreamInfo info;
159
    final String target;
160
    final String fullMethodName;
161
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
162
    volatile long outboundWireSize;
163
    volatile long inboundWireSize;
164
    volatile String locality;
165
    long attemptNanos;
166
    Code statusCode;
167

168
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
169
        StreamInfo info, String target, String fullMethodName,
170
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
171
      this.attemptsState = attemptsState;
1✔
172
      this.module = module;
1✔
173
      this.info = info;
1✔
174
      this.target = target;
1✔
175
      this.fullMethodName = fullMethodName;
1✔
176
      this.streamPlugins = streamPlugins;
1✔
177
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
178
    }
1✔
179

180
    @Override
181
    public void inboundHeaders(Metadata headers) {
182
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
183
        plugin.inboundHeaders(headers);
×
184
      }
×
185
    }
1✔
186

187
    @Override
188
    @SuppressWarnings("NonAtomicVolatileUpdate")
189
    public void outboundWireSize(long bytes) {
190
      if (outboundWireSizeUpdater != null) {
1✔
191
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
192
      } else {
193
        outboundWireSize += bytes;
×
194
      }
195
    }
1✔
196

197
    @Override
198
    @SuppressWarnings("NonAtomicVolatileUpdate")
199
    public void inboundWireSize(long bytes) {
200
      if (inboundWireSizeUpdater != null) {
1✔
201
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
202
      } else {
203
        inboundWireSize += bytes;
×
204
      }
205
    }
1✔
206

207
    @Override
208
    public void addOptionalLabel(String key, String value) {
209
      if (LOCALITY_LABEL_NAME.equals(key)) {
1✔
210
        locality = value;
1✔
211
      }
212
    }
1✔
213

214
    @Override
215
    public void inboundTrailers(Metadata trailers) {
216
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
217
        plugin.inboundTrailers(trailers);
×
218
      }
×
219
    }
1✔
220

221
    @Override
222
    public void streamClosed(Status status) {
223
      stopwatch.stop();
1✔
224
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
225
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
226
      statusCode = status.getCode();
1✔
227
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
228
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
229
        // description. Since our timer may be delayed in firing, we double-check the deadline and
230
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
231
        if (deadline.isExpired()) {
×
232
          statusCode = Code.DEADLINE_EXCEEDED;
×
233
        }
234
      }
235
      attemptsState.attemptEnded();
1✔
236
      recordFinishedAttempt();
1✔
237
    }
1✔
238

239
    void recordFinishedAttempt() {
240
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
241
          .put(METHOD_KEY, fullMethodName)
1✔
242
          .put(TARGET_KEY, target)
1✔
243
          .put(STATUS_KEY, statusCode.toString());
1✔
244
      if (module.localityEnabled) {
1✔
245
        String savedLocality = locality;
1✔
246
        if (savedLocality == null) {
1✔
247
          savedLocality = "";
1✔
248
        }
249
        builder.put(LOCALITY_KEY, savedLocality);
1✔
250
      }
251
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
252
        plugin.addLabels(builder);
×
253
      }
×
254
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
255

256
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
257
        module.resource.clientAttemptDurationCounter()
1✔
258
            .record(attemptNanos * SECONDS_PER_NANO, attribute);
1✔
259
      }
260
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
261
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
262
            .record(outboundWireSize, attribute);
1✔
263
      }
264
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
265
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
266
            .record(inboundWireSize, attribute);
1✔
267
      }
268
    }
1✔
269
  }
270

271
  @VisibleForTesting
272
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
273
    private final OpenTelemetryMetricsModule module;
274
    private final String target;
275
    private final Stopwatch attemptStopwatch;
276
    private final Stopwatch callStopWatch;
277
    @GuardedBy("lock")
278
    private boolean callEnded;
279
    private final String fullMethodName;
280
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
281
    private Status status;
282
    private long callLatencyNanos;
283
    private final Object lock = new Object();
1✔
284
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
285
    @GuardedBy("lock")
286
    private int activeStreams;
287
    @GuardedBy("lock")
288
    private boolean finishedCallToBeRecorded;
289

290
    CallAttemptsTracerFactory(
291
        OpenTelemetryMetricsModule module,
292
        String target,
293
        String fullMethodName,
294
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
295
      this.module = checkNotNull(module, "module");
1✔
296
      this.target = checkNotNull(target, "target");
1✔
297
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
298
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
299
      this.attemptStopwatch = module.stopwatchSupplier.get();
1✔
300
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
301

302
      io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
1✔
303
          METHOD_KEY, fullMethodName,
304
          TARGET_KEY, target);
305

306
      // Record here in case mewClientStreamTracer() would never be called.
307
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
308
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
309
      }
310
    }
1✔
311

312
    @Override
313
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
314
      synchronized (lock) {
1✔
315
        if (finishedCallToBeRecorded) {
1✔
316
          // This can be the case when the call is cancelled but a retry attempt is created.
317
          return new ClientStreamTracer() {};
×
318
        }
319
        if (++activeStreams == 1 && attemptStopwatch.isRunning()) {
1✔
320
          attemptStopwatch.stop();
1✔
321
        }
322
      }
1✔
323
      // Skip recording for the first time, since it is already recorded in
324
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
325
      // attempt, as first attempt cannot be a transparent retry.
326
      if (attemptsPerCall.get() > 0) {
1✔
327
        io.opentelemetry.api.common.Attributes attribute =
1✔
328
            io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
329
                TARGET_KEY, target);
330
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
331
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
332
        }
333
      }
334
      if (!info.isTransparentRetry()) {
1✔
335
        attemptsPerCall.incrementAndGet();
1✔
336
      }
337
      return newClientTracer(info);
1✔
338
    }
339

340
    private ClientTracer newClientTracer(StreamInfo info) {
341
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
342
      if (!callPlugins.isEmpty()) {
1✔
343
        streamPlugins = new ArrayList<>(callPlugins.size());
×
344
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
×
345
          streamPlugins.add(plugin.newClientStreamPlugin());
×
346
        }
×
347
        streamPlugins = Collections.unmodifiableList(streamPlugins);
×
348
      }
349
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
350
    }
351

352
    // Called whenever each attempt is ended.
353
    void attemptEnded() {
354
      boolean shouldRecordFinishedCall = false;
1✔
355
      synchronized (lock) {
1✔
356
        if (--activeStreams == 0) {
1✔
357
          attemptStopwatch.start();
1✔
358
          if (callEnded && !finishedCallToBeRecorded) {
1✔
359
            shouldRecordFinishedCall = true;
×
360
            finishedCallToBeRecorded = true;
×
361
          }
362
        }
363
      }
1✔
364
      if (shouldRecordFinishedCall) {
1✔
365
        recordFinishedCall();
×
366
      }
367
    }
1✔
368

369
    void callEnded(Status status) {
370
      callStopWatch.stop();
1✔
371
      this.status = status;
1✔
372
      boolean shouldRecordFinishedCall = false;
1✔
373
      synchronized (lock) {
1✔
374
        if (callEnded) {
1✔
375
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
376
          return;
×
377
        }
378
        callEnded = true;
1✔
379
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
380
          shouldRecordFinishedCall = true;
1✔
381
          finishedCallToBeRecorded = true;
1✔
382
        }
383
      }
1✔
384
      if (shouldRecordFinishedCall) {
1✔
385
        recordFinishedCall();
1✔
386
      }
387
    }
1✔
388

389
    void recordFinishedCall() {
390
      if (attemptsPerCall.get() == 0) {
1✔
391
        ClientTracer tracer = newClientTracer(null);
1✔
392
        tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
393
        tracer.statusCode = status.getCode();
1✔
394
        tracer.recordFinishedAttempt();
1✔
395
      }
396
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
397
      io.opentelemetry.api.common.Attributes attribute =
1✔
398
          io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
399
              TARGET_KEY, target,
400
              STATUS_KEY, status.getCode().toString());
1✔
401

402
      if (module.resource.clientCallDurationCounter() != null) {
1✔
403
        module.resource.clientCallDurationCounter()
1✔
404
            .record(callLatencyNanos * SECONDS_PER_NANO, attribute);
1✔
405
      }
406
    }
1✔
407
  }
408

409
  private static final class ServerTracer extends ServerStreamTracer {
410
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
411
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
412
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
413

414
    /*
415
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
416
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
417
     * (potentially racy) direct updates of the volatile variables.
418
     */
419
    static {
420
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
421
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
422
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
423
      try {
424
        tmpStreamClosedUpdater =
1✔
425
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
426
        tmpOutboundWireSizeUpdater =
1✔
427
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
428
        tmpInboundWireSizeUpdater =
1✔
429
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
430
      } catch (Throwable t) {
×
431
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
432
        tmpStreamClosedUpdater = null;
×
433
        tmpOutboundWireSizeUpdater = null;
×
434
        tmpInboundWireSizeUpdater = null;
×
435
      }
1✔
436
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
437
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
438
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
439
    }
1✔
440

441
    private final OpenTelemetryMetricsModule module;
442
    private final String fullMethodName;
443
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
444
    private volatile boolean isGeneratedMethod;
445
    private volatile int streamClosed;
446
    private final Stopwatch stopwatch;
447
    private volatile long outboundWireSize;
448
    private volatile long inboundWireSize;
449

450
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
451
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
452
      this.module = checkNotNull(module, "module");
1✔
453
      this.fullMethodName = fullMethodName;
1✔
454
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
455
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
456
    }
1✔
457

458
    @Override
459
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
460
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
461
      // which is true for all generated methods. Otherwise, programmatically
462
      // created methods result in high cardinality metrics.
463
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
464
      isGeneratedMethod = isSampledToLocalTracing;
1✔
465
      io.opentelemetry.api.common.Attributes attribute =
1✔
466
          io.opentelemetry.api.common.Attributes.of(
1✔
467
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
468

469
      if (module.resource.serverCallCountCounter() != null) {
1✔
470
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
471
      }
472
    }
1✔
473

474
    @Override
475
    @SuppressWarnings("NonAtomicVolatileUpdate")
476
    public void outboundWireSize(long bytes) {
477
      if (outboundWireSizeUpdater != null) {
1✔
478
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
479
      } else {
480
        outboundWireSize += bytes;
×
481
      }
482
    }
1✔
483

484
    @Override
485
    @SuppressWarnings("NonAtomicVolatileUpdate")
486
    public void inboundWireSize(long bytes) {
487
      if (inboundWireSizeUpdater != null) {
1✔
488
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
489
      } else {
490
        inboundWireSize += bytes;
×
491
      }
492
    }
1✔
493

494
    /**
495
     * Record a finished stream and mark the current time as the end time.
496
     *
497
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
498
     * is a no-op.
499
     */
500
    @Override
501
    public void streamClosed(Status status) {
502
      if (streamClosedUpdater != null) {
1✔
503
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
504
          return;
×
505
        }
506
      } else {
507
        if (streamClosed != 0) {
×
508
          return;
×
509
        }
510
        streamClosed = 1;
×
511
      }
512
      stopwatch.stop();
1✔
513
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
514
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
515
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
516
          .put(STATUS_KEY, status.getCode().toString());
1✔
517
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
518
        plugin.addLabels(builder);
×
519
      }
×
520
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
521

522
      if (module.resource.serverCallDurationCounter() != null) {
1✔
523
        module.resource.serverCallDurationCounter()
1✔
524
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
1✔
525
      }
526
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
527
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
528
            .record(outboundWireSize, attributes);
1✔
529
      }
530
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
531
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
532
            .record(inboundWireSize, attributes);
1✔
533
      }
534
    }
1✔
535
  }
536

537
  @VisibleForTesting
538
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
539
    @Override
540
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
541
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
542
      if (plugins.isEmpty()) {
1✔
543
        streamPlugins = Collections.emptyList();
1✔
544
      } else {
545
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
×
546
            new ArrayList<>(plugins.size());
×
547
        for (OpenTelemetryPlugin plugin : plugins) {
×
548
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
×
549
        }
×
550
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
×
551
      }
552
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
553
    }
554
  }
555

556
  @VisibleForTesting
557
  final class MetricsClientInterceptor implements ClientInterceptor {
558
    private final String target;
559
    private final ImmutableList<OpenTelemetryPlugin> plugins;
560

561
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
562
      this.target = checkNotNull(target, "target");
1✔
563
      this.plugins = checkNotNull(plugins, "plugins");
1✔
564
    }
1✔
565

566
    @Override
567
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
568
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
569
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
570
      if (plugins.isEmpty()) {
1✔
571
        callPlugins = Collections.emptyList();
1✔
572
      } else {
573
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
×
574
            new ArrayList<>(plugins.size());
×
575
        for (OpenTelemetryPlugin plugin : plugins) {
×
576
          callPluginsMutable.add(plugin.newClientCallPlugin());
×
577
        }
×
578
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
×
579
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
×
580
          callOptions = plugin.filterCallOptions(callOptions);
×
581
        }
×
582
      }
583
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
584
      // which is true for all generated methods. Otherwise, programatically
585
      // created methods result in high cardinality metrics.
586
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
587
          OpenTelemetryMetricsModule.this, target,
588
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
589
          callPlugins);
590
      ClientCall<ReqT, RespT> call =
1✔
591
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
592
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
593
        @Override
594
        public void start(Listener<RespT> responseListener, Metadata headers) {
595
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
596
            plugin.addMetadata(headers);
×
597
          }
×
598
          delegate().start(
1✔
599
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
600
                @Override
601
                public void onClose(Status status, Metadata trailers) {
602
                  tracerFactory.callEnded(status);
1✔
603
                  super.onClose(status, trailers);
1✔
604
                }
1✔
605
              },
606
              headers);
607
        }
1✔
608
      };
609
    }
610
  }
611
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc