• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2241

11 Apr 2024 07:03PM UTC coverage: 60.286% (+0.02%) from 60.263%
2241

push

buildkite

web-flow
Enable open tracing propagation in workflow lifecycles (#876)

How it works
Context Propagation in Cadence (Customer)

On start workflow API, trace span with context is written into workflow start event attributes, which is persisted in cadence server side.
On workflow-start in client, this span is referenced and activated on execute workflow.
On scheduling child workflows and activities (including local activities), the span is written into child workflow's workflow start event attributes and activity's schedule activity event attributes.
On processing activities/childworkflows, the persisted span is referenced and activated again.

Sample Spans

Notes: Poll + Respond apis spans are omitted here

{traceId:1, spanId:2, parentId:0, operationName:"cadence-RegisterDomain"}
{traceId:1, spanId:3, parentId:2, operationName:"Test Started"}
{traceId:1, spanId:18, parentId:3, operationName:"cadence-StartWorkflowExecution"}
{traceId:1, spanId:19, parentId:18, operationName:"cadence-GetWorkflowExecutionHistory"}
{traceId:1, spanId:21, parentId:18, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:24, parentId:21, operationName:"cadence-ExecuteActivity"}
{traceId:1, spanId:25, parentId:24, operationName:"cadence-RespondActivityTaskCompleted"}
{traceId:1, spanId:31, parentId:21, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:32, parentId:31, operationName:"cadence-ExecuteLocalActivity"}

What changed?

added an Propagator entity with tracing extract/inject logic
added trace activation logic in activity and workflow executors
added trace activation on service client (Tchannel + GRPC)
Why?

improve observability

How did you test it?

integration test

111 of 175 new or added lines in 13 files covered. (63.43%)

12 existing lines in 6 files now uncovered.

11455 of 19001 relevant lines covered (60.29%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.74
/src/main/java/com/uber/cadence/internal/tracing/TracingPropagator.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.tracing;
19

20
import com.uber.cadence.Header;
21
import com.uber.cadence.PollForActivityTaskResponse;
22
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
23
import com.uber.cadence.internal.replay.DecisionContext;
24
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
25
import com.uber.cadence.internal.worker.LocalActivityWorker.Task;
26
import io.opentracing.References;
27
import io.opentracing.Span;
28
import io.opentracing.SpanContext;
29
import io.opentracing.Tracer;
30
import io.opentracing.noop.NoopSpan;
31
import io.opentracing.propagation.*;
32
import io.opentracing.propagation.Format;
33
import java.nio.ByteBuffer;
34
import java.util.HashMap;
35
import java.util.Map;
36
import java.util.stream.Collectors;
37

38
public class TracingPropagator {
39
  // span names
40
  private static final String EXECUTE_WORKFLOW = "cadence-ExecuteWorkflow";
41
  private static final String EXECUTE_ACTIVITY = "cadence-ExecuteActivity";
42
  private static final String EXECUTE_LOCAL_ACTIVITY = "cadence-ExecuteLocalActivity";
43

44
  // span tags
45
  private static final String TAG_WORKFLOW_ID = "cadenceWorkflowID";
46
  private static final String TAG_WORKFLOW_TYPE = "cadenceWorkflowType";
47
  private static final String TAG_WORKFLOW_RUN_ID = "cadenceRunID";
48
  private static final String TAG_ACTIVITY_TYPE = "cadenceActivityType";
49

50
  private final Tracer tracer;
51

52
  public TracingPropagator(Tracer tracer) {
1✔
53
    this.tracer = tracer;
1✔
54
  }
1✔
55

56
  public Span activateSpanByServiceMethod(String serviceMethod) {
57
    Span span = tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
1✔
58
    tracer.activateSpan(span);
1✔
59
    return span;
1✔
60
  }
61

62
  public Span activateSpanForExecuteWorkflow(DecisionContext context) {
63
    WorkflowExecutionStartedEventAttributes attributes =
1✔
64
        context.getWorkflowExecutionStartedEventAttributes();
1✔
65
    SpanContext parent = extract(attributes.getHeader());
1✔
66

67
    Span span =
1✔
68
        tracer
69
            .buildSpan(EXECUTE_WORKFLOW)
1✔
70
            .addReference(
1✔
71
                References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
1✔
72
            .withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
1✔
73
            .withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
1✔
74
            .withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
1✔
75
            .start();
1✔
76
    tracer.activateSpan(span);
1✔
77
    return span;
1✔
78
  }
79

80
  public Span activateSpanForExecuteActivity(PollForActivityTaskResponse task) {
81
    SpanContext parent = extract(task.getHeader());
1✔
82
    Span span =
1✔
83
        tracer
84
            .buildSpan(EXECUTE_ACTIVITY)
1✔
85
            .addReference(
1✔
86
                References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
1✔
87
            .withTag(
1✔
88
                TAG_WORKFLOW_TYPE,
89
                task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
1✔
90
            .withTag(
1✔
91
                TAG_WORKFLOW_ID,
92
                task.isSetWorkflowExecution()
1✔
93
                    ? task.getWorkflowExecution().getWorkflowId()
1✔
NEW
94
                    : "null")
×
95
            .withTag(
1✔
96
                TAG_WORKFLOW_RUN_ID,
97
                task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
1✔
98
            .withTag(
1✔
99
                TAG_ACTIVITY_TYPE,
100
                task.isSetActivityType() ? task.getActivityType().getName() : "null")
1✔
101
            .start();
1✔
102
    tracer.activateSpan(span);
1✔
103
    return span;
1✔
104
  }
105

106
  public Span activateSpanForExecuteLocalActivity(Task task) {
107
    ExecuteLocalActivityParameters params = task.getExecuteLocalActivityParameters();
1✔
108

109
    // retrieve spancontext from params
110
    SpanContext parent = extract(params.getContext());
1✔
111

112
    Span span =
1✔
113
        tracer
114
            .buildSpan(EXECUTE_LOCAL_ACTIVITY)
1✔
115
            .addReference(References.FOLLOWS_FROM, parent)
1✔
116
            .withTag(TAG_WORKFLOW_ID, params.getWorkflowExecution().getWorkflowId())
1✔
117
            .withTag(TAG_WORKFLOW_RUN_ID, params.getWorkflowExecution().getRunId())
1✔
118
            .withTag(TAG_ACTIVITY_TYPE, params.getActivityType().getName())
1✔
119
            .start();
1✔
120
    tracer.activateSpan(span);
1✔
121
    return span;
1✔
122
  }
123

124
  public void inject(Map<String, byte[]> headers) {
125
    Map<String, String> context = getCurrentContext();
1✔
126
    context.forEach(
1✔
127
        (k, v) -> {
NEW
128
          headers.put(k, v.getBytes());
×
NEW
129
        });
×
130
  }
1✔
131

132
  public void inject(Header header) {
NEW
133
    Map<String, String> context = getCurrentContext();
×
NEW
134
    context.forEach(
×
135
        (k, v) -> {
NEW
136
          header.putToFields(k, ByteBuffer.wrap(v.getBytes()));
×
NEW
137
        });
×
NEW
138
  }
×
139

140
  private Map<String, String> getCurrentContext() {
141
    Map<String, String> context = new HashMap<>();
1✔
142
    if (tracer.activeSpan() != null) {
1✔
143
      tracer.inject(
1✔
144
          tracer.activeSpan().context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(context));
1✔
145
    }
146
    return context;
1✔
147
  }
148

149
  private SpanContext extract(Map<String, byte[]> headers) {
150
    if (headers == null) return NoopSpan.INSTANCE.context();
1✔
151
    return tracer.extract(
1✔
152
        Format.Builtin.TEXT_MAP,
153
        new TextMapAdapter(
154
            headers
155
                .entrySet()
1✔
156
                .stream()
1✔
157
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new String(e.getValue())))));
1✔
158
  }
159

160
  private SpanContext extract(Header header) {
161
    if (header == null || header.getFields() == null) return NoopSpan.INSTANCE.context();
1✔
162
    return tracer.extract(
1✔
163
        Format.Builtin.TEXT_MAP,
164
        new TextMapAdapter(
165
            header
166
                .getFields()
1✔
167
                .entrySet()
1✔
168
                .stream()
1✔
169
                .collect(
1✔
170
                    Collectors.toMap(
1✔
171
                        Map.Entry::getKey,
172
                        e -> {
173
                          byte[] bytes = new byte[e.getValue().remaining()];
1✔
174
                          e.getValue().get(bytes);
1✔
175
                          return new String(bytes);
1✔
176
                        }))));
177
  }
178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc