• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2681

20 Nov 2024 10:22PM CUT coverage: 74.857%. First build
2681

Pull #963

buildkite

taylanisikdemir
Update codecov config with new github org
Pull Request #963: Update codecov config with new github org

14511 of 19385 relevant lines covered (74.86%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java
1
/*
2
 *  Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4
 *
5
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
6
 *  use this file except in compliance with the License. A copy of the License is
7
 *  located at
8
 *
9
 *  http://aws.amazon.com/apache2.0
10
 *
11
 *  or in the "license" file accompanying this file. This file is distributed on
12
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13
 *  express or implied. See the License for the specific language governing
14
 *  permissions and limitations under the License.
15
 */
16
package com.uber.cadence.internal.compatibility.proto.serviceclient;
17

18
import com.google.common.base.Strings;
19
import com.google.protobuf.ByteString;
20
import com.uber.cadence.api.v1.*;
21
import com.uber.cadence.api.v1.DomainAPIGrpc;
22
import com.uber.cadence.api.v1.MetaAPIGrpc;
23
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
24
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIFutureStub;
25
import com.uber.cadence.api.v1.VisibilityAPIGrpc;
26
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIBlockingStub;
27
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIFutureStub;
28
import com.uber.cadence.api.v1.WorkerAPIGrpc;
29
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIBlockingStub;
30
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIFutureStub;
31
import com.uber.cadence.api.v1.WorkflowAPIGrpc;
32
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
33
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
34
import com.uber.cadence.internal.Version;
35
import com.uber.cadence.internal.tracing.TracingPropagator;
36
import com.uber.cadence.serviceclient.ClientOptions;
37
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
38
import io.grpc.*;
39
import io.grpc.stub.MetadataUtils;
40
import io.opentelemetry.api.GlobalOpenTelemetry;
41
import io.opentelemetry.context.Context;
42
import io.opentelemetry.context.propagation.TextMapPropagator;
43
import io.opentelemetry.context.propagation.TextMapSetter;
44
import io.opentracing.Scope;
45
import io.opentracing.Span;
46
import io.opentracing.Tracer;
47
import java.nio.charset.StandardCharsets;
48
import java.util.HashMap;
49
import java.util.Map;
50
import java.util.Objects;
51
import java.util.concurrent.TimeUnit;
52
import java.util.concurrent.atomic.AtomicBoolean;
53
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
55

56
final class GrpcServiceStubs implements IGrpcServiceStubs {
57

58
  private static final Logger log = LoggerFactory.getLogger(GrpcServiceStubs.class);
×
59
  private static final Metadata.Key<String> LIBRARY_VERSION_HEADER_KEY =
×
60
      Metadata.Key.of("cadence-client-library-version", Metadata.ASCII_STRING_MARSHALLER);
×
61
  private static final Metadata.Key<String> FEATURE_VERSION_HEADER_KEY =
×
62
      Metadata.Key.of("cadence-client-feature-version", Metadata.ASCII_STRING_MARSHALLER);
×
63
  private static final Metadata.Key<String> CLIENT_IMPL_HEADER_KEY =
×
64
      Metadata.Key.of("cadence-client-name", Metadata.ASCII_STRING_MARSHALLER);
×
65
  private static final Metadata.Key<String> ISOLATION_GROUP_HEADER_KEY =
×
66
      Metadata.Key.of("cadence-client-isolation-group", Metadata.ASCII_STRING_MARSHALLER);
×
67
  private static final Metadata.Key<String> RPC_SERVICE_NAME_HEADER_KEY =
×
68
      Metadata.Key.of("rpc-service", Metadata.ASCII_STRING_MARSHALLER);
×
69
  private static final Metadata.Key<String> RPC_CALLER_NAME_HEADER_KEY =
×
70
      Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER);
×
71
  private static final Metadata.Key<String> RPC_ENCODING_HEADER_KEY =
×
72
      Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
×
73

74
  private static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY =
×
75
      Metadata.Key.of("cadence-authorization", Metadata.ASCII_STRING_MARSHALLER);
×
76

77
  private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java";
78

79
  private final ClientOptions options;
80
  private final ManagedChannel channel;
81
  private final boolean shutdownChannel;
82
  private final AtomicBoolean shutdownRequested = new AtomicBoolean();
×
83
  private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub;
84
  private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub;
85
  private final VisibilityAPIGrpc.VisibilityAPIBlockingStub visibilityBlockingStub;
86
  private final VisibilityAPIGrpc.VisibilityAPIFutureStub visibilityFutureStub;
87
  private final WorkerAPIGrpc.WorkerAPIBlockingStub workerBlockingStub;
88
  private final WorkerAPIGrpc.WorkerAPIFutureStub workerFutureStub;
89
  private final WorkflowAPIGrpc.WorkflowAPIBlockingStub workflowBlockingStub;
90
  private final WorkflowAPIGrpc.WorkflowAPIFutureStub workflowFutureStub;
91
  private final MetaAPIGrpc.MetaAPIBlockingStub metaBlockingStub;
92
  private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;
93

94
  GrpcServiceStubs(ClientOptions options) {
×
95
    this.options = options;
×
96
    if (options.getGRPCChannel() != null) {
×
97
      this.channel = options.getGRPCChannel();
×
98
      shutdownChannel = false;
×
99
    } else {
100
      this.channel =
×
101
          ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
×
102
              .defaultLoadBalancingPolicy("round_robin")
×
103
              .usePlaintext()
×
104
              .build();
×
105
      shutdownChannel = true;
×
106
    }
107
    ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
×
108
    ClientInterceptor tracingInterceptor = newTracingInterceptor();
×
109
    Metadata headers = new Metadata();
×
110
    headers.put(LIBRARY_VERSION_HEADER_KEY, Version.LIBRARY_VERSION);
×
111
    headers.put(FEATURE_VERSION_HEADER_KEY, Version.FEATURE_VERSION);
×
112
    headers.put(CLIENT_IMPL_HEADER_KEY, CLIENT_IMPL_HEADER_VALUE);
×
113
    headers.put(RPC_SERVICE_NAME_HEADER_KEY, options.getServiceName());
×
114
    headers.put(RPC_CALLER_NAME_HEADER_KEY, options.getClientAppName());
×
115
    headers.put(RPC_ENCODING_HEADER_KEY, "proto");
×
116
    if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
×
117
      headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
×
118
    }
119

120
    Channel interceptedChannel =
×
121
        ClientInterceptors.intercept(
×
122
            channel,
123
            deadlineInterceptor,
124
            MetadataUtils.newAttachHeadersInterceptor(headers),
×
125
            newOpenTelemetryInterceptor(),
×
126
            newOpenTracingInterceptor(options.getTracer()));
×
127
    if (log.isTraceEnabled()) {
×
128
      interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
×
129
    }
130
    if (options.getAuthProvider() != null) {
×
131
      interceptedChannel =
×
132
          ClientInterceptors.intercept(
×
133
              interceptedChannel, newAuthorizationInterceptor(options.getAuthProvider()));
×
134
    }
135
    this.domainBlockingStub = DomainAPIGrpc.newBlockingStub(interceptedChannel);
×
136
    this.domainFutureStub = DomainAPIGrpc.newFutureStub(interceptedChannel);
×
137
    this.visibilityBlockingStub = VisibilityAPIGrpc.newBlockingStub(interceptedChannel);
×
138
    this.visibilityFutureStub = VisibilityAPIGrpc.newFutureStub(interceptedChannel);
×
139
    this.workerBlockingStub = WorkerAPIGrpc.newBlockingStub(interceptedChannel);
×
140
    this.workerFutureStub = WorkerAPIGrpc.newFutureStub(interceptedChannel);
×
141
    this.workflowBlockingStub = WorkflowAPIGrpc.newBlockingStub(interceptedChannel);
×
142
    this.workflowFutureStub = WorkflowAPIGrpc.newFutureStub(interceptedChannel);
×
143
    this.metaBlockingStub = MetaAPIGrpc.newBlockingStub(interceptedChannel);
×
144
    this.metaFutureStub = MetaAPIGrpc.newFutureStub(interceptedChannel);
×
145
  }
×
146

147
  private ClientInterceptor newAuthorizationInterceptor(IAuthorizationProvider provider) {
148
    return new ClientInterceptor() {
×
149
      @Override
150
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
151
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
152
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
153
            next.newCall(method, callOptions)) {
×
154

155
          @Override
156
          public void start(Listener<RespT> responseListener, Metadata headers) {
157
            headers.put(
×
158
                AUTHORIZATION_HEADER_KEY,
×
159
                new String(provider.getAuthToken(), StandardCharsets.UTF_8));
×
160

161
            Listener<RespT> listener =
×
162
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
163
                    responseListener) {
×
164

165
                  @Override
166
                  public void onHeaders(Metadata headers) {
167
                    super.onHeaders(headers);
×
168
                  }
×
169
                };
170
            super.start(listener, headers);
×
171
          }
×
172
        };
173
      }
174
    };
175
  }
176

177
  private ClientInterceptor newOpenTelemetryInterceptor() {
178
    return new ClientInterceptor() {
×
179
      @Override
180
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
181
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
182
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
183
            next.newCall(method, callOptions)) {
×
184

185
          @Override
186
          public void start(Listener<RespT> responseListener, Metadata headers) {
187
            TextMapPropagator propagator =
188
                GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
×
189

190
            final TextMapSetter<Metadata> setter =
×
191
                (carrier, key, value) -> {
192
                  if (carrier != null) {
×
193
                    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
×
194
                  }
195
                };
×
196
            if (propagator != null) {
×
197
              propagator.inject(Context.current(), headers, setter);
×
198
            }
199

200
            super.start(responseListener, headers);
×
201
          }
×
202
        };
203
      }
204
    };
205
  }
206

207
  private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
208
    return new ClientInterceptor() {
×
209
      private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
×
210
      private final String OPERATIONFORMAT = "cadence-%s";
×
211

212
      @Override
213
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
214
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
215
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
216
            next.newCall(method, callOptions)) {
×
217

218
          @Override
219
          public void start(Listener<RespT> responseListener, Metadata headers) {
220
            Span span =
×
221
                tracingPropagator.spanByServiceMethod(
×
222
                    String.format(OPERATIONFORMAT, method.getBareMethodName()));
×
223
            Scope scope = tracer.activateSpan(span);
×
224
            super.start(
×
225
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
226
                    responseListener) {
×
227
                  @Override
228
                  public void onClose(Status status, Metadata trailers) {
229
                    try {
230
                      super.onClose(status, trailers);
×
231
                    } finally {
232
                      span.finish();
×
233
                      scope.close();
×
234
                    }
235
                  }
×
236
                },
237
                headers);
238
          }
×
239

240
          @SuppressWarnings("unchecked")
241
          @Override
242
          public void sendMessage(ReqT message) {
243
            if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
×
244
                && message instanceof StartWorkflowExecutionRequest) {
245
              StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
×
246
              Header newHeader = addTracingHeaders(request.getHeader());
×
247

248
              // cast should not throw error as we are using the builder
249
              message = (ReqT) request.toBuilder().setHeader(newHeader).build();
×
250
            } else if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecutionAsync")
×
251
                && message instanceof StartWorkflowExecutionAsyncRequest) {
252
              StartWorkflowExecutionAsyncRequest request =
×
253
                  (StartWorkflowExecutionAsyncRequest) message;
254
              Header newHeader = addTracingHeaders(request.getRequest().getHeader());
×
255

256
              // cast should not throw error as we are using the builder
257
              message =
×
258
                  (ReqT)
259
                      request
260
                          .toBuilder()
×
261
                          .setRequest(request.getRequest().toBuilder().setHeader(newHeader))
×
262
                          .build();
×
263
            } else if (Objects.equals(
×
264
                    method.getBareMethodName(), "SignalWithStartWorkflowExecution")
×
265
                && message instanceof SignalWithStartWorkflowExecutionRequest) {
266
              SignalWithStartWorkflowExecutionRequest request =
×
267
                  (SignalWithStartWorkflowExecutionRequest) message;
268
              Header newHeader = addTracingHeaders(request.getStartRequest().getHeader());
×
269

270
              // cast should not throw error as we are using the builder
271
              message =
×
272
                  (ReqT)
273
                      request
274
                          .toBuilder()
×
275
                          .setStartRequest(
×
276
                              request.getStartRequest().toBuilder().setHeader(newHeader))
×
277
                          .build();
×
278
            } else if (Objects.equals(
×
279
                    method.getBareMethodName(), "SignalWithStartWorkflowExecutionAsync")
×
280
                && message instanceof SignalWithStartWorkflowExecutionAsyncRequest) {
281
              SignalWithStartWorkflowExecutionAsyncRequest request =
×
282
                  (SignalWithStartWorkflowExecutionAsyncRequest) message;
283
              Header newHeader =
×
284
                  addTracingHeaders(request.getRequest().getStartRequest().getHeader());
×
285

286
              // cast should not throw error as we are using the builder
287
              message =
×
288
                  (ReqT)
289
                      request
290
                          .toBuilder()
×
291
                          .setRequest(
×
292
                              request
293
                                  .getRequest()
×
294
                                  .toBuilder()
×
295
                                  .setStartRequest(
×
296
                                      request
297
                                          .getRequest()
×
298
                                          .getStartRequest()
×
299
                                          .toBuilder()
×
300
                                          .setHeader(newHeader)))
×
301
                          .build();
×
302
            }
303
            super.sendMessage(message);
×
304
          }
×
305

306
          private Header addTracingHeaders(Header header) {
307
            Map<String, byte[]> headers = new HashMap<>();
×
308
            tracingPropagator.inject(headers);
×
309
            Header.Builder headerBuilder = header.toBuilder();
×
310
            headers.forEach(
×
311
                (k, v) ->
312
                    headerBuilder.putFields(
×
313
                        k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build()));
×
314
            return headerBuilder.build();
×
315
          }
316
        };
317
      }
318
    };
319
  }
320

321
  private ClientInterceptor newTracingInterceptor() {
322
    return new ClientInterceptor() {
×
323

324
      @Override
325
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
326
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
327
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
328
            next.newCall(method, callOptions)) {
×
329
          @Override
330
          public void sendMessage(ReqT message) {
331
            log.trace("Invoking " + method.getFullMethodName() + "with input: " + message);
×
332
            super.sendMessage(message);
×
333
          }
×
334

335
          @Override
336
          public void start(Listener<RespT> responseListener, Metadata headers) {
337
            Listener<RespT> listener =
×
338
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
339
                    responseListener) {
×
340
                  @Override
341
                  public void onMessage(RespT message) {
342
                    if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()) {
×
343
                      log.trace("Returned " + method.getFullMethodName());
×
344
                    } else {
345
                      log.trace(
×
346
                          "Returned " + method.getFullMethodName() + " with output: " + message);
×
347
                    }
348
                    super.onMessage(message);
×
349
                  }
×
350
                };
351
            super.start(listener, headers);
×
352
          }
×
353
        };
354
      }
355
    };
356
  }
357

358
  public DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub() {
359
    return domainBlockingStub;
×
360
  }
361

362
  public DomainAPIGrpc.DomainAPIFutureStub domainFutureStub() {
363
    return domainFutureStub;
×
364
  }
365

366
  @Override
367
  public ClientOptions getOptions() {
368
    return options;
×
369
  }
370

371
  @Override
372
  public VisibilityAPIBlockingStub visibilityBlockingStub() {
373
    return visibilityBlockingStub;
×
374
  }
375

376
  @Override
377
  public VisibilityAPIFutureStub visibilityFutureStub() {
378
    return visibilityFutureStub;
×
379
  }
380

381
  @Override
382
  public WorkerAPIBlockingStub workerBlockingStub() {
383
    return workerBlockingStub;
×
384
  }
385

386
  @Override
387
  public WorkerAPIFutureStub workerFutureStub() {
388
    return workerFutureStub;
×
389
  }
390

391
  @Override
392
  public WorkflowAPIBlockingStub workflowBlockingStub() {
393
    return workflowBlockingStub;
×
394
  }
395

396
  @Override
397
  public MetaAPIFutureStub metaFutureStub() {
398
    return metaFutureStub;
×
399
  }
400

401
  @Override
402
  public MetaAPIBlockingStub metaBlockingStub() {
403
    return metaBlockingStub;
×
404
  }
405

406
  @Override
407
  public WorkflowAPIFutureStub workflowFutureStub() {
408
    return workflowFutureStub;
×
409
  }
410

411
  @Override
412
  public void shutdown() {
413
    shutdownRequested.set(true);
×
414
    if (shutdownChannel) {
×
415
      channel.shutdown();
×
416
    }
417
  }
×
418

419
  @Override
420
  public void shutdownNow() {
421
    shutdownRequested.set(true);
×
422
    if (shutdownChannel) {
×
423
      channel.shutdownNow();
×
424
    }
425
  }
×
426

427
  @Override
428
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
429
    if (shutdownChannel) {
×
430
      return channel.awaitTermination(timeout, unit);
×
431
    }
432
    return true;
×
433
  }
434

435
  @Override
436
  public boolean isShutdown() {
437
    if (shutdownChannel) {
×
438
      return channel.isShutdown();
×
439
    }
440
    return shutdownRequested.get();
×
441
  }
442

443
  @Override
444
  public boolean isTerminated() {
445
    if (shutdownChannel) {
×
446
      return channel.isTerminated();
×
447
    }
448
    return shutdownRequested.get();
×
449
  }
450

451
  private static class GrpcDeadlineInterceptor implements ClientInterceptor {
452

453
    private final ClientOptions options;
454

455
    public GrpcDeadlineInterceptor(ClientOptions options) {
×
456
      this.options = options;
×
457
    }
×
458

459
    @Override
460
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
461
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
462
      Deadline deadline = callOptions.getDeadline();
×
463
      long duration;
464
      if (deadline == null) {
×
465
        duration = options.getRpcTimeoutMillis();
×
466
      } else {
467
        duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
468
      }
469
      if (method == WorkflowAPIGrpc.getGetWorkflowExecutionHistoryMethod()) {
×
470
        if (deadline == null) {
×
471
          duration = options.getRpcLongPollTimeoutMillis();
×
472
        } else {
473
          duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
474
          if (duration > options.getRpcLongPollTimeoutMillis()) {
×
475
            duration = options.getRpcLongPollTimeoutMillis();
×
476
          }
477
        }
478
      } else if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()
×
479
          || method == WorkerAPIGrpc.getPollForActivityTaskMethod()) {
×
480
        duration = options.getRpcLongPollTimeoutMillis();
×
481
      } else if (method == WorkflowAPIGrpc.getQueryWorkflowMethod()) {
×
482
        duration = options.getRpcQueryTimeoutMillis();
×
483
      }
484
      if (log.isTraceEnabled()) {
×
485
        String name = method.getFullMethodName();
×
486
        log.trace("TimeoutInterceptor method=" + name + ", timeoutMs=" + duration);
×
487
      }
488
      return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
×
489
    }
490
  }
491
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc