• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #166

pending completion
#166

push

github-actions

web-flow
Add comment on Workflow#newQueue (#1747)

17109 of 20962 relevant lines covered (81.62%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.85
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.uber.m3.tally.Scope;
24
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
25
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
26
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
27
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
28
import io.temporal.api.command.v1.StartTimerCommandAttributes;
29
import io.temporal.api.common.v1.*;
30
import io.temporal.api.failure.v1.Failure;
31
import io.temporal.api.history.v1.HistoryEvent;
32
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
33
import io.temporal.failure.CanceledFailure;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.internal.statemachines.*;
36
import io.temporal.internal.worker.SingleWorkerOptions;
37
import io.temporal.workflow.Functions;
38
import io.temporal.workflow.Functions.Func;
39
import io.temporal.workflow.Functions.Func1;
40
import java.time.Duration;
41
import java.util.*;
42
import javax.annotation.Nonnull;
43
import javax.annotation.Nullable;
44

45
/**
46
 * TODO callbacks usage is non consistent. It accepts Optional and Exception which can be null.
47
 * Switch both to nullable.
48
 */
49
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
50
  private final BasicWorkflowContext basicWorkflowContext;
51
  private final WorkflowStateMachines workflowStateMachines;
52
  private final WorkflowMutableState mutableState;
53
  private final @Nullable String fullReplayDirectQueryName;
54
  private final Scope replayAwareWorkflowMetricsScope;
55
  private final SingleWorkerOptions workerOptions;
56

57
  /**
58
   * @param fullReplayDirectQueryName query name if an execution is a full replay caused by a direct
59
   *     query, null otherwise
60
   */
61
  ReplayWorkflowContextImpl(
62
      WorkflowStateMachines workflowStateMachines,
63
      String namespace,
64
      WorkflowExecutionStartedEventAttributes startedAttributes,
65
      WorkflowExecution workflowExecution,
66
      long runStartedTimestampMillis,
67
      @Nullable String fullReplayDirectQueryName,
68
      SingleWorkerOptions workerOptions,
69
      Scope workflowMetricsScope) {
1✔
70
    this.workflowStateMachines = workflowStateMachines;
1✔
71
    this.basicWorkflowContext =
1✔
72
        new BasicWorkflowContext(
73
            namespace, workflowExecution, startedAttributes, runStartedTimestampMillis);
74
    this.mutableState = new WorkflowMutableState(startedAttributes);
1✔
75
    this.fullReplayDirectQueryName = fullReplayDirectQueryName;
1✔
76
    this.replayAwareWorkflowMetricsScope =
1✔
77
        new ReplayAwareScope(workflowMetricsScope, this, workflowStateMachines::currentTimeMillis);
1✔
78
    this.workerOptions = workerOptions;
1✔
79
  }
1✔
80

81
  @Override
82
  public boolean getEnableLoggingInReplay() {
83
    return workerOptions.getEnableLoggingInReplay();
×
84
  }
85

86
  @Override
87
  public UUID randomUUID() {
88
    return workflowStateMachines.randomUUID();
1✔
89
  }
90

91
  @Override
92
  public Random newRandom() {
93
    return workflowStateMachines.newRandom();
1✔
94
  }
95

96
  @Override
97
  public Scope getMetricsScope() {
98
    return replayAwareWorkflowMetricsScope;
1✔
99
  }
100

101
  @Nonnull
102
  @Override
103
  public WorkflowExecution getWorkflowExecution() {
104
    return basicWorkflowContext.getWorkflowExecution();
1✔
105
  }
106

107
  @Override
108
  public WorkflowExecution getParentWorkflowExecution() {
109
    return basicWorkflowContext.getParentWorkflowExecution();
1✔
110
  }
111

112
  @Override
113
  public String getFirstExecutionRunId() {
114
    return basicWorkflowContext.getFirstExecutionRunId();
×
115
  }
116

117
  @Override
118
  public Optional<String> getContinuedExecutionRunId() {
119
    return basicWorkflowContext.getContinuedExecutionRunId();
1✔
120
  }
121

122
  @Override
123
  public String getOriginalExecutionRunId() {
124
    return basicWorkflowContext.getOriginalExecutionRunId();
×
125
  }
126

127
  @Override
128
  public WorkflowType getWorkflowType() {
129
    return basicWorkflowContext.getWorkflowType();
1✔
130
  }
131

132
  @Nonnull
133
  @Override
134
  public Duration getWorkflowTaskTimeout() {
135
    return basicWorkflowContext.getWorkflowTaskTimeout();
1✔
136
  }
137

138
  @Override
139
  public String getTaskQueue() {
140
    return basicWorkflowContext.getTaskQueue();
1✔
141
  }
142

143
  @Override
144
  public String getNamespace() {
145
    return basicWorkflowContext.getNamespace();
1✔
146
  }
147

148
  @Override
149
  public String getWorkflowId() {
150
    return basicWorkflowContext.getWorkflowExecution().getWorkflowId();
1✔
151
  }
152

153
  @Nonnull
154
  @Override
155
  public String getRunId() {
156
    String result = basicWorkflowContext.getWorkflowExecution().getRunId();
1✔
157
    if (result.isEmpty()) {
1✔
158
      return null;
×
159
    }
160
    return result;
1✔
161
  }
162

163
  @Override
164
  public Duration getWorkflowRunTimeout() {
165
    return basicWorkflowContext.getWorkflowRunTimeout();
×
166
  }
167

168
  @Override
169
  public Duration getWorkflowExecutionTimeout() {
170
    return basicWorkflowContext.getWorkflowExecutionTimeout();
×
171
  }
172

173
  @Override
174
  public long getRunStartedTimestampMillis() {
175
    return basicWorkflowContext.getRunStartedTimestampMillis();
1✔
176
  }
177

178
  @Override
179
  public Payload getMemo(String key) {
180
    return basicWorkflowContext.getMemo(key);
1✔
181
  }
182

183
  @Override
184
  @Nullable
185
  public SearchAttributes getSearchAttributes() {
186
    return mutableState.getSearchAttributes();
1✔
187
  }
188

189
  @Override
190
  public ScheduleActivityTaskOutput scheduleActivityTask(
191
      ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback) {
192
    ScheduleActivityTaskCommandAttributes.Builder attributes = parameters.getAttributes();
1✔
193
    if (attributes.getActivityId().isEmpty()) {
1✔
194
      attributes.setActivityId(workflowStateMachines.randomUUID().toString());
1✔
195
    }
196
    Functions.Proc cancellationHandler =
1✔
197
        workflowStateMachines.scheduleActivityTask(parameters, callback);
1✔
198
    return new ScheduleActivityTaskOutput(
1✔
199
        attributes.getActivityId(), (exception) -> cancellationHandler.apply());
1✔
200
  }
201

202
  @Override
203
  public Functions.Proc scheduleLocalActivityTask(
204
      ExecuteLocalActivityParameters parameters, LocalActivityCallback callback) {
205
    return workflowStateMachines.scheduleLocalActivityTask(parameters, callback);
1✔
206
  }
207

208
  @Override
209
  public Functions.Proc1<Exception> startChildWorkflow(
210
      StartChildWorkflowExecutionParameters parameters,
211
      Functions.Proc2<WorkflowExecution, Exception> startCallback,
212
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
213
    Functions.Proc cancellationHandler =
1✔
214
        workflowStateMachines.startChildWorkflow(parameters, startCallback, completionCallback);
1✔
215
    return (exception) -> cancellationHandler.apply();
1✔
216
  }
217

218
  @Override
219
  public Functions.Proc1<Exception> signalExternalWorkflowExecution(
220
      SignalExternalWorkflowExecutionCommandAttributes.Builder attributes,
221
      Functions.Proc2<Void, Failure> callback) {
222
    Functions.Proc cancellationHandler =
1✔
223
        workflowStateMachines.signalExternalWorkflowExecution(attributes.build(), callback);
1✔
224
    return (e) -> cancellationHandler.apply();
1✔
225
  }
226

227
  @Override
228
  public void requestCancelExternalWorkflowExecution(
229
      WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
230
    RequestCancelExternalWorkflowExecutionCommandAttributes attributes =
231
        RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
×
232
            .setWorkflowId(execution.getWorkflowId())
×
233
            .setRunId(execution.getRunId())
×
234
            .build();
×
235
    workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
×
236
  }
×
237

238
  @Override
239
  public boolean isReplaying() {
240
    return workflowStateMachines.isReplaying();
1✔
241
  }
242

243
  @Override
244
  public Functions.Proc1<RuntimeException> newTimer(
245
      Duration delay, Functions.Proc1<RuntimeException> callback) {
246
    if (delay.compareTo(Duration.ZERO) <= 0) {
1✔
247
      callback.apply(null);
1✔
248
      return (e) -> {};
1✔
249
    }
250
    StartTimerCommandAttributes attributes =
251
        StartTimerCommandAttributes.newBuilder()
1✔
252
            .setStartToFireTimeout(ProtobufTimeUtils.toProtoDuration(delay))
1✔
253
            .setTimerId(workflowStateMachines.randomUUID().toString())
1✔
254
            .build();
1✔
255
    Functions.Proc cancellationHandler =
1✔
256
        workflowStateMachines.newTimer(attributes, (event) -> handleTimerCallback(callback, event));
1✔
257
    return (e) -> cancellationHandler.apply();
1✔
258
  }
259

260
  private void handleTimerCallback(Functions.Proc1<RuntimeException> callback, HistoryEvent event) {
261
    switch (event.getEventType()) {
1✔
262
      case EVENT_TYPE_TIMER_FIRED:
263
        {
264
          callback.apply(null);
1✔
265
          return;
1✔
266
        }
267
      case EVENT_TYPE_TIMER_CANCELED:
268
        {
269
          CanceledFailure exception = new CanceledFailure("Canceled by request");
1✔
270
          callback.apply(exception);
1✔
271
          return;
1✔
272
        }
273
      default:
274
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
275
    }
276
  }
277

278
  @Override
279
  public void sideEffect(
280
      Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
281
    workflowStateMachines.sideEffect(func, callback);
1✔
282
  }
1✔
283

284
  @Override
285
  public void mutableSideEffect(
286
      String id,
287
      Func1<Optional<Payloads>, Optional<Payloads>> func,
288
      Functions.Proc1<Optional<Payloads>> callback) {
289
    workflowStateMachines.mutableSideEffect(id, func, callback);
1✔
290
  }
1✔
291

292
  @Override
293
  public void getVersion(
294
      String changeId,
295
      int minSupported,
296
      int maxSupported,
297
      Functions.Proc2<Integer, RuntimeException> callback) {
298
    workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
1✔
299
  }
1✔
300

301
  @Override
302
  public long currentTimeMillis() {
303
    return workflowStateMachines.currentTimeMillis();
1✔
304
  }
305

306
  @Override
307
  public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
308
    workflowStateMachines.upsertSearchAttributes(searchAttributes);
1✔
309
    mutableState.upsertSearchAttributes(searchAttributes);
1✔
310
  }
1✔
311

312
  @Override
313
  public int getAttempt() {
314
    return basicWorkflowContext.getAttempt();
1✔
315
  }
316

317
  @Override
318
  public String getCronSchedule() {
319
    return basicWorkflowContext.getCronSchedule();
1✔
320
  }
321

322
  @Override
323
  @Nullable
324
  public Payloads getLastCompletionResult() {
325
    return basicWorkflowContext.getLastCompletionResult();
1✔
326
  }
327

328
  @Override
329
  @Nullable
330
  public Failure getPreviousRunFailure() {
331
    return basicWorkflowContext.getPreviousRunFailure();
1✔
332
  }
333

334
  @Nullable
335
  @Override
336
  public String getFullReplayDirectQueryName() {
337
    return fullReplayDirectQueryName;
1✔
338
  }
339

340
  @Override
341
  public Map<String, Payload> getHeader() {
342
    return basicWorkflowContext.getHeader();
1✔
343
  }
344

345
  @Override
346
  public long getCurrentWorkflowTaskStartedEventId() {
347
    return workflowStateMachines.getCurrentStartedEventId();
1✔
348
  }
349

350
  /*
351
   * MUTABLE STATE OPERATIONS
352
   */
353

354
  @Override
355
  public boolean isCancelRequested() {
356
    return mutableState.isCancelRequested();
1✔
357
  }
358

359
  @Override
360
  public void setCancelRequested() {
361
    mutableState.setCancelRequested();
1✔
362
  }
1✔
363

364
  public boolean isWorkflowMethodCompleted() {
365
    return mutableState.isWorkflowMethodCompleted();
1✔
366
  }
367

368
  @Override
369
  public void setWorkflowMethodCompleted() {
370
    this.mutableState.setWorkflowMethodCompleted();
1✔
371
  }
1✔
372

373
  @Override
374
  public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
375
    return mutableState.getContinueAsNewOnCompletion();
1✔
376
  }
377

378
  @Override
379
  public void continueAsNewOnCompletion(
380
      ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
381
    mutableState.continueAsNewOnCompletion(attributes);
1✔
382
  }
1✔
383

384
  @Override
385
  public Throwable getWorkflowTaskFailure() {
386
    return mutableState.getWorkflowTaskFailure();
1✔
387
  }
388

389
  @Override
390
  public void failWorkflowTask(Throwable failure) {
391
    mutableState.failWorkflowTask(failure);
×
392
  }
×
393
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc