• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1759

pending completion
1759

push

buildkite

GitHub
Exposed startedEventAttribute and DataConverter (#799)

* exposing startedEventAttributes through DecisionContext in WorkflowInfo for using in ContinueAsNew

* addition of workflowEventAttributes in dummy implementation

* Addition of dataConverter

8 of 8 new or added lines in 3 files covered. (100.0%)

11114 of 18404 relevant lines covered (60.39%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.95
/src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import com.uber.cadence.DecisionTaskFailedCause;
21
import com.uber.cadence.DecisionTaskFailedEventAttributes;
22
import com.uber.cadence.HistoryEvent;
23
import com.uber.cadence.PollForDecisionTaskResponse;
24
import com.uber.cadence.SearchAttributes;
25
import com.uber.cadence.TimerFiredEventAttributes;
26
import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes;
27
import com.uber.cadence.WorkflowExecution;
28
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
29
import com.uber.cadence.WorkflowType;
30
import com.uber.cadence.context.ContextPropagator;
31
import com.uber.cadence.converter.DataConverter;
32
import com.uber.cadence.internal.common.InternalUtils;
33
import com.uber.cadence.internal.metrics.ReplayAwareScope;
34
import com.uber.cadence.internal.worker.LocalActivityWorker;
35
import com.uber.cadence.internal.worker.SingleWorkerOptions;
36
import com.uber.cadence.workflow.Functions.Func;
37
import com.uber.cadence.workflow.Functions.Func1;
38
import com.uber.cadence.workflow.Promise;
39
import com.uber.cadence.workflow.Workflow;
40
import com.uber.m3.tally.Scope;
41
import java.time.Duration;
42
import java.util.List;
43
import java.util.Map;
44
import java.util.Optional;
45
import java.util.Random;
46
import java.util.UUID;
47
import java.util.function.BiConsumer;
48
import java.util.function.BiFunction;
49
import java.util.function.Consumer;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52

53
final class DecisionContextImpl implements DecisionContext, HistoryEventHandler {
54

55
  private static final Logger log = LoggerFactory.getLogger(DecisionContextImpl.class);
1✔
56

57
  private final ActivityDecisionContext activityClient;
58
  private final WorkflowDecisionContext workflowClient;
59
  private final ClockDecisionContext workflowClock;
60
  private final WorkflowContext workflowContext;
61
  private final Scope metricsScope;
62
  private final boolean enableLoggingInReplay;
63
  private final WorkflowExecutionStartedEventAttributes startedEventAttributes;
64
  private final DataConverter dataConverter;
65

66
  DecisionContextImpl(
67
      DecisionsHelper decisionsHelper,
68
      String domain,
69
      PollForDecisionTaskResponse decisionTask,
70
      WorkflowExecutionStartedEventAttributes startedAttributes,
71
      SingleWorkerOptions options,
72
      BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller,
73
      ReplayDecider replayDecider) {
1✔
74
    this.activityClient = new ActivityDecisionContext(decisionsHelper);
1✔
75
    this.workflowContext =
1✔
76
        new WorkflowContext(
77
            domain, decisionTask, startedAttributes, options.getContextPropagators());
1✔
78
    this.workflowClient = new WorkflowDecisionContext(decisionsHelper, workflowContext);
1✔
79
    this.workflowClock =
1✔
80
        new ClockDecisionContext(
81
            decisionsHelper, laTaskPoller, replayDecider, options.getDataConverter());
1✔
82
    this.startedEventAttributes = startedAttributes;
1✔
83
    this.dataConverter = options.getDataConverter();
1✔
84
    this.enableLoggingInReplay = options.getEnableLoggingInReplay();
1✔
85
    this.metricsScope =
1✔
86
        new ReplayAwareScope(options.getMetricsScope(), this, workflowClock::currentTimeMillis);
1✔
87
  }
1✔
88

89
  @Override
90
  public boolean getEnableLoggingInReplay() {
91
    return enableLoggingInReplay;
1✔
92
  }
93

94
  @Override
95
  public UUID randomUUID() {
96
    return workflowClient.randomUUID();
1✔
97
  }
98

99
  @Override
100
  public Random newRandom() {
101
    return workflowClient.newRandom();
1✔
102
  }
103

104
  @Override
105
  public Scope getMetricsScope() {
106
    return metricsScope;
1✔
107
  }
108

109
  @Override
110
  public WorkflowExecution getWorkflowExecution() {
111
    return workflowContext.getWorkflowExecution();
1✔
112
  }
113

114
  @Override
115
  public WorkflowExecution getParentWorkflowExecution() {
116
    return workflowContext.getParentWorkflowExecution();
1✔
117
  }
118

119
  @Override
120
  public WorkflowType getWorkflowType() {
121
    return workflowContext.getWorkflowType();
1✔
122
  }
123

124
  @Override
125
  public boolean isCancelRequested() {
126
    return workflowContext.isCancelRequested();
×
127
  }
128

129
  void setCancelRequested(boolean flag) {
130
    workflowContext.setCancelRequested(flag);
1✔
131
  }
1✔
132

133
  @Override
134
  public ContinueAsNewWorkflowExecutionParameters getContinueAsNewOnCompletion() {
135
    return workflowContext.getContinueAsNewOnCompletion();
1✔
136
  }
137

138
  @Override
139
  public void setContinueAsNewOnCompletion(
140
      ContinueAsNewWorkflowExecutionParameters continueParameters) {
141
    workflowContext.setContinueAsNewOnCompletion(continueParameters);
×
142
  }
×
143

144
  @Override
145
  public int getExecutionStartToCloseTimeoutSeconds() {
146
    return workflowContext.getExecutionStartToCloseTimeoutSeconds();
×
147
  }
148

149
  @Override
150
  public Duration getDecisionTaskTimeout() {
151
    return Duration.ofSeconds(workflowContext.getDecisionTaskTimeoutSeconds());
×
152
  }
153

154
  @Override
155
  public String getTaskList() {
156
    return workflowContext.getTaskList();
1✔
157
  }
158

159
  @Override
160
  public String getDomain() {
161
    return workflowContext.getDomain();
1✔
162
  }
163

164
  @Override
165
  public String getWorkflowId() {
166
    return workflowContext.getWorkflowExecution().getWorkflowId();
1✔
167
  }
168

169
  @Override
170
  public String getRunId() {
171
    return workflowContext.getWorkflowExecution().getRunId();
1✔
172
  }
173

174
  @Override
175
  public Duration getExecutionStartToCloseTimeout() {
176
    return Duration.ofSeconds(workflowContext.getExecutionStartToCloseTimeoutSeconds());
×
177
  }
178

179
  @Override
180
  public SearchAttributes getSearchAttributes() {
181
    return workflowContext.getSearchAttributes();
1✔
182
  }
183

184
  @Override
185
  public List<ContextPropagator> getContextPropagators() {
186
    return workflowContext.getContextPropagators();
1✔
187
  }
188

189
  @Override
190
  public WorkflowExecutionStartedEventAttributes getWorkflowExecutionStartedEventAttributes() {
191
    return startedEventAttributes;
×
192
  }
193

194
  @Override
195
  public DataConverter getDataConverter() {
196
    return dataConverter;
×
197
  }
198

199
  @Override
200
  public Map<String, Object> getPropagatedContexts() {
201
    return workflowContext.getPropagatedContexts();
1✔
202
  }
203

204
  @Override
205
  public Consumer<Exception> scheduleActivityTask(
206
      ExecuteActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
207
    return activityClient.scheduleActivityTask(parameters, callback);
1✔
208
  }
209

210
  @Override
211
  public Consumer<Exception> scheduleLocalActivityTask(
212
      ExecuteLocalActivityParameters parameters, BiConsumer<byte[], Exception> callback) {
213
    return workflowClock.scheduleLocalActivityTask(parameters, callback);
1✔
214
  }
215

216
  @Override
217
  public Consumer<Exception> startChildWorkflow(
218
      StartChildWorkflowExecutionParameters parameters,
219
      Consumer<WorkflowExecution> executionCallback,
220
      BiConsumer<byte[], Exception> callback) {
221
    return workflowClient.startChildWorkflow(parameters, executionCallback, callback);
1✔
222
  }
223

224
  @Override
225
  public boolean isServerSideChildWorkflowRetry() {
226
    return workflowClient.isChildWorkflowExecutionStartedWithRetryOptions();
1✔
227
  }
228

229
  @Override
230
  public boolean isServerSideActivityRetry() {
231
    return activityClient.isActivityScheduledWithRetryOptions();
1✔
232
  }
233

234
  @Override
235
  public Consumer<Exception> signalWorkflowExecution(
236
      SignalExternalWorkflowParameters signalParameters, BiConsumer<Void, Exception> callback) {
237
    return workflowClient.signalWorkflowExecution(signalParameters, callback);
1✔
238
  }
239

240
  @Override
241
  public Promise<Void> requestCancelWorkflowExecution(WorkflowExecution execution) {
242
    workflowClient.requestCancelWorkflowExecution(execution);
×
243
    // TODO: Make promise return success or failure of the cancellation request.
244
    return Workflow.newPromise(null);
×
245
  }
246

247
  @Override
248
  public void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters parameters) {
249
    workflowClient.continueAsNewOnCompletion(parameters);
1✔
250
  }
1✔
251

252
  void setReplayCurrentTimeMilliseconds(long replayCurrentTimeMilliseconds) {
253
    if (replayCurrentTimeMilliseconds < workflowClock.currentTimeMillis()) {
1✔
254
      if (log.isWarnEnabled()) {
×
255
        log.warn(
×
256
            "Trying to set workflow clock back from "
257
                + workflowClock.currentTimeMillis()
×
258
                + " to "
259
                + replayCurrentTimeMilliseconds
260
                + ". This will be a no-op.");
261
      }
262
      return;
×
263
    }
264
    workflowClock.setReplayCurrentTimeMilliseconds(replayCurrentTimeMilliseconds);
1✔
265
  }
1✔
266

267
  long getReplayCurrentTimeMilliseconds() {
268
    return workflowClock.currentTimeMillis();
1✔
269
  }
270

271
  @Override
272
  public boolean isReplaying() {
273
    return workflowClock.isReplaying();
1✔
274
  }
275

276
  @Override
277
  public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback) {
278
    return workflowClock.createTimer(delaySeconds, callback);
1✔
279
  }
280

281
  @Override
282
  public byte[] sideEffect(Func<byte[]> func) {
283
    return workflowClock.sideEffect(func);
1✔
284
  }
285

286
  @Override
287
  public Optional<byte[]> mutableSideEffect(
288
      String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
289
    return workflowClock.mutableSideEffect(id, converter, func);
1✔
290
  }
291

292
  @Override
293
  public int getVersion(
294
      String changeID, DataConverter converter, int minSupported, int maxSupported) {
295
    final ClockDecisionContext.GetVersionResult results =
1✔
296
        workflowClock.getVersion(changeID, converter, minSupported, maxSupported);
1✔
297
    if (results.shouldUpdateCadenceChangeVersion()) {
1✔
298
      upsertSearchAttributes(
1✔
299
          InternalUtils.convertMapToSearchAttributes(
1✔
300
              results.getSearchAttributesForChangeVersion()));
1✔
301
    }
302
    return results.getVersion();
1✔
303
  }
304

305
  @Override
306
  public long currentTimeMillis() {
307
    return workflowClock.currentTimeMillis();
1✔
308
  }
309

310
  void setReplaying(boolean replaying) {
311
    workflowClock.setReplaying(replaying);
1✔
312
  }
1✔
313

314
  @Override
315
  public void handleActivityTaskCanceled(HistoryEvent event) {
316
    activityClient.handleActivityTaskCanceled(event);
1✔
317
  }
1✔
318

319
  @Override
320
  public void handleActivityTaskCompleted(HistoryEvent event) {
321
    activityClient.handleActivityTaskCompleted(event);
1✔
322
  }
1✔
323

324
  @Override
325
  public void handleActivityTaskFailed(HistoryEvent event) {
326
    activityClient.handleActivityTaskFailed(event);
1✔
327
  }
1✔
328

329
  @Override
330
  public void handleActivityTaskTimedOut(HistoryEvent event) {
331
    activityClient.handleActivityTaskTimedOut(event);
1✔
332
  }
1✔
333

334
  @Override
335
  public void handleChildWorkflowExecutionCancelRequested(HistoryEvent event) {}
×
336

337
  @Override
338
  public void handleChildWorkflowExecutionCanceled(HistoryEvent event) {
339
    workflowClient.handleChildWorkflowExecutionCanceled(event);
1✔
340
  }
1✔
341

342
  @Override
343
  public void handleChildWorkflowExecutionStarted(HistoryEvent event) {
344
    workflowClient.handleChildWorkflowExecutionStarted(event);
1✔
345
  }
1✔
346

347
  @Override
348
  public void handleChildWorkflowExecutionTimedOut(HistoryEvent event) {
349
    workflowClient.handleChildWorkflowExecutionTimedOut(event);
1✔
350
  }
1✔
351

352
  @Override
353
  public void handleChildWorkflowExecutionTerminated(HistoryEvent event) {
354
    workflowClient.handleChildWorkflowExecutionTerminated(event);
×
355
  }
×
356

357
  @Override
358
  public void handleStartChildWorkflowExecutionFailed(HistoryEvent event) {
359
    workflowClient.handleStartChildWorkflowExecutionFailed(event);
1✔
360
  }
1✔
361

362
  @Override
363
  public void handleChildWorkflowExecutionFailed(HistoryEvent event) {
364
    workflowClient.handleChildWorkflowExecutionFailed(event);
1✔
365
  }
1✔
366

367
  @Override
368
  public void handleChildWorkflowExecutionCompleted(HistoryEvent event) {
369
    workflowClient.handleChildWorkflowExecutionCompleted(event);
1✔
370
  }
1✔
371

372
  @Override
373
  public void handleTimerFired(TimerFiredEventAttributes attributes) {
374
    workflowClock.handleTimerFired(attributes);
1✔
375
  }
1✔
376

377
  @Override
378
  public void handleTimerCanceled(HistoryEvent event) {
379
    workflowClock.handleTimerCanceled(event);
1✔
380
  }
1✔
381

382
  void handleSignalExternalWorkflowExecutionFailed(HistoryEvent event) {
383
    workflowClient.handleSignalExternalWorkflowExecutionFailed(event);
1✔
384
  }
1✔
385

386
  @Override
387
  public void handleExternalWorkflowExecutionSignaled(HistoryEvent event) {
388
    workflowClient.handleExternalWorkflowExecutionSignaled(event);
1✔
389
  }
1✔
390

391
  @Override
392
  public void handleMarkerRecorded(HistoryEvent event) {
393
    workflowClock.handleMarkerRecorded(event);
1✔
394
  }
1✔
395

396
  public void handleDecisionTaskFailed(HistoryEvent event) {
397
    DecisionTaskFailedEventAttributes attr = event.getDecisionTaskFailedEventAttributes();
1✔
398
    if (attr != null && attr.getCause() == DecisionTaskFailedCause.RESET_WORKFLOW) {
1✔
399
      workflowContext.setCurrentRunId(attr.getNewRunId());
1✔
400
    }
401
  }
1✔
402

403
  boolean startUnstartedLaTasks(Duration maxWaitAllowed) {
404
    return workflowClock.startUnstartedLaTasks(maxWaitAllowed);
1✔
405
  }
406

407
  int numPendingLaTasks() {
408
    return workflowClock.numPendingLaTasks();
1✔
409
  }
410

411
  void awaitTaskCompletion(Duration duration) throws InterruptedException {
412
    workflowClock.awaitTaskCompletion(duration);
1✔
413
  }
1✔
414

415
  @Override
416
  public void upsertSearchAttributes(SearchAttributes searchAttributes) {
417
    workflowClock.upsertSearchAttributes(searchAttributes);
1✔
418
    workflowContext.mergeSearchAttributes(searchAttributes);
1✔
419
  }
1✔
420

421
  @Override
422
  public void handleUpsertSearchAttributes(HistoryEvent event) {
423
    UpsertWorkflowSearchAttributesEventAttributes attr =
1✔
424
        event.getUpsertWorkflowSearchAttributesEventAttributes();
1✔
425
    if (attr != null) {
1✔
426
      SearchAttributes searchAttributes = attr.getSearchAttributes();
1✔
427
      workflowContext.mergeSearchAttributes(searchAttributes);
1✔
428
    }
429
  }
1✔
430
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc