• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java
1
/*
2
 *  Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4
 *
5
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
6
 *  use this file except in compliance with the License. A copy of the License is
7
 *  located at
8
 *
9
 *  http://aws.amazon.com/apache2.0
10
 *
11
 *  or in the "license" file accompanying this file. This file is distributed on
12
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13
 *  express or implied. See the License for the specific language governing
14
 *  permissions and limitations under the License.
15
 */
16
package com.uber.cadence.internal.compatibility.proto.serviceclient;
17

18
import com.google.common.base.Strings;
19
import com.google.protobuf.ByteString;
20
import com.uber.cadence.api.v1.*;
21
import com.uber.cadence.api.v1.DomainAPIGrpc;
22
import com.uber.cadence.api.v1.MetaAPIGrpc;
23
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
24
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIFutureStub;
25
import com.uber.cadence.api.v1.VisibilityAPIGrpc;
26
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIBlockingStub;
27
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIFutureStub;
28
import com.uber.cadence.api.v1.WorkerAPIGrpc;
29
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIBlockingStub;
30
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIFutureStub;
31
import com.uber.cadence.api.v1.WorkflowAPIGrpc;
32
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
33
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
34
import com.uber.cadence.internal.Version;
35
import com.uber.cadence.internal.tracing.TracingPropagator;
36
import com.uber.cadence.serviceclient.ClientOptions;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ClientCall;
40
import io.grpc.ClientInterceptor;
41
import io.grpc.ClientInterceptors;
42
import io.grpc.Deadline;
43
import io.grpc.ForwardingClientCall;
44
import io.grpc.ForwardingClientCallListener;
45
import io.grpc.ManagedChannel;
46
import io.grpc.ManagedChannelBuilder;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.stub.MetadataUtils;
50
import io.opentelemetry.api.GlobalOpenTelemetry;
51
import io.opentelemetry.context.Context;
52
import io.opentelemetry.context.propagation.TextMapPropagator;
53
import io.opentelemetry.context.propagation.TextMapSetter;
54
import io.opentracing.Span;
55
import io.opentracing.Tracer;
56
import java.util.HashMap;
57
import java.util.Map;
58
import java.util.Objects;
59
import java.util.concurrent.TimeUnit;
60
import java.util.concurrent.atomic.AtomicBoolean;
61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
final class GrpcServiceStubs implements IGrpcServiceStubs {
65

66
  private static final Logger log = LoggerFactory.getLogger(GrpcServiceStubs.class);
×
67
  private static final Metadata.Key<String> LIBRARY_VERSION_HEADER_KEY =
×
68
      Metadata.Key.of("cadence-client-library-version", Metadata.ASCII_STRING_MARSHALLER);
×
69
  private static final Metadata.Key<String> FEATURE_VERSION_HEADER_KEY =
×
70
      Metadata.Key.of("cadence-client-feature-version", Metadata.ASCII_STRING_MARSHALLER);
×
71
  private static final Metadata.Key<String> CLIENT_IMPL_HEADER_KEY =
×
72
      Metadata.Key.of("cadence-client-name", Metadata.ASCII_STRING_MARSHALLER);
×
73
  private static final Metadata.Key<String> ISOLATION_GROUP_HEADER_KEY =
×
74
      Metadata.Key.of("cadence-client-isolation-group", Metadata.ASCII_STRING_MARSHALLER);
×
75
  private static final Metadata.Key<String> RPC_SERVICE_NAME_HEADER_KEY =
×
76
      Metadata.Key.of("rpc-service", Metadata.ASCII_STRING_MARSHALLER);
×
77
  private static final Metadata.Key<String> RPC_CALLER_NAME_HEADER_KEY =
×
78
      Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER);
×
79
  private static final Metadata.Key<String> RPC_ENCODING_HEADER_KEY =
×
80
      Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
×
81

82
  private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java";
83

84
  private final ManagedChannel channel;
85
  private final boolean shutdownChannel;
86
  private final AtomicBoolean shutdownRequested = new AtomicBoolean();
×
87
  private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub;
88
  private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub;
89
  private final VisibilityAPIGrpc.VisibilityAPIBlockingStub visibilityBlockingStub;
90
  private final VisibilityAPIGrpc.VisibilityAPIFutureStub visibilityFutureStub;
91
  private final WorkerAPIGrpc.WorkerAPIBlockingStub workerBlockingStub;
92
  private final WorkerAPIGrpc.WorkerAPIFutureStub workerFutureStub;
93
  private final WorkflowAPIGrpc.WorkflowAPIBlockingStub workflowBlockingStub;
94
  private final WorkflowAPIGrpc.WorkflowAPIFutureStub workflowFutureStub;
95
  private final MetaAPIGrpc.MetaAPIBlockingStub metaBlockingStub;
96
  private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;
97

98
  GrpcServiceStubs(ClientOptions options) {
×
99
    if (options.getGRPCChannel() != null) {
×
100
      this.channel = options.getGRPCChannel();
×
101
      shutdownChannel = false;
×
102
    } else {
103
      this.channel =
×
104
          ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
×
105
              .defaultLoadBalancingPolicy("round_robin")
×
106
              .usePlaintext()
×
107
              .build();
×
108
      shutdownChannel = true;
×
109
    }
110
    ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
×
111
    ClientInterceptor tracingInterceptor = newTracingInterceptor();
×
112
    Metadata headers = new Metadata();
×
113
    headers.put(LIBRARY_VERSION_HEADER_KEY, Version.LIBRARY_VERSION);
×
114
    headers.put(FEATURE_VERSION_HEADER_KEY, Version.FEATURE_VERSION);
×
115
    headers.put(CLIENT_IMPL_HEADER_KEY, CLIENT_IMPL_HEADER_VALUE);
×
116
    headers.put(RPC_SERVICE_NAME_HEADER_KEY, options.getServiceName());
×
117
    headers.put(RPC_CALLER_NAME_HEADER_KEY, options.getClientAppName());
×
118
    headers.put(RPC_ENCODING_HEADER_KEY, "proto");
×
119
    if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
×
120
      headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
×
121
    }
122
    Channel interceptedChannel =
×
123
        ClientInterceptors.intercept(
×
124
            channel,
125
            deadlineInterceptor,
126
            MetadataUtils.newAttachHeadersInterceptor(headers),
×
127
            newOpenTelemetryInterceptor(),
×
128
            newOpenTracingInterceptor(options.getTracer()));
×
129
    if (log.isTraceEnabled()) {
×
130
      interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
×
131
    }
132
    this.domainBlockingStub = DomainAPIGrpc.newBlockingStub(interceptedChannel);
×
133
    this.domainFutureStub = DomainAPIGrpc.newFutureStub(interceptedChannel);
×
134
    this.visibilityBlockingStub = VisibilityAPIGrpc.newBlockingStub(interceptedChannel);
×
135
    this.visibilityFutureStub = VisibilityAPIGrpc.newFutureStub(interceptedChannel);
×
136
    this.workerBlockingStub = WorkerAPIGrpc.newBlockingStub(interceptedChannel);
×
137
    this.workerFutureStub = WorkerAPIGrpc.newFutureStub(interceptedChannel);
×
138
    this.workflowBlockingStub = WorkflowAPIGrpc.newBlockingStub(interceptedChannel);
×
139
    this.workflowFutureStub = WorkflowAPIGrpc.newFutureStub(interceptedChannel);
×
140
    this.metaBlockingStub = MetaAPIGrpc.newBlockingStub(interceptedChannel);
×
141
    this.metaFutureStub = MetaAPIGrpc.newFutureStub(interceptedChannel);
×
142
  }
×
143

144
  private ClientInterceptor newOpenTelemetryInterceptor() {
145
    return new ClientInterceptor() {
×
146
      @Override
147
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
148
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
149
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
150
            next.newCall(method, callOptions)) {
×
151

152
          @Override
153
          public void start(Listener<RespT> responseListener, Metadata headers) {
154
            TextMapPropagator propagator =
155
                GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
×
156

157
            final TextMapSetter<Metadata> setter =
×
158
                (carrier, key, value) -> {
159
                  if (carrier != null) {
×
160
                    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
×
161
                  }
162
                };
×
163
            if (propagator != null) {
×
164
              propagator.inject(Context.current(), headers, setter);
×
165
            }
166

167
            super.start(responseListener, headers);
×
168
          }
×
169
        };
170
      }
171
    };
172
  }
173

174
  private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
175
    return new ClientInterceptor() {
×
176
      private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
×
177
      private final String OPERATIONFORMAT = "cadence-%s";
×
178

179
      @Override
180
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
181
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
182
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
183
            next.newCall(method, callOptions)) {
×
184

185
          @Override
186
          public void start(Listener<RespT> responseListener, Metadata headers) {
187
            Span span =
×
188
                tracingPropagator.activateSpanByServiceMethod(
×
189
                    String.format(OPERATIONFORMAT, method.getBareMethodName()));
×
190
            super.start(responseListener, headers);
×
191
            span.finish();
×
192
          }
×
193

194
          @Override
195
          public void sendMessage(ReqT message) {
196
            if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
×
197
                && message instanceof StartWorkflowExecutionRequest) {
198
              StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
×
199
              Map<String, byte[]> headers = new HashMap<>();
×
200
              tracingPropagator.inject(headers);
×
201
              Header.Builder headerBuilder = request.getHeader().toBuilder();
×
202
              headers.forEach(
×
203
                  (k, v) -> {
204
                    headerBuilder.putFields(
×
205
                        k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
×
206
                  });
×
207

208
              // cast should not throw error as we are using the builder
209
              message =
×
210
                  (ReqT)
211
                      ((StartWorkflowExecutionRequest) message)
212
                          .toBuilder()
×
213
                          .setHeader(headerBuilder.build())
×
214
                          .build();
×
215
            }
216
            if (Objects.equals(method.getBareMethodName(), "SignalWithStartWorkflowExecution")
×
217
                && message instanceof SignalWithStartWorkflowExecutionRequest) {
218
              SignalWithStartWorkflowExecutionRequest request =
×
219
                  (SignalWithStartWorkflowExecutionRequest) message;
220
              Map<String, byte[]> headers = new HashMap<>();
×
221
              tracingPropagator.inject(headers);
×
222
              Header.Builder headerBuilder = request.getStartRequest().getHeader().toBuilder();
×
223
              headers.forEach(
×
224
                  (k, v) -> {
225
                    headerBuilder.putFields(
×
226
                        k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
×
227
                  });
×
228

229
              // cast should not throw error as we are using the builder
230
              message =
×
231
                  (ReqT)
232
                      ((SignalWithStartWorkflowExecutionRequest) message)
233
                          .toBuilder()
×
234
                          .setStartRequest(
×
235
                              request
236
                                  .getStartRequest()
×
237
                                  .toBuilder()
×
238
                                  .setHeader(headerBuilder.build()))
×
239
                          .build();
×
240
            }
241
            super.sendMessage(message);
×
242
          }
×
243
        };
244
      }
245
    };
246
  }
247

248
  private ClientInterceptor newTracingInterceptor() {
249
    return new ClientInterceptor() {
×
250

251
      @Override
252
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
253
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
254
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
255
            next.newCall(method, callOptions)) {
×
256
          @Override
257
          public void sendMessage(ReqT message) {
258
            log.trace("Invoking " + method.getFullMethodName() + "with input: " + message);
×
259
            super.sendMessage(message);
×
260
          }
×
261

262
          @Override
263
          public void start(Listener<RespT> responseListener, Metadata headers) {
264
            Listener<RespT> listener =
×
265
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
266
                    responseListener) {
×
267
                  @Override
268
                  public void onMessage(RespT message) {
269
                    if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()) {
×
270
                      log.trace("Returned " + method.getFullMethodName());
×
271
                    } else {
272
                      log.trace(
×
273
                          "Returned " + method.getFullMethodName() + " with output: " + message);
×
274
                    }
275
                    super.onMessage(message);
×
276
                  }
×
277
                };
278
            super.start(listener, headers);
×
279
          }
×
280
        };
281
      }
282
    };
283
  }
284

285
  public DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub() {
286
    return domainBlockingStub;
×
287
  }
288

289
  public DomainAPIGrpc.DomainAPIFutureStub domainFutureStub() {
290
    return domainFutureStub;
×
291
  }
292

293
  @Override
294
  public VisibilityAPIBlockingStub visibilityBlockingStub() {
295
    return visibilityBlockingStub;
×
296
  }
297

298
  @Override
299
  public VisibilityAPIFutureStub visibilityFutureStub() {
300
    return visibilityFutureStub;
×
301
  }
302

303
  @Override
304
  public WorkerAPIBlockingStub workerBlockingStub() {
305
    return workerBlockingStub;
×
306
  }
307

308
  @Override
309
  public WorkerAPIFutureStub workerFutureStub() {
310
    return workerFutureStub;
×
311
  }
312

313
  @Override
314
  public WorkflowAPIBlockingStub workflowBlockingStub() {
315
    return workflowBlockingStub;
×
316
  }
317

318
  @Override
319
  public MetaAPIFutureStub metaFutureStub() {
320
    return metaFutureStub;
×
321
  }
322

323
  @Override
324
  public MetaAPIBlockingStub metaBlockingStub() {
325
    return metaBlockingStub;
×
326
  }
327

328
  @Override
329
  public WorkflowAPIFutureStub workflowFutureStub() {
330
    return workflowFutureStub;
×
331
  }
332

333
  @Override
334
  public void shutdown() {
335
    shutdownRequested.set(true);
×
336
    if (shutdownChannel) {
×
337
      channel.shutdown();
×
338
    }
339
  }
×
340

341
  @Override
342
  public void shutdownNow() {
343
    shutdownRequested.set(true);
×
344
    if (shutdownChannel) {
×
345
      channel.shutdownNow();
×
346
    }
347
  }
×
348

349
  @Override
350
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
351
    if (shutdownChannel) {
×
352
      return channel.awaitTermination(timeout, unit);
×
353
    }
354
    return true;
×
355
  }
356

357
  @Override
358
  public boolean isShutdown() {
359
    if (shutdownChannel) {
×
360
      return channel.isShutdown();
×
361
    }
362
    return shutdownRequested.get();
×
363
  }
364

365
  @Override
366
  public boolean isTerminated() {
367
    if (shutdownChannel) {
×
368
      return channel.isTerminated();
×
369
    }
370
    return shutdownRequested.get();
×
371
  }
372

373
  private static class GrpcDeadlineInterceptor implements ClientInterceptor {
374

375
    private final ClientOptions options;
376

377
    public GrpcDeadlineInterceptor(ClientOptions options) {
×
378
      this.options = options;
×
379
    }
×
380

381
    @Override
382
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
383
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
384
      Deadline deadline = callOptions.getDeadline();
×
385
      long duration;
386
      if (deadline == null) {
×
387
        duration = options.getRpcTimeoutMillis();
×
388
      } else {
389
        duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
390
      }
391
      if (method == WorkflowAPIGrpc.getGetWorkflowExecutionHistoryMethod()) {
×
392
        if (deadline == null) {
×
393
          duration = options.getRpcLongPollTimeoutMillis();
×
394
        } else {
395
          duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
396
          if (duration > options.getRpcLongPollTimeoutMillis()) {
×
397
            duration = options.getRpcLongPollTimeoutMillis();
×
398
          }
399
        }
400
      } else if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()
×
401
          || method == WorkerAPIGrpc.getPollForActivityTaskMethod()) {
×
402
        duration = options.getRpcLongPollTimeoutMillis();
×
403
      } else if (method == WorkflowAPIGrpc.getQueryWorkflowMethod()) {
×
404
        duration = options.getRpcQueryTimeoutMillis();
×
405
      }
406
      if (log.isTraceEnabled()) {
×
407
        String name = method.getFullMethodName();
×
408
        log.trace("TimeoutInterceptor method=" + name + ", timeoutMs=" + duration);
×
409
      }
410
      return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
×
411
    }
412
  }
413
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc