• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2242

11 Apr 2024 07:03PM UTC coverage: 60.244% (-0.02%) from 60.263%
2242

push

buildkite

web-flow
Enable open tracing propagation in workflow lifecycles (#876)

How it works
Context Propagation in Cadence (Customer)

On start workflow API, trace span with context is written into workflow start event attributes, which is persisted in cadence server side.
On workflow-start in client, this span is referenced and activated on execute workflow.
On scheduling child workflows and activities (including local activities), the span is written into child workflow's workflow start event attributes and activity's schedule activity event attributes.
On processing activities/childworkflows, the persisted span is referenced and activated again.

Sample Spans

Notes: Poll + Respond apis spans are omitted here

{traceId:1, spanId:2, parentId:0, operationName:"cadence-RegisterDomain"}
{traceId:1, spanId:3, parentId:2, operationName:"Test Started"}
{traceId:1, spanId:18, parentId:3, operationName:"cadence-StartWorkflowExecution"}
{traceId:1, spanId:19, parentId:18, operationName:"cadence-GetWorkflowExecutionHistory"}
{traceId:1, spanId:21, parentId:18, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:24, parentId:21, operationName:"cadence-ExecuteActivity"}
{traceId:1, spanId:25, parentId:24, operationName:"cadence-RespondActivityTaskCompleted"}
{traceId:1, spanId:31, parentId:21, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:32, parentId:31, operationName:"cadence-ExecuteLocalActivity"}

What changed?

added an Propagator entity with tracing extract/inject logic
added trace activation logic in activity and workflow executors
added trace activation on service client (Tchannel + GRPC)
Why?

improve observability

How did you test it?

integration test

111 of 175 new or added lines in 13 files covered. (63.43%)

11 existing lines in 5 files now uncovered.

11447 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java
1
/*
2
 *  Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
3
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4
 *
5
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
6
 *  use this file except in compliance with the License. A copy of the License is
7
 *  located at
8
 *
9
 *  http://aws.amazon.com/apache2.0
10
 *
11
 *  or in the "license" file accompanying this file. This file is distributed on
12
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13
 *  express or implied. See the License for the specific language governing
14
 *  permissions and limitations under the License.
15
 */
16
package com.uber.cadence.internal.compatibility.proto.serviceclient;
17

18
import com.google.common.base.Strings;
19
import com.google.protobuf.ByteString;
20
import com.uber.cadence.api.v1.*;
21
import com.uber.cadence.api.v1.DomainAPIGrpc;
22
import com.uber.cadence.api.v1.MetaAPIGrpc;
23
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
24
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIFutureStub;
25
import com.uber.cadence.api.v1.VisibilityAPIGrpc;
26
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIBlockingStub;
27
import com.uber.cadence.api.v1.VisibilityAPIGrpc.VisibilityAPIFutureStub;
28
import com.uber.cadence.api.v1.WorkerAPIGrpc;
29
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIBlockingStub;
30
import com.uber.cadence.api.v1.WorkerAPIGrpc.WorkerAPIFutureStub;
31
import com.uber.cadence.api.v1.WorkflowAPIGrpc;
32
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
33
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
34
import com.uber.cadence.internal.Version;
35
import com.uber.cadence.internal.tracing.TracingPropagator;
36
import com.uber.cadence.serviceclient.ClientOptions;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ClientCall;
40
import io.grpc.ClientInterceptor;
41
import io.grpc.ClientInterceptors;
42
import io.grpc.Deadline;
43
import io.grpc.ForwardingClientCall;
44
import io.grpc.ForwardingClientCallListener;
45
import io.grpc.ManagedChannel;
46
import io.grpc.ManagedChannelBuilder;
47
import io.grpc.Metadata;
48
import io.grpc.MethodDescriptor;
49
import io.grpc.stub.MetadataUtils;
50
import io.opentelemetry.api.GlobalOpenTelemetry;
51
import io.opentelemetry.context.Context;
52
import io.opentelemetry.context.propagation.TextMapPropagator;
53
import io.opentelemetry.context.propagation.TextMapSetter;
54
import io.opentracing.Span;
55
import io.opentracing.Tracer;
56
import java.util.HashMap;
57
import java.util.Map;
58
import java.util.Objects;
59
import java.util.concurrent.TimeUnit;
60
import java.util.concurrent.atomic.AtomicBoolean;
61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
final class GrpcServiceStubs implements IGrpcServiceStubs {
65

66
  private static final Logger log = LoggerFactory.getLogger(GrpcServiceStubs.class);
×
67
  private static final Metadata.Key<String> LIBRARY_VERSION_HEADER_KEY =
×
68
      Metadata.Key.of("cadence-client-library-version", Metadata.ASCII_STRING_MARSHALLER);
×
69
  private static final Metadata.Key<String> FEATURE_VERSION_HEADER_KEY =
×
70
      Metadata.Key.of("cadence-client-feature-version", Metadata.ASCII_STRING_MARSHALLER);
×
71
  private static final Metadata.Key<String> CLIENT_IMPL_HEADER_KEY =
×
72
      Metadata.Key.of("cadence-client-name", Metadata.ASCII_STRING_MARSHALLER);
×
73
  private static final Metadata.Key<String> ISOLATION_GROUP_HEADER_KEY =
×
74
      Metadata.Key.of("cadence-client-isolation-group", Metadata.ASCII_STRING_MARSHALLER);
×
75
  private static final Metadata.Key<String> RPC_SERVICE_NAME_HEADER_KEY =
×
76
      Metadata.Key.of("rpc-service", Metadata.ASCII_STRING_MARSHALLER);
×
77
  private static final Metadata.Key<String> RPC_CALLER_NAME_HEADER_KEY =
×
78
      Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER);
×
79
  private static final Metadata.Key<String> RPC_ENCODING_HEADER_KEY =
×
80
      Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
×
81

82
  private static final String CLIENT_IMPL_HEADER_VALUE = "uber-java";
83

84
  private final ManagedChannel channel;
85
  private final boolean shutdownChannel;
86
  private final AtomicBoolean shutdownRequested = new AtomicBoolean();
×
87
  private final DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub;
88
  private final DomainAPIGrpc.DomainAPIFutureStub domainFutureStub;
89
  private final VisibilityAPIGrpc.VisibilityAPIBlockingStub visibilityBlockingStub;
90
  private final VisibilityAPIGrpc.VisibilityAPIFutureStub visibilityFutureStub;
91
  private final WorkerAPIGrpc.WorkerAPIBlockingStub workerBlockingStub;
92
  private final WorkerAPIGrpc.WorkerAPIFutureStub workerFutureStub;
93
  private final WorkflowAPIGrpc.WorkflowAPIBlockingStub workflowBlockingStub;
94
  private final WorkflowAPIGrpc.WorkflowAPIFutureStub workflowFutureStub;
95
  private final MetaAPIGrpc.MetaAPIBlockingStub metaBlockingStub;
96
  private final MetaAPIGrpc.MetaAPIFutureStub metaFutureStub;
97

98
  GrpcServiceStubs(ClientOptions options) {
×
99
    if (options.getGRPCChannel() != null) {
×
100
      this.channel = options.getGRPCChannel();
×
101
      shutdownChannel = false;
×
102
    } else {
103
      this.channel =
×
104
          ManagedChannelBuilder.forAddress(options.getHost(), options.getPort())
×
105
              .defaultLoadBalancingPolicy("round_robin")
×
106
              .usePlaintext()
×
107
              .build();
×
108
      shutdownChannel = true;
×
109
    }
110
    ClientInterceptor deadlineInterceptor = new GrpcDeadlineInterceptor(options);
×
111
    ClientInterceptor tracingInterceptor = newTracingInterceptor();
×
112
    Metadata headers = new Metadata();
×
113
    headers.put(LIBRARY_VERSION_HEADER_KEY, Version.LIBRARY_VERSION);
×
114
    headers.put(FEATURE_VERSION_HEADER_KEY, Version.FEATURE_VERSION);
×
115
    headers.put(CLIENT_IMPL_HEADER_KEY, CLIENT_IMPL_HEADER_VALUE);
×
116
    headers.put(RPC_SERVICE_NAME_HEADER_KEY, options.getServiceName());
×
117
    headers.put(RPC_CALLER_NAME_HEADER_KEY, options.getClientAppName());
×
118
    headers.put(RPC_ENCODING_HEADER_KEY, "proto");
×
119
    if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
×
120
      headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
×
121
    }
122
    Channel interceptedChannel =
×
123
        ClientInterceptors.intercept(
×
124
            channel,
125
            deadlineInterceptor,
126
            MetadataUtils.newAttachHeadersInterceptor(headers),
×
NEW
127
            newOpenTelemetryInterceptor(),
×
NEW
128
            newOpenTracingInterceptor(options.getTracer()));
×
129
    if (log.isTraceEnabled()) {
×
130
      interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
×
131
    }
132
    this.domainBlockingStub = DomainAPIGrpc.newBlockingStub(interceptedChannel);
×
133
    this.domainFutureStub = DomainAPIGrpc.newFutureStub(interceptedChannel);
×
134
    this.visibilityBlockingStub = VisibilityAPIGrpc.newBlockingStub(interceptedChannel);
×
135
    this.visibilityFutureStub = VisibilityAPIGrpc.newFutureStub(interceptedChannel);
×
136
    this.workerBlockingStub = WorkerAPIGrpc.newBlockingStub(interceptedChannel);
×
137
    this.workerFutureStub = WorkerAPIGrpc.newFutureStub(interceptedChannel);
×
138
    this.workflowBlockingStub = WorkflowAPIGrpc.newBlockingStub(interceptedChannel);
×
139
    this.workflowFutureStub = WorkflowAPIGrpc.newFutureStub(interceptedChannel);
×
140
    this.metaBlockingStub = MetaAPIGrpc.newBlockingStub(interceptedChannel);
×
141
    this.metaFutureStub = MetaAPIGrpc.newFutureStub(interceptedChannel);
×
142
  }
×
143

144
  private ClientInterceptor newOpenTelemetryInterceptor() {
145
    return new ClientInterceptor() {
×
146
      @Override
147
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
148
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
149
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
150
            next.newCall(method, callOptions)) {
×
151

152
          @Override
153
          public void start(Listener<RespT> responseListener, Metadata headers) {
154
            TextMapPropagator propagator =
155
                GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
×
156

157
            final TextMapSetter<Metadata> setter =
×
158
                (carrier, key, value) -> {
159
                  if (carrier != null) {
×
160
                    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
×
161
                  }
162
                };
×
163
            if (propagator != null) {
×
164
              propagator.inject(Context.current(), headers, setter);
×
165
            }
166

167
            super.start(responseListener, headers);
×
168
          }
×
169
        };
170
      }
171
    };
172
  }
173

174
  private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
NEW
175
    return new ClientInterceptor() {
×
NEW
176
      private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
×
NEW
177
      private final String OPERATIONFORMAT = "cadence-%s";
×
178

179
      @Override
180
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
181
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
NEW
182
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
NEW
183
            next.newCall(method, callOptions)) {
×
184

185
          @Override
186
          public void start(Listener<RespT> responseListener, Metadata headers) {
NEW
187
            Span span =
×
NEW
188
                tracingPropagator.activateSpanByServiceMethod(
×
NEW
189
                    String.format(OPERATIONFORMAT, method.getBareMethodName()));
×
NEW
190
            super.start(responseListener, headers);
×
NEW
191
            span.finish();
×
NEW
192
          }
×
193

194
          @Override
195
          public void sendMessage(ReqT message) {
NEW
196
            if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
×
197
                && message instanceof StartWorkflowExecutionRequest) {
NEW
198
              StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
×
NEW
199
              Map<String, byte[]> headers = new HashMap<>();
×
NEW
200
              tracingPropagator.inject(headers);
×
NEW
201
              Header.Builder headerBuilder = request.getHeader().toBuilder();
×
NEW
202
              headers.forEach(
×
203
                  (k, v) -> {
NEW
204
                    headerBuilder.putFields(
×
NEW
205
                        k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
×
NEW
206
                  });
×
207

208
              // cast should not throw error as we are using the builder
NEW
209
              message =
×
210
                  (ReqT)
211
                      ((StartWorkflowExecutionRequest) message)
NEW
212
                          .toBuilder()
×
NEW
213
                          .setHeader(headerBuilder.build())
×
NEW
214
                          .build();
×
215
            }
NEW
216
            if (Objects.equals(method.getBareMethodName(), "SignalWithStartWorkflowExecution")
×
217
                && message instanceof SignalWithStartWorkflowExecutionRequest) {
NEW
218
              SignalWithStartWorkflowExecutionRequest request =
×
219
                  (SignalWithStartWorkflowExecutionRequest) message;
NEW
220
              Map<String, byte[]> headers = new HashMap<>();
×
NEW
221
              tracingPropagator.inject(headers);
×
NEW
222
              Header.Builder headerBuilder = request.getStartRequest().getHeader().toBuilder();
×
NEW
223
              headers.forEach(
×
224
                  (k, v) -> {
NEW
225
                    headerBuilder.putFields(
×
NEW
226
                        k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
×
NEW
227
                  });
×
228

229
              // cast should not throw error as we are using the builder
NEW
230
              message =
×
231
                  (ReqT)
232
                      ((SignalWithStartWorkflowExecutionRequest) message)
NEW
233
                          .toBuilder()
×
NEW
234
                          .setStartRequest(
×
235
                              request
NEW
236
                                  .getStartRequest()
×
NEW
237
                                  .toBuilder()
×
NEW
238
                                  .setHeader(headerBuilder.build()))
×
NEW
239
                          .build();
×
240
            }
NEW
241
            super.sendMessage(message);
×
NEW
242
          }
×
243
        };
244
      }
245
    };
246
  }
247

248
  private ClientInterceptor newTracingInterceptor() {
249
    return new ClientInterceptor() {
×
250

251
      @Override
252
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
253
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
254
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
×
255
            next.newCall(method, callOptions)) {
×
256
          @Override
257
          public void sendMessage(ReqT message) {
258
            log.trace("Invoking " + method.getFullMethodName() + "with input: " + message);
×
259
            super.sendMessage(message);
×
260
          }
×
261

262
          @Override
263
          public void start(Listener<RespT> responseListener, Metadata headers) {
264
            Listener<RespT> listener =
×
265
                new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
266
                    responseListener) {
×
267
                  @Override
268
                  public void onMessage(RespT message) {
269
                    if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()) {
×
270
                      log.trace("Returned " + method.getFullMethodName());
×
271
                    } else {
272
                      log.trace(
×
273
                          "Returned " + method.getFullMethodName() + " with output: " + message);
×
274
                    }
275
                    super.onMessage(message);
×
276
                  }
×
277
                };
278
            super.start(listener, headers);
×
279
          }
×
280
        };
281
      }
282
    };
283
  }
284

285
  public DomainAPIGrpc.DomainAPIBlockingStub domainBlockingStub() {
286
    return domainBlockingStub;
×
287
  }
288

289
  public DomainAPIGrpc.DomainAPIFutureStub domainFutureStub() {
290
    return domainFutureStub;
×
291
  }
292

293
  @Override
294
  public VisibilityAPIBlockingStub visibilityBlockingStub() {
295
    return visibilityBlockingStub;
×
296
  }
297

298
  @Override
299
  public VisibilityAPIFutureStub visibilityFutureStub() {
300
    return visibilityFutureStub;
×
301
  }
302

303
  @Override
304
  public WorkerAPIBlockingStub workerBlockingStub() {
305
    return workerBlockingStub;
×
306
  }
307

308
  @Override
309
  public WorkerAPIFutureStub workerFutureStub() {
310
    return workerFutureStub;
×
311
  }
312

313
  @Override
314
  public WorkflowAPIBlockingStub workflowBlockingStub() {
315
    return workflowBlockingStub;
×
316
  }
317

318
  @Override
319
  public MetaAPIFutureStub metaFutureStub() {
320
    return metaFutureStub;
×
321
  }
322

323
  @Override
324
  public MetaAPIBlockingStub metaBlockingStub() {
325
    return metaBlockingStub;
×
326
  }
327

328
  @Override
329
  public WorkflowAPIFutureStub workflowFutureStub() {
330
    return workflowFutureStub;
×
331
  }
332

333
  @Override
334
  public void shutdown() {
335
    shutdownRequested.set(true);
×
336
    if (shutdownChannel) {
×
337
      channel.shutdown();
×
338
    }
339
  }
×
340

341
  @Override
342
  public void shutdownNow() {
343
    shutdownRequested.set(true);
×
344
    if (shutdownChannel) {
×
345
      channel.shutdownNow();
×
346
    }
347
  }
×
348

349
  @Override
350
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
351
    if (shutdownChannel) {
×
352
      return channel.awaitTermination(timeout, unit);
×
353
    }
354
    return true;
×
355
  }
356

357
  @Override
358
  public boolean isShutdown() {
359
    if (shutdownChannel) {
×
360
      return channel.isShutdown();
×
361
    }
362
    return shutdownRequested.get();
×
363
  }
364

365
  @Override
366
  public boolean isTerminated() {
367
    if (shutdownChannel) {
×
368
      return channel.isTerminated();
×
369
    }
370
    return shutdownRequested.get();
×
371
  }
372

373
  private static class GrpcDeadlineInterceptor implements ClientInterceptor {
374

375
    private final ClientOptions options;
376

377
    public GrpcDeadlineInterceptor(ClientOptions options) {
×
378
      this.options = options;
×
379
    }
×
380

381
    @Override
382
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
383
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
384
      Deadline deadline = callOptions.getDeadline();
×
385
      long duration;
386
      if (deadline == null) {
×
387
        duration = options.getRpcTimeoutMillis();
×
388
      } else {
389
        duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
390
      }
391
      if (method == WorkflowAPIGrpc.getGetWorkflowExecutionHistoryMethod()) {
×
392
        if (deadline == null) {
×
393
          duration = options.getRpcLongPollTimeoutMillis();
×
394
        } else {
395
          duration = deadline.timeRemaining(TimeUnit.MILLISECONDS);
×
396
          if (duration > options.getRpcLongPollTimeoutMillis()) {
×
397
            duration = options.getRpcLongPollTimeoutMillis();
×
398
          }
399
        }
400
      } else if (method == WorkerAPIGrpc.getPollForDecisionTaskMethod()
×
401
          || method == WorkerAPIGrpc.getPollForActivityTaskMethod()) {
×
402
        duration = options.getRpcLongPollTimeoutMillis();
×
403
      } else if (method == WorkflowAPIGrpc.getQueryWorkflowMethod()) {
×
404
        duration = options.getRpcQueryTimeoutMillis();
×
405
      }
406
      if (log.isTraceEnabled()) {
×
407
        String name = method.getFullMethodName();
×
408
        log.trace("TimeoutInterceptor method=" + name + ", timeoutMs=" + duration);
×
409
      }
410
      return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
×
411
    }
412
  }
413
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc