• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19216

09 May 2024 02:28AM UTC coverage: 88.369% (-0.007%) from 88.376%
#19216

push

github

ejona86
opentelemetry: Add grpc.target label to per-call metrics

As defined by gRFC A66, the target is on all client-side per-call
metrics (both call and attempt).

31538 of 35689 relevant lines covered (88.37%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.39
/../core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
1
/*
2
 * Copyright 2016,2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.MoreObjects.firstNonNull;
20
import static com.google.common.base.Preconditions.checkNotNull;
21

22
import io.grpc.Attributes;
23
import io.grpc.CallCredentials;
24
import io.grpc.CallCredentials.RequestInfo;
25
import io.grpc.CallOptions;
26
import io.grpc.ChannelCredentials;
27
import io.grpc.ChannelLogger;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.CompositeCallCredentials;
30
import io.grpc.InternalMayRequireSpecificExecutor;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.SecurityLevel;
34
import io.grpc.Status;
35
import io.grpc.internal.MetadataApplierImpl.MetadataApplierListener;
36
import java.net.SocketAddress;
37
import java.util.Collection;
38
import java.util.concurrent.Executor;
39
import java.util.concurrent.ScheduledExecutorService;
40
import java.util.concurrent.atomic.AtomicInteger;
41
import javax.annotation.concurrent.GuardedBy;
42

43
final class CallCredentialsApplyingTransportFactory implements ClientTransportFactory {
44
  private final ClientTransportFactory delegate;
45
  private final CallCredentials channelCallCredentials;
46
  private final Executor appExecutor;
47

48
  CallCredentialsApplyingTransportFactory(
49
      ClientTransportFactory delegate, CallCredentials channelCallCredentials,
50
      Executor appExecutor) {
1✔
51
    this.delegate = checkNotNull(delegate, "delegate");
1✔
52
    this.channelCallCredentials = channelCallCredentials;
1✔
53
    this.appExecutor = checkNotNull(appExecutor, "appExecutor");
1✔
54
  }
1✔
55

56
  @Override
57
  public ConnectionClientTransport newClientTransport(
58
      SocketAddress serverAddress, ClientTransportOptions options, ChannelLogger channelLogger) {
59
    return new CallCredentialsApplyingTransport(
1✔
60
        delegate.newClientTransport(serverAddress, options, channelLogger), options.getAuthority());
1✔
61
  }
62

63
  @Override
64
  public ScheduledExecutorService getScheduledExecutorService() {
65
    return delegate.getScheduledExecutorService();
1✔
66
  }
67

68
  @Override
69
  public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials channelCreds) {
70
    throw new UnsupportedOperationException();
×
71
  }
72

73
  @Override
74
  public void close() {
75
    delegate.close();
1✔
76
  }
1✔
77

78
  @Override
79
  public Collection<Class<? extends SocketAddress>> getSupportedSocketAddressTypes() {
80
    return delegate.getSupportedSocketAddressTypes();
×
81
  }
82

83
  private class CallCredentialsApplyingTransport extends ForwardingConnectionClientTransport {
84
    private final ConnectionClientTransport delegate;
85
    private final String authority;
86
    // Negative value means transport active, non-negative value indicates shutdown invoked.
87
    private final AtomicInteger pendingApplier = new AtomicInteger(Integer.MIN_VALUE + 1);
1✔
88
    private volatile Status shutdownStatus;
89
    @GuardedBy("this")
90
    private Status savedShutdownStatus;
91
    @GuardedBy("this")
92
    private Status savedShutdownNowStatus;
93
    private final MetadataApplierListener applierListener = new MetadataApplierListener() {
1✔
94
      @Override
95
      public void onComplete() {
96
        if (pendingApplier.decrementAndGet() == 0) {
1✔
97
          maybeShutdown();
1✔
98
        }
99
      }
1✔
100
    };
101

102
    CallCredentialsApplyingTransport(ConnectionClientTransport delegate, String authority) {
1✔
103
      this.delegate = checkNotNull(delegate, "delegate");
1✔
104
      this.authority = checkNotNull(authority, "authority");
1✔
105
    }
1✔
106

107
    @Override
108
    protected ConnectionClientTransport delegate() {
109
      return delegate;
1✔
110
    }
111

112
    @Override
113
    @SuppressWarnings("deprecation")
114
    public ClientStream newStream(
115
        final MethodDescriptor<?, ?> method, Metadata headers, final CallOptions callOptions,
116
        ClientStreamTracer[] tracers) {
117
      CallCredentials creds = callOptions.getCredentials();
1✔
118
      if (creds == null) {
1✔
119
        creds = channelCallCredentials;
1✔
120
      } else if (channelCallCredentials != null) {
1✔
121
        creds = new CompositeCallCredentials(channelCallCredentials, creds);
1✔
122
      }
123
      if (creds != null) {
1✔
124
        MetadataApplierImpl applier = new MetadataApplierImpl(
1✔
125
            delegate, method, headers, callOptions, applierListener, tracers);
126
        if (pendingApplier.incrementAndGet() > 0) {
1✔
127
          applierListener.onComplete();
1✔
128
          return new FailingClientStream(shutdownStatus, tracers);
1✔
129
        }
130
        RequestInfo requestInfo = new RequestInfo() {
1✔
131
            @Override
132
            public MethodDescriptor<?, ?> getMethodDescriptor() {
133
              return method;
1✔
134
            }
135

136
            @Override
137
            public CallOptions getCallOptions() {
138
              return callOptions;
1✔
139
            }
140

141
            @Override
142
            public SecurityLevel getSecurityLevel() {
143
              return firstNonNull(
1✔
144
                  delegate.getAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL),
1✔
145
                  SecurityLevel.NONE);
146
            }
147

148
            @Override
149
            public String getAuthority() {
150
              return firstNonNull(callOptions.getAuthority(), authority);
1✔
151
            }
152

153
            @Override
154
            public Attributes getTransportAttrs() {
155
              return delegate.getAttributes();
1✔
156
            }
157
          };
158
        try {
159
          // Hack to allow appengine to work when using AppEngineCredentials (b/244209681)
160
          // since processing must happen on a specific thread.
161
          //
162
          // Ideally would always use appExecutor and we could eliminate the interface
163
          // InternalMayRequireSpecificExecutor
164
          Executor executor;
165
          if (creds instanceof InternalMayRequireSpecificExecutor
1✔
166
              && ((InternalMayRequireSpecificExecutor)creds).isSpecificExecutorRequired()
×
167
              && callOptions.getExecutor() != null) {
×
168
            executor = callOptions.getExecutor();
×
169
          } else {
170
            executor = appExecutor;
1✔
171
          }
172

173
          creds.applyRequestMetadata(requestInfo, executor, applier);
1✔
174
        } catch (Throwable t) {
1✔
175
          applier.fail(Status.UNAUTHENTICATED
1✔
176
              .withDescription("Credentials should use fail() instead of throwing exceptions")
1✔
177
              .withCause(t));
1✔
178
        }
1✔
179
        return applier.returnStream();
1✔
180
      } else {
181
        if (pendingApplier.get() >= 0) {
1✔
182
          return new FailingClientStream(shutdownStatus, tracers);
1✔
183
        }
184
        return delegate.newStream(method, headers, callOptions, tracers);
1✔
185
      }
186
    }
187

188
    @Override
189
    public void shutdown(Status status) {
190
      checkNotNull(status, "status");
1✔
191
      synchronized (this) {
1✔
192
        if (pendingApplier.get() < 0) {
1✔
193
          shutdownStatus = status;
1✔
194
          pendingApplier.addAndGet(Integer.MAX_VALUE);
1✔
195
        } else {
196
          return;
1✔
197
        }
198
        if (pendingApplier.get() != 0) {
1✔
199
          savedShutdownStatus = status;
1✔
200
          return;
1✔
201
        }
202
      }
1✔
203
      super.shutdown(status);
1✔
204
    }
1✔
205

206
    // TODO(zivy): cancel pending applier here.
207
    @Override
208
    public void shutdownNow(Status status) {
209
      checkNotNull(status, "status");
1✔
210
      synchronized (this) {
1✔
211
        if (pendingApplier.get() < 0) {
1✔
212
          shutdownStatus = status;
1✔
213
          pendingApplier.addAndGet(Integer.MAX_VALUE);
1✔
214
        } else if (savedShutdownNowStatus != null) {
1✔
215
          return;
×
216
        }
217
        if (pendingApplier.get() != 0) {
1✔
218
          savedShutdownNowStatus = status;
1✔
219
          // TODO(zivy): propagate shutdownNow to the delegate immediately.
220
          return;
1✔
221
        }
222
      }
1✔
223
      super.shutdownNow(status);
1✔
224
    }
1✔
225

226
    private void maybeShutdown() {
227
      Status maybeShutdown;
228
      Status maybeShutdownNow;
229
      synchronized (this) {
1✔
230
        if (pendingApplier.get() != 0) {
1✔
231
          return;
×
232
        }
233
        maybeShutdown = savedShutdownStatus;
1✔
234
        maybeShutdownNow = savedShutdownNowStatus;
1✔
235
        savedShutdownStatus = null;
1✔
236
        savedShutdownNowStatus = null;
1✔
237
      }
1✔
238
      if (maybeShutdown != null) {
1✔
239
        super.shutdown(maybeShutdown);
1✔
240
      }
241
      if (maybeShutdownNow != null) {
1✔
242
        super.shutdownNow(maybeShutdownNow);
1✔
243
      }
244
    }
1✔
245
  }
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc