• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1878

pending completion
1878

push

buildkite

web-flow
Update rpc-caller header of grpc (#824)

This might be a breaking change, since it changes the gRPC rpc-caller header from the hard-coded value uber-java to a configurable value taken from clientOptions.
If anything depends on the value uber-java, set the corresponding clientOptions value to uber-java.

1 of 1 new or added line in 1 file covered. (100.0%)

11123 of 18414 relevant lines covered (60.41%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java
1
/*
2
 *  Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4
 *
5
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
6
 *  use this file except in compliance with the License. A copy of the License is
7
 *  located at
8
 *
9
 *  http://aws.amazon.com/apache2.0
10
 *
11
 *  or in the "license" file accompanying this file. This file is distributed on
12
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13
 *  express or implied. See the License for the specific language governing
14
 *  permissions and limitations under the License.
15
 */
16
package com.uber.cadence.internal.compatibility.proto.serviceclient;
17

18
import com.google.common.base.Strings;
19
import com.uber.cadence.api.v1.DomainAPIGrpc;
20
import com.uber.cadence.api.v1.MetaAPIGrpc;
21
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
22
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIFutureStub;
23
import com.uber.cadence.api.v1.VisibilityAPIGrpc;
24
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIBlockingStub;
25
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIFutureStub;
26
import com.uber.cadence.api.v1.WorkerAPIGrpc;
27
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIBlockingStub;
28
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIFutureStub;
29
import com.uber.cadence.api.v1.WorkflowAPIGrpc;
30
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
31
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
32
import com.uber.cadence.internal.Version;
33
import com.uber.cadence.serviceclient.ClientOptions;
34
import io.grpc.CallOptions;
35
import io.grpc.Channel;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientInterceptor;
38
import io.grpc.ClientInterceptors;
39
import io.grpc.Deadline;
40
import io.grpc.ForwardingClientCall;
41
import io.grpc.ForwardingClientCallListener;
42
import io.grpc.ManagedChannel;
43
import io.grpc.ManagedChannelBuilder;
44
import io.grpc.Metadata;
45
import io.grpc.MethodDescriptor;
46
import io.grpc.stub.MetadataUtils;
47
import io.opentelemetry.api.GlobalOpenTelemetry;
48
import io.opentelemetry.context.Context;
49
import io.opentelemetry.context.propagation.TextMapPropagator;
50
import io.opentelemetry.context.propagation.TextMapSetter;
51
import java.util.concurrent.TimeUnit;
52
import java.util.concurrent.atomic.AtomicBoolean;
53
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
55

56
final class GrpcServiceStubs implements IGrpcServiceStubs {
57

58
  private static final Logger log = LoggerFactory.getLogger(GrpcServiceStubs.class);
×
59
  private static final Metadata.Key<String> LIBRARY_VERSION_HEADER_KEY =
×
60
      Metadata.Key.of("cadence-client-library-version", Metadata.ASCII_STRING_MARSHALLER);
×
61
  private static final Metadata.Key<String> FEATURE_VERSION_HEADER_KEY =
×
62
      Metadata.Key.of("cadence-client-feature-version", Metadata.ASCII_STRING_MARSHALLER);
×
63
  private static final Metadata.Key<String> CLIENT_IMPL_HEADER_KEY =
×
64
      Metadata.Key.of("cadence-client-name", Metadata.ASCII_STRING_MARSHALLER);
×
65
  private static final Metadata.Key<String> ISOLATION_GROUP_HEADER_KEY =
×
66
      Metadata.Key.of("cadence-client-isolation-group", Metadata.ASCII_STRING_MARSHALLER);
×
67
  private static final Metadata.Key<String> RPC_SERVICE_NAME_HEADER_KEY =
×
68
      Metadata.Key.of("rpc-service", Metadata.ASCII_STRING_MARSHALLER);
×
69
  private static final Metadata.Key<String> RPC_CALLER_NAME_HEADER_KEY =
×
70
      Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER);
×
71
  private static final Metadata.Key<String> RPC_ENCODING_HEADER_KEY =
×
72
      Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
×
73

74
  private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java";
75

76
  private final ManagedChannel channel;
77
  private final boolean shutdownChannel;
78
  private final AtomicBoolean shutdownRequested = new AtomicBoolean();
×
79
  private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub;
80
  private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub;
81
  private final VisibilityAPIGrpc.VisibilityAPIBlockingStub visibilityBlockingStub;
82
  private final VisibilityAPIGrpc.VisibilityAPIFutureStub visibilityFutureStub;
83
  private final WorkerAPIGrpc.WorkerAPIBlockingStub workerBlockingStub;
84
  private final WorkerAPIGrpc.WorkerAPIFutureStub workerFutureStub;
85
  private final WorkflowAPIGrpc.WorkflowAPIBlockingStub workflowBlockingStub;
86
  private final WorkflowAPIGrpc.WorkflowAPIFutureStub workflowFutureStub;
87
  private final MetaAPIGrpc.MetaAPIBlockingStub metaBlockingStub;
88
  private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;
89

90
  GrpcServiceStubs(ClientOptions options) {
×
91
    if (options.getGRPCChannel() != null) {
×
92
      this.channel = options.getGRPCChannel();
×
93
      shutdownChannel = false;
×
94
    } else {
95
      this.channel =
×
96
          ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
×
97
              .defaultLoadBalancingPolicy("round_robin")
×
98
              .usePlaintext()
×
99
              .build();
×
100
      shutdownChannel = true;
×
101
    }
102
    ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
×
103
    ClientInterceptor tracingInterceptor = newTracingInterceptor();
×
104
    Metadata headers = new Metadata();
×
105
    headers.put(LIBRARY_VERSION_HEADER_KEY, Version.LIBRARY_VERSION);
×
106
    headers.put(FEATURE_VERSION_HEADER_KEY, Version.FEATURE_VERSION);
×
107
    headers.put(CLIENT_IMPL_HEADER_KEY, CLIENT_IMPL_HEADER_VALUE);
×
108
    headers.put(RPC_SERVICE_NAME_HEADER_KEY, options.getServiceName());
×
109
    headers.put(RPC_CALLER_NAME_HEADER_KEY, options.getClientAppName());
×
110
    headers.put(RPC_ENCODING_HEADER_KEY, "proto");
×
111
    if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
×
112
      headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
×
113
    }
114
    Channel interceptedChannel =
×
115
        ClientInterceptors.intercept(
×
116
            channel,
117
            deadlineInterceptor,
118
            MetadataUtils.newAttachHeadersInterceptor(headers),
×
119
            newOpenTelemetryInterceptor());
×
120
    if (log.isTraceEnabled()) {
×
121
      interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
×
122
    }
123
    this.domainBlockingStub = DomainAPIGrpc.newBlockingStub(interceptedChannel);
×
124
    this.domainFutureStub = DomainAPIGrpc.newFutureStub(interceptedChannel);
×
125
    this.visibilityBlockingStub = VisibilityAPIGrpc.newBlockingStub(interceptedChannel);
×
126
    this.visibilityFutureStub = VisibilityAPIGrpc.newFutureStub(interceptedChannel);
×
127
    this.workerBlockingStub = WorkerAPIGrpc.newBlockingStub(interceptedChannel);
×
128
    this.workerFutureStub = WorkerAPIGrpc.newFutureStub(interceptedChannel);
×
129
    this.workflowBlockingStub = WorkflowAPIGrpc.newBlockingStub(interceptedChannel);
×
130
    this.workflowFutureStub = WorkflowAPIGrpc.newFutureStub(interceptedChannel);
×
131
    this.metaBlockingStub = MetaAPIGrpc.newBlockingStub(interceptedChannel);
×
132
    this.metaFutureStub = MetaAPIGrpc.newFutureStub(interceptedChannel);
×
133
  }
×
134

135
  private ClientInterceptor newOpenTelemetryInterceptor() {
136
    return new ClientInterceptor() {
×
137
      @Override
138
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
139
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
140
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
141
            next.newCall(method, callOptions)) {
×
142

143
          @Override
144
          public void start(Listener<RespT> responseListener, Metadata headers) {
145
            TextMapPropagator propagator =
146
                GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
×
147

148
            final TextMapSetter<Metadata> setter =
×
149
                (carrier, key, value) -> {
150
                  if (carrier != null) {
×
151
                    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
×
152
                  }
153
                };
×
154
            if (propagator != null) {
×
155
              propagator.inject(Context.current(), headers, setter);
×
156
            }
157

158
            super.start(responseListener, headers);
×
159
          }
×
160
        };
161
      }
162
    };
163
  }
164

165
  private ClientInterceptor newTracingInterceptor() {
166
    return new ClientInterceptor() {
×
167

168
      @Override
169
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
170
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
171
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
172
            next.newCall(method, callOptions)) {
×
173
          @Override
174
          public void sendMessage(ReqT message) {
175
            log.trace("Invoking " + method.getFullMethodName() + "with input: " + message);
×
176
            super.sendMessage(message);
×
177
          }
×
178

179
          @Override
180
          public void start(Listener<RespT> responseListener, Metadata headers) {
181
            Listener<RespT> listener =
×
182
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
183
                    responseListener) {
×
184
                  @Override
185
                  public void onMessage(RespT message) {
186
                    if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()) {
×
187
                      log.trace("Returned " + method.getFullMethodName());
×
188
                    } else {
189
                      log.trace(
×
190
                          "Returned " + method.getFullMethodName() + " with output: " + message);
×
191
                    }
192
                    super.onMessage(message);
×
193
                  }
×
194
                };
195
            super.start(listener, headers);
×
196
          }
×
197
        };
198
      }
199
    };
200
  }
201

202
  public DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub() {
203
    return domainBlockingStub;
×
204
  }
205

206
  public DomainAPIGrpc.DomainAPIFutureStub domainFutureStub() {
207
    return domainFutureStub;
×
208
  }
209

210
  @Override
211
  public VisibilityAPIBlockingStub visibilityBlockingStub() {
212
    return visibilityBlockingStub;
×
213
  }
214

215
  @Override
216
  public VisibilityAPIFutureStub visibilityFutureStub() {
217
    return visibilityFutureStub;
×
218
  }
219

220
  @Override
221
  public WorkerAPIBlockingStub workerBlockingStub() {
222
    return workerBlockingStub;
×
223
  }
224

225
  @Override
226
  public WorkerAPIFutureStub workerFutureStub() {
227
    return workerFutureStub;
×
228
  }
229

230
  @Override
231
  public WorkflowAPIBlockingStub workflowBlockingStub() {
232
    return workflowBlockingStub;
×
233
  }
234

235
  @Override
236
  public MetaAPIFutureStub metaFutureStub() {
237
    return metaFutureStub;
×
238
  }
239

240
  @Override
241
  public MetaAPIBlockingStub metaBlockingStub() {
242
    return metaBlockingStub;
×
243
  }
244

245
  @Override
246
  public WorkflowAPIFutureStub workflowFutureStub() {
247
    return workflowFutureStub;
×
248
  }
249

250
  @Override
251
  public void shutdown() {
252
    shutdownRequested.set(true);
×
253
    if (shutdownChannel) {
×
254
      channel.shutdown();
×
255
    }
256
  }
×
257

258
  @Override
259
  public void shutdownNow() {
260
    shutdownRequested.set(true);
×
261
    if (shutdownChannel) {
×
262
      channel.shutdownNow();
×
263
    }
264
  }
×
265

266
  @Override
267
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
268
    if (shutdownChannel) {
×
269
      return channel.awaitTermination(timeout, unit);
×
270
    }
271
    return true;
×
272
  }
273

274
  @Override
275
  public boolean isShutdown() {
276
    if (shutdownChannel) {
×
277
      return channel.isShutdown();
×
278
    }
279
    return shutdownRequested.get();
×
280
  }
281

282
  @Override
283
  public boolean isTerminated() {
284
    if (shutdownChannel) {
×
285
      return channel.isTerminated();
×
286
    }
287
    return shutdownRequested.get();
×
288
  }
289

290
  private static class GrpcDeadlineInterceptor implements ClientInterceptor {
291

292
    private final ClientOptions options;
293

294
    public GrpcDeadlineInterceptor(ClientOptions options) {
×
295
      this.options = options;
×
296
    }
×
297

298
    @Override
299
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
300
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
301
      Deadline deadline = callOptions.getDeadline();
×
302
      long duration;
303
      if (deadline == null) {
×
304
        duration = options.getRpcTimeoutMillis();
×
305
      } else {
306
        duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
307
      }
308
      if (method == WorkflowAPIGrpc.getGetWorkflowExecutionHistoryMethod()) {
×
309
        if (deadline == null) {
×
310
          duration = options.getRpcLongPollTimeoutMillis();
×
311
        } else {
312
          duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
313
          if (duration > options.getRpcLongPollTimeoutMillis()) {
×
314
            duration = options.getRpcLongPollTimeoutMillis();
×
315
          }
316
        }
317
      } else if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()
×
318
          || method == WorkerAPIGrpc.getPollForActivityTaskMethod()) {
×
319
        duration = options.getRpcLongPollTimeoutMillis();
×
320
      } else if (method == WorkflowAPIGrpc.getQueryWorkflowMethod()) {
×
321
        duration = options.getRpcQueryTimeoutMillis();
×
322
      }
323
      if (log.isTraceEnabled()) {
×
324
        String name = method.getFullMethodName();
×
325
        log.trace("TimeoutInterceptor method=" + name + ", timeoutMs=" + duration);
×
326
      }
327
      return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
×
328
    }
329
  }
330
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc