• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2242

11 Apr 2024 07:03PM UTC coverage: 60.244% (-0.02%) from 60.263%
2242

push

buildkite

web-flow
Enable open tracing propagation in workflow lifecycles (#876)

How it works
Context Propagation in Cadence (Customer)

On the start-workflow API call, the trace span and its context are written into the workflow start event attributes, which are persisted on the Cadence server side.
When the workflow starts in the client, this span is referenced and activated while the workflow executes.
When child workflows and activities (including local activities) are scheduled, the span is written into the child workflow's workflow start event attributes and the activity's schedule activity event attributes.
When activities and child workflows are processed, the persisted span is referenced and activated again.

Sample Spans

Note: spans for the Poll and Respond APIs are omitted here.

{traceId:1, spanId:2, parentId:0, operationName:"cadence-RegisterDomain"}
{traceId:1, spanId:3, parentId:2, operationName:"Test Started"}
{traceId:1, spanId:18, parentId:3, operationName:"cadence-StartWorkflowExecution"}
{traceId:1, spanId:19, parentId:18, operationName:"cadence-GetWorkflowExecutionHistory"}
{traceId:1, spanId:21, parentId:18, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:24, parentId:21, operationName:"cadence-ExecuteActivity"}
{traceId:1, spanId:25, parentId:24, operationName:"cadence-RespondActivityTaskCompleted"}
{traceId:1, spanId:31, parentId:21, operationName:"cadence-ExecuteWorkflow"}
{traceId:1, spanId:32, parentId:31, operationName:"cadence-ExecuteLocalActivity"}

What changed?

added a Propagator entity with tracing extract/inject logic
added trace activation logic in activity and workflow executors
added trace activation on service client (Tchannel + GRPC)
Why?

improve observability

How did you test it?

integration test

111 of 175 new or added lines in 13 files covered. (63.43%)

11 existing lines in 5 files now uncovered.

11447 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.34
/src/main/java/com/uber/cadence/worker/WorkerOptions.java
1
/*
2
 *  Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3
 *  Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.worker;
19

20
import com.uber.cadence.internal.worker.PollerOptions;
21
import com.uber.cadence.workflow.WorkflowInterceptor;
22
import io.opentracing.Tracer;
23
import io.opentracing.noop.NoopTracerFactory;
24
import java.util.Objects;
25
import java.util.function.Function;
26

27
public final class WorkerOptions {
28
  public static Builder newBuilder() {
29
    return new Builder();
1✔
30
  }
31

32
  public static Builder newBuilder(WorkerOptions options) {
33
    return new Builder(options);
×
34
  }
35

36
  public static WorkerOptions defaultInstance() {
37
    return DEFAULT_INSTANCE;
1✔
38
  }
39

40
  private static final WorkerOptions DEFAULT_INSTANCE;
41

42
  static {
43
    DEFAULT_INSTANCE = WorkerOptions.newBuilder().build();
1✔
44
  }
1✔
45

46
  public static final class Builder {
47

48
    private double workerActivitiesPerSecond;
49
    private int maxConcurrentActivityExecutionSize = 100;
1✔
50
    private int maxConcurrentWorkflowExecutionSize = 50;
1✔
51
    private int maxConcurrentLocalActivityExecutionSize = 100;
1✔
52
    private double taskListActivitiesPerSecond;
53
    private PollerOptions activityPollerOptions;
54
    private PollerOptions workflowPollerOptions;
55
    private Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory = (n) -> n;
1✔
56
    // by default NoopTracer
57
    private Tracer tracer = NoopTracerFactory.create();
1✔
58

59
    private Builder() {}
1✔
60

61
    private Builder(WorkerOptions options) {
×
62
      this.workerActivitiesPerSecond = options.workerActivitiesPerSecond;
×
63
      this.maxConcurrentActivityExecutionSize = options.maxConcurrentActivityExecutionSize;
×
64
      this.maxConcurrentWorkflowExecutionSize = options.maxConcurrentWorkflowExecutionSize;
×
65
      this.maxConcurrentLocalActivityExecutionSize =
×
66
          options.maxConcurrentLocalActivityExecutionSize;
×
67
      this.taskListActivitiesPerSecond = options.taskListActivitiesPerSecond;
×
68
      this.activityPollerOptions = options.activityPollerOptions;
×
69
      this.workflowPollerOptions = options.workflowPollerOptions;
×
70
      this.interceptorFactory = options.interceptorFactory;
×
NEW
71
      this.tracer = options.tracer;
×
UNCOV
72
    }
×
73

74
    /** Maximum number of activities started per second. Default is 0 which means unlimited. */
75
    public Builder setWorkerActivitiesPerSecond(double workerActivitiesPerSecond) {
76
      if (workerActivitiesPerSecond <= 0) {
×
77
        throw new IllegalArgumentException("Negative or zero: " + workerActivitiesPerSecond);
×
78
      }
79
      this.workerActivitiesPerSecond = workerActivitiesPerSecond;
×
80
      return this;
×
81
    }
82

83
    /** Maximum number of parallely executed activities. */
84
    public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) {
85
      if (maxConcurrentActivityExecutionSize <= 0) {
1✔
86
        throw new IllegalArgumentException(
×
87
            "Negative or zero: " + maxConcurrentActivityExecutionSize);
88
      }
89
      this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
1✔
90
      return this;
1✔
91
    }
92

93
    /** Maximum number of parallely executed decision tasks. */
94
    public Builder setMaxConcurrentWorkflowExecutionSize(int maxConcurrentWorkflowExecutionSize) {
95
      if (maxConcurrentWorkflowExecutionSize <= 0) {
1✔
96
        throw new IllegalArgumentException(
×
97
            "Negative or zero: " + maxConcurrentWorkflowExecutionSize);
98
      }
99
      this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize;
1✔
100
      return this;
1✔
101
    }
102

103
    /** Maximum number of parallely executed local activities. */
104
    public Builder setMaxConcurrentLocalActivityExecutionSize(
105
        int maxConcurrentLocalActivityExecutionSize) {
106
      if (maxConcurrentLocalActivityExecutionSize <= 0) {
×
107
        throw new IllegalArgumentException(
×
108
            "Negative or zero: " + maxConcurrentLocalActivityExecutionSize);
109
      }
110
      this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
×
111
      return this;
×
112
    }
113

114
    public Builder setActivityPollerOptions(PollerOptions activityPollerOptions) {
115
      this.activityPollerOptions = Objects.requireNonNull(activityPollerOptions);
1✔
116
      return this;
1✔
117
    }
118

119
    public Builder setWorkflowPollerOptions(PollerOptions workflowPollerOptions) {
120
      this.workflowPollerOptions = Objects.requireNonNull(workflowPollerOptions);
×
121
      return this;
×
122
    }
123

124
    public Builder setInterceptorFactory(
125
        Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory) {
126
      this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
1✔
127
      return this;
1✔
128
    }
129

130
    /**
131
     * Optional: Sets the rate limiting on number of activities that can be executed per second.
132
     * This is managed by the server and controls activities per second for your entire tasklist.
133
     * Notice that the number is represented in double, so that you can set it to less than 1 if
134
     * needed. For example, set the number to 0.1 means you want your activity to be executed once
135
     * every 10 seconds. This can be used to protect down stream services from flooding. The zero
136
     * value means there's no limit.
137
     */
138
    public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
139
      this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
×
140
      return this;
×
141
    }
142

143
    /**
144
     * Optional: Sets the tracer to use for tracing. Default is NoopTracer
145
     *
146
     * @param tracer
147
     * @return
148
     */
149
    public Builder setTracer(Tracer tracer) {
NEW
150
      this.tracer = tracer;
×
NEW
151
      return this;
×
152
    }
153

154
    public WorkerOptions build() {
155
      return new WorkerOptions(
1✔
156
          workerActivitiesPerSecond,
157
          maxConcurrentActivityExecutionSize,
158
          maxConcurrentWorkflowExecutionSize,
159
          maxConcurrentLocalActivityExecutionSize,
160
          taskListActivitiesPerSecond,
161
          activityPollerOptions,
162
          workflowPollerOptions,
163
          interceptorFactory,
164
          tracer);
165
    }
166
  }
167

168
  private final double workerActivitiesPerSecond;
169
  private final int maxConcurrentActivityExecutionSize;
170
  private final int maxConcurrentWorkflowExecutionSize;
171
  private final int maxConcurrentLocalActivityExecutionSize;
172
  private final double taskListActivitiesPerSecond;
173
  private final PollerOptions activityPollerOptions;
174
  private final PollerOptions workflowPollerOptions;
175
  private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
176
  private final Tracer tracer;
177

178
  private WorkerOptions(
179
      double workerActivitiesPerSecond,
180
      int maxConcurrentActivityExecutionSize,
181
      int maxConcurrentWorkflowExecutionSize,
182
      int maxConcurrentLocalActivityExecutionSize,
183
      double taskListActivitiesPerSecond,
184
      PollerOptions activityPollerOptions,
185
      PollerOptions workflowPollerOptions,
186
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
187
      Tracer tracer) {
1✔
188
    this.workerActivitiesPerSecond = workerActivitiesPerSecond;
1✔
189
    this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
1✔
190
    this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize;
1✔
191
    this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
1✔
192
    this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
1✔
193
    this.activityPollerOptions = activityPollerOptions;
1✔
194
    this.workflowPollerOptions = workflowPollerOptions;
1✔
195
    this.interceptorFactory = interceptorFactory;
1✔
196
    this.tracer = tracer;
1✔
197
  }
1✔
198

199
  public double getWorkerActivitiesPerSecond() {
200
    return workerActivitiesPerSecond;
×
201
  }
202

203
  public int getMaxConcurrentActivityExecutionSize() {
204
    return maxConcurrentActivityExecutionSize;
1✔
205
  }
206

207
  public int getMaxConcurrentWorkflowExecutionSize() {
208
    return maxConcurrentWorkflowExecutionSize;
1✔
209
  }
210

211
  public int getMaxConcurrentLocalActivityExecutionSize() {
212
    return maxConcurrentLocalActivityExecutionSize;
1✔
213
  }
214

215
  public double getTaskListActivitiesPerSecond() {
216
    return taskListActivitiesPerSecond;
1✔
217
  }
218

219
  public PollerOptions getActivityPollerOptions() {
220
    return activityPollerOptions;
1✔
221
  }
222

223
  public PollerOptions getWorkflowPollerOptions() {
224
    return workflowPollerOptions;
1✔
225
  }
226

227
  public Function<WorkflowInterceptor, WorkflowInterceptor> getInterceptorFactory() {
228
    return interceptorFactory;
1✔
229
  }
230

231
  public Tracer getTracer() {
232
    return tracer;
1✔
233
  }
234

235
  @Override
236
  public String toString() {
237
    return "WorkerOptions{"
×
238
        + "workerActivitiesPerSecond="
239
        + workerActivitiesPerSecond
240
        + ", maxConcurrentActivityExecutionSize="
241
        + maxConcurrentActivityExecutionSize
242
        + ", maxConcurrentWorkflowExecutionSize="
243
        + maxConcurrentWorkflowExecutionSize
244
        + ", maxConcurrentLocalActivityExecutionSize="
245
        + maxConcurrentLocalActivityExecutionSize
246
        + ", taskListActivitiesPerSecond="
247
        + taskListActivitiesPerSecond
248
        + ", activityPollerOptions="
249
        + activityPollerOptions
250
        + ", workflowPollerOptions="
251
        + workflowPollerOptions
252
        + '}';
253
  }
254
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc