• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.85
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.uber.m3.tally.Scope;
24
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
25
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
26
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
27
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
28
import io.temporal.api.command.v1.StartTimerCommandAttributes;
29
import io.temporal.api.common.v1.*;
30
import io.temporal.api.failure.v1.Failure;
31
import io.temporal.api.history.v1.HistoryEvent;
32
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
33
import io.temporal.failure.CanceledFailure;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.internal.common.SdkFlag;
36
import io.temporal.internal.statemachines.*;
37
import io.temporal.internal.worker.SingleWorkerOptions;
38
import io.temporal.workflow.Functions;
39
import io.temporal.workflow.Functions.Func;
40
import io.temporal.workflow.Functions.Func1;
41
import java.time.Duration;
42
import java.util.*;
43
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;
45

46
/**
47
 * TODO callbacks usage is non consistent. It accepts Optional and Exception which can be null.
48
 * Switch both to nullable.
49
 */
50
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
51
  private final BasicWorkflowContext basicWorkflowContext;
52
  private final WorkflowStateMachines workflowStateMachines;
53
  private final WorkflowMutableState mutableState;
54
  private final @Nullable String fullReplayDirectQueryName;
55
  private final Scope replayAwareWorkflowMetricsScope;
56
  private final SingleWorkerOptions workerOptions;
57

58
  /**
59
   * @param fullReplayDirectQueryName query name if an execution is a full replay caused by a direct
60
   *     query, null otherwise
61
   */
62
  ReplayWorkflowContextImpl(
63
      WorkflowStateMachines workflowStateMachines,
64
      String namespace,
65
      WorkflowExecutionStartedEventAttributes startedAttributes,
66
      WorkflowExecution workflowExecution,
67
      long runStartedTimestampMillis,
68
      @Nullable String fullReplayDirectQueryName,
69
      SingleWorkerOptions workerOptions,
70
      Scope workflowMetricsScope) {
1✔
71
    this.workflowStateMachines = workflowStateMachines;
1✔
72
    this.basicWorkflowContext =
1✔
73
        new BasicWorkflowContext(
74
            namespace, workflowExecution, startedAttributes, runStartedTimestampMillis);
75
    this.mutableState = new WorkflowMutableState(startedAttributes);
1✔
76
    this.fullReplayDirectQueryName = fullReplayDirectQueryName;
1✔
77
    this.replayAwareWorkflowMetricsScope =
1✔
78
        new ReplayAwareScope(workflowMetricsScope, this, workflowStateMachines::currentTimeMillis);
1✔
79
    this.workerOptions = workerOptions;
1✔
80
  }
1✔
81

82
  @Override
83
  public boolean getEnableLoggingInReplay() {
84
    return workerOptions.getEnableLoggingInReplay();
×
85
  }
86

87
  @Override
88
  public UUID randomUUID() {
89
    return workflowStateMachines.randomUUID();
1✔
90
  }
91

92
  @Override
93
  public Random newRandom() {
94
    return workflowStateMachines.newRandom();
1✔
95
  }
96

97
  @Override
98
  public Scope getMetricsScope() {
99
    return replayAwareWorkflowMetricsScope;
1✔
100
  }
101

102
  @Nonnull
103
  @Override
104
  public WorkflowExecution getWorkflowExecution() {
105
    return basicWorkflowContext.getWorkflowExecution();
1✔
106
  }
107

108
  @Override
109
  public WorkflowExecution getParentWorkflowExecution() {
110
    return basicWorkflowContext.getParentWorkflowExecution();
1✔
111
  }
112

113
  @Override
114
  public String getFirstExecutionRunId() {
115
    return basicWorkflowContext.getFirstExecutionRunId();
×
116
  }
117

118
  @Override
119
  public Optional<String> getContinuedExecutionRunId() {
120
    return basicWorkflowContext.getContinuedExecutionRunId();
1✔
121
  }
122

123
  @Override
124
  public String getOriginalExecutionRunId() {
125
    return basicWorkflowContext.getOriginalExecutionRunId();
×
126
  }
127

128
  @Override
129
  public WorkflowType getWorkflowType() {
130
    return basicWorkflowContext.getWorkflowType();
1✔
131
  }
132

133
  @Nonnull
134
  @Override
135
  public Duration getWorkflowTaskTimeout() {
136
    return basicWorkflowContext.getWorkflowTaskTimeout();
1✔
137
  }
138

139
  @Override
140
  public String getTaskQueue() {
141
    return basicWorkflowContext.getTaskQueue();
1✔
142
  }
143

144
  @Override
145
  public String getNamespace() {
146
    return basicWorkflowContext.getNamespace();
1✔
147
  }
148

149
  @Override
150
  public String getWorkflowId() {
151
    return basicWorkflowContext.getWorkflowExecution().getWorkflowId();
1✔
152
  }
153

154
  @Nonnull
155
  @Override
156
  public String getRunId() {
157
    String result = basicWorkflowContext.getWorkflowExecution().getRunId();
1✔
158
    if (result.isEmpty()) {
1✔
159
      return null;
×
160
    }
161
    return result;
1✔
162
  }
163

164
  @Override
165
  public Duration getWorkflowRunTimeout() {
166
    return basicWorkflowContext.getWorkflowRunTimeout();
×
167
  }
168

169
  @Override
170
  public Duration getWorkflowExecutionTimeout() {
171
    return basicWorkflowContext.getWorkflowExecutionTimeout();
×
172
  }
173

174
  @Override
175
  public long getRunStartedTimestampMillis() {
176
    return basicWorkflowContext.getRunStartedTimestampMillis();
1✔
177
  }
178

179
  @Override
180
  public Payload getMemo(String key) {
181
    return basicWorkflowContext.getMemo(key);
1✔
182
  }
183

184
  @Override
185
  @Nullable
186
  public SearchAttributes getSearchAttributes() {
187
    return mutableState.getSearchAttributes();
1✔
188
  }
189

190
  @Override
191
  public ScheduleActivityTaskOutput scheduleActivityTask(
192
      ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback) {
193
    ScheduleActivityTaskCommandAttributes.Builder attributes = parameters.getAttributes();
1✔
194
    if (attributes.getActivityId().isEmpty()) {
1✔
195
      attributes.setActivityId(workflowStateMachines.randomUUID().toString());
1✔
196
    }
197
    Functions.Proc cancellationHandler =
1✔
198
        workflowStateMachines.scheduleActivityTask(parameters, callback);
1✔
199
    return new ScheduleActivityTaskOutput(
1✔
200
        attributes.getActivityId(), (exception) -> cancellationHandler.apply());
1✔
201
  }
202

203
  @Override
204
  public Functions.Proc scheduleLocalActivityTask(
205
      ExecuteLocalActivityParameters parameters, LocalActivityCallback callback) {
206
    return workflowStateMachines.scheduleLocalActivityTask(parameters, callback);
1✔
207
  }
208

209
  @Override
210
  public Functions.Proc1<Exception> startChildWorkflow(
211
      StartChildWorkflowExecutionParameters parameters,
212
      Functions.Proc2<WorkflowExecution, Exception> startCallback,
213
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
214
    Functions.Proc cancellationHandler =
1✔
215
        workflowStateMachines.startChildWorkflow(parameters, startCallback, completionCallback);
1✔
216
    return (exception) -> cancellationHandler.apply();
1✔
217
  }
218

219
  @Override
220
  public Functions.Proc1<Exception> signalExternalWorkflowExecution(
221
      SignalExternalWorkflowExecutionCommandAttributes.Builder attributes,
222
      Functions.Proc2<Void, Failure> callback) {
223
    Functions.Proc cancellationHandler =
1✔
224
        workflowStateMachines.signalExternalWorkflowExecution(attributes.build(), callback);
1✔
225
    return (e) -> cancellationHandler.apply();
1✔
226
  }
227

228
  @Override
229
  public void requestCancelExternalWorkflowExecution(
230
      WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
231
    RequestCancelExternalWorkflowExecutionCommandAttributes attributes =
232
        RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
×
233
            .setWorkflowId(execution.getWorkflowId())
×
234
            .setRunId(execution.getRunId())
×
235
            .build();
×
236
    workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
×
237
  }
×
238

239
  @Override
240
  public boolean isReplaying() {
241
    return workflowStateMachines.isReplaying();
1✔
242
  }
243

244
  @Override
245
  public boolean tryUseSdkFlag(SdkFlag flag) {
246
    return workflowStateMachines.tryUseSdkFlag(flag);
1✔
247
  }
248

249
  @Override
250
  public Functions.Proc1<RuntimeException> newTimer(
251
      Duration delay, Functions.Proc1<RuntimeException> callback) {
252
    if (delay.compareTo(Duration.ZERO) <= 0) {
1✔
253
      callback.apply(null);
1✔
254
      return (e) -> {};
1✔
255
    }
256
    StartTimerCommandAttributes attributes =
257
        StartTimerCommandAttributes.newBuilder()
1✔
258
            .setStartToFireTimeout(ProtobufTimeUtils.toProtoDuration(delay))
1✔
259
            .setTimerId(workflowStateMachines.randomUUID().toString())
1✔
260
            .build();
1✔
261
    Functions.Proc cancellationHandler =
1✔
262
        workflowStateMachines.newTimer(attributes, (event) -> handleTimerCallback(callback, event));
1✔
263
    return (e) -> cancellationHandler.apply();
1✔
264
  }
265

266
  private void handleTimerCallback(Functions.Proc1<RuntimeException> callback, HistoryEvent event) {
267
    switch (event.getEventType()) {
1✔
268
      case EVENT_TYPE_TIMER_FIRED:
269
        {
270
          callback.apply(null);
1✔
271
          return;
1✔
272
        }
273
      case EVENT_TYPE_TIMER_CANCELED:
274
        {
275
          CanceledFailure exception = new CanceledFailure("Canceled by request");
1✔
276
          callback.apply(exception);
1✔
277
          return;
1✔
278
        }
279
      default:
280
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
281
    }
282
  }
283

284
  @Override
285
  public void sideEffect(
286
      Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
287
    workflowStateMachines.sideEffect(func, callback);
1✔
288
  }
1✔
289

290
  @Override
291
  public void mutableSideEffect(
292
      String id,
293
      Func1<Optional<Payloads>, Optional<Payloads>> func,
294
      Functions.Proc1<Optional<Payloads>> callback) {
295
    workflowStateMachines.mutableSideEffect(id, func, callback);
1✔
296
  }
1✔
297

298
  @Override
299
  public boolean getVersion(
300
      String changeId,
301
      int minSupported,
302
      int maxSupported,
303
      Functions.Proc2<Integer, RuntimeException> callback) {
304
    return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
1✔
305
  }
306

307
  @Override
308
  public long currentTimeMillis() {
309
    return workflowStateMachines.currentTimeMillis();
1✔
310
  }
311

312
  @Override
313
  public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
314
    workflowStateMachines.upsertSearchAttributes(searchAttributes);
1✔
315
    mutableState.upsertSearchAttributes(searchAttributes);
1✔
316
  }
1✔
317

318
  @Override
319
  public int getAttempt() {
320
    return basicWorkflowContext.getAttempt();
1✔
321
  }
322

323
  @Override
324
  public String getCronSchedule() {
325
    return basicWorkflowContext.getCronSchedule();
1✔
326
  }
327

328
  @Override
329
  @Nullable
330
  public Payloads getLastCompletionResult() {
331
    return basicWorkflowContext.getLastCompletionResult();
1✔
332
  }
333

334
  @Override
335
  @Nullable
336
  public Failure getPreviousRunFailure() {
337
    return basicWorkflowContext.getPreviousRunFailure();
1✔
338
  }
339

340
  @Nullable
341
  @Override
342
  public String getFullReplayDirectQueryName() {
343
    return fullReplayDirectQueryName;
1✔
344
  }
345

346
  @Override
347
  public Map<String, Payload> getHeader() {
348
    return basicWorkflowContext.getHeader();
1✔
349
  }
350

351
  @Override
352
  public long getCurrentWorkflowTaskStartedEventId() {
353
    return workflowStateMachines.getCurrentStartedEventId();
1✔
354
  }
355

356
  /*
357
   * MUTABLE STATE OPERATIONS
358
   */
359

360
  @Override
361
  public boolean isCancelRequested() {
362
    return mutableState.isCancelRequested();
1✔
363
  }
364

365
  @Override
366
  public void setCancelRequested() {
367
    mutableState.setCancelRequested();
1✔
368
  }
1✔
369

370
  public boolean isWorkflowMethodCompleted() {
371
    return mutableState.isWorkflowMethodCompleted();
1✔
372
  }
373

374
  @Override
375
  public void setWorkflowMethodCompleted() {
376
    this.mutableState.setWorkflowMethodCompleted();
1✔
377
  }
1✔
378

379
  @Override
380
  public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
381
    return mutableState.getContinueAsNewOnCompletion();
1✔
382
  }
383

384
  @Override
385
  public void continueAsNewOnCompletion(
386
      ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
387
    mutableState.continueAsNewOnCompletion(attributes);
1✔
388
  }
1✔
389

390
  @Override
391
  public Throwable getWorkflowTaskFailure() {
392
    return mutableState.getWorkflowTaskFailure();
1✔
393
  }
394

395
  @Override
396
  public void failWorkflowTask(Throwable failure) {
397
    mutableState.failWorkflowTask(failure);
×
398
  }
×
399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc