• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import com.uber.m3.tally.Scope;
24
import io.temporal.api.command.v1.*;
25
import io.temporal.api.common.v1.*;
26
import io.temporal.api.failure.v1.Failure;
27
import io.temporal.api.history.v1.HistoryEvent;
28
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
29
import io.temporal.api.sdk.v1.UserMetadata;
30
import io.temporal.common.RetryOptions;
31
import io.temporal.failure.CanceledFailure;
32
import io.temporal.internal.common.ProtobufTimeUtils;
33
import io.temporal.internal.common.SdkFlag;
34
import io.temporal.internal.statemachines.*;
35
import io.temporal.internal.worker.SingleWorkerOptions;
36
import io.temporal.workflow.Functions;
37
import io.temporal.workflow.Functions.Func;
38
import io.temporal.workflow.Functions.Func1;
39
import java.time.Duration;
40
import java.util.*;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
45
 * TODO callbacks usage is non consistent. It accepts Optional and Exception which can be null.
46
 * Switch both to nullable.
47
 */
48
final class ReplayWorkflowContextImpl implements ReplayWorkflowContext {
49
  private final BasicWorkflowContext basicWorkflowContext;
50
  private final WorkflowStateMachines workflowStateMachines;
51
  private final WorkflowMutableState mutableState;
52
  private final @Nullable String fullReplayDirectQueryName;
53
  private final Scope replayAwareWorkflowMetricsScope;
54
  private final SingleWorkerOptions workerOptions;
55

56
  /**
57
   * @param fullReplayDirectQueryName query name if an execution is a full replay caused by a direct
58
   *     query, null otherwise
59
   */
60
  ReplayWorkflowContextImpl(
61
      WorkflowStateMachines workflowStateMachines,
62
      String namespace,
63
      WorkflowExecutionStartedEventAttributes startedAttributes,
64
      WorkflowExecution workflowExecution,
65
      long runStartedTimestampMillis,
66
      @Nullable String fullReplayDirectQueryName,
67
      SingleWorkerOptions workerOptions,
68
      Scope workflowMetricsScope) {
1✔
69
    this.workflowStateMachines = workflowStateMachines;
1✔
70
    this.basicWorkflowContext =
1✔
71
        new BasicWorkflowContext(
72
            namespace, workflowExecution, startedAttributes, runStartedTimestampMillis);
73
    this.mutableState = new WorkflowMutableState(startedAttributes);
1✔
74
    this.fullReplayDirectQueryName = fullReplayDirectQueryName;
1✔
75
    this.replayAwareWorkflowMetricsScope =
1✔
76
        new ReplayAwareScope(workflowMetricsScope, this, workflowStateMachines::currentTimeMillis);
1✔
77
    this.workerOptions = workerOptions;
1✔
78
  }
1✔
79

80
  @Override
81
  public boolean getEnableLoggingInReplay() {
82
    return workerOptions.getEnableLoggingInReplay();
1✔
83
  }
84

85
  @Override
86
  public UUID randomUUID() {
87
    return workflowStateMachines.randomUUID();
1✔
88
  }
89

90
  @Override
91
  public Random newRandom() {
92
    return workflowStateMachines.newRandom();
1✔
93
  }
94

95
  @Override
96
  public Scope getMetricsScope() {
97
    return replayAwareWorkflowMetricsScope;
1✔
98
  }
99

100
  @Nonnull
101
  @Override
102
  public WorkflowExecution getWorkflowExecution() {
103
    return basicWorkflowContext.getWorkflowExecution();
1✔
104
  }
105

106
  @Override
107
  public WorkflowExecution getParentWorkflowExecution() {
108
    return basicWorkflowContext.getParentWorkflowExecution();
1✔
109
  }
110

111
  @Override
112
  public String getFirstExecutionRunId() {
113
    return basicWorkflowContext.getFirstExecutionRunId();
×
114
  }
115

116
  @Override
117
  public Optional<String> getContinuedExecutionRunId() {
118
    return basicWorkflowContext.getContinuedExecutionRunId();
1✔
119
  }
120

121
  @Override
122
  public String getOriginalExecutionRunId() {
123
    return basicWorkflowContext.getOriginalExecutionRunId();
×
124
  }
125

126
  @Override
127
  public WorkflowType getWorkflowType() {
128
    return basicWorkflowContext.getWorkflowType();
1✔
129
  }
130

131
  @Nonnull
132
  @Override
133
  public Duration getWorkflowTaskTimeout() {
134
    return basicWorkflowContext.getWorkflowTaskTimeout();
1✔
135
  }
136

137
  @Override
138
  public String getTaskQueue() {
139
    return basicWorkflowContext.getTaskQueue();
1✔
140
  }
141

142
  @Nullable
143
  @Override
144
  public RetryOptions getRetryOptions() {
145
    return basicWorkflowContext.getRetryOptions();
1✔
146
  }
147

148
  @Override
149
  public String getNamespace() {
150
    return basicWorkflowContext.getNamespace();
1✔
151
  }
152

153
  @Override
154
  public String getWorkflowId() {
155
    return basicWorkflowContext.getWorkflowExecution().getWorkflowId();
1✔
156
  }
157

158
  @Nonnull
159
  @Override
160
  public String getRunId() {
161
    String result = basicWorkflowContext.getWorkflowExecution().getRunId();
1✔
162
    if (result.isEmpty()) {
1✔
163
      return null;
×
164
    }
165
    return result;
1✔
166
  }
167

168
  @Override
169
  public Duration getWorkflowRunTimeout() {
170
    return basicWorkflowContext.getWorkflowRunTimeout();
×
171
  }
172

173
  @Override
174
  public Duration getWorkflowExecutionTimeout() {
175
    return basicWorkflowContext.getWorkflowExecutionTimeout();
×
176
  }
177

178
  @Override
179
  public long getRunStartedTimestampMillis() {
180
    return basicWorkflowContext.getRunStartedTimestampMillis();
1✔
181
  }
182

183
  @Override
184
  public Payload getMemo(String key) {
185
    return mutableState.getMemo(key);
1✔
186
  }
187

188
  @Override
189
  @Nullable
190
  public SearchAttributes getSearchAttributes() {
191
    return mutableState.getSearchAttributes();
1✔
192
  }
193

194
  @Override
195
  public ScheduleActivityTaskOutput scheduleActivityTask(
196
      ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback) {
197
    ScheduleActivityTaskCommandAttributes.Builder attributes = parameters.getAttributes();
1✔
198
    if (attributes.getActivityId().isEmpty()) {
1✔
199
      attributes.setActivityId(workflowStateMachines.randomUUID().toString());
1✔
200
    }
201
    Functions.Proc cancellationHandler =
1✔
202
        workflowStateMachines.scheduleActivityTask(parameters, callback);
1✔
203
    return new ScheduleActivityTaskOutput(
1✔
204
        attributes.getActivityId(), (exception) -> cancellationHandler.apply());
1✔
205
  }
206

207
  @Override
208
  public Functions.Proc scheduleLocalActivityTask(
209
      ExecuteLocalActivityParameters parameters, LocalActivityCallback callback) {
210
    return workflowStateMachines.scheduleLocalActivityTask(parameters, callback);
1✔
211
  }
212

213
  @Override
214
  public Functions.Proc1<Exception> startChildWorkflow(
215
      StartChildWorkflowExecutionParameters parameters,
216
      Functions.Proc2<WorkflowExecution, Exception> startCallback,
217
      Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
218
    Functions.Proc cancellationHandler =
1✔
219
        workflowStateMachines.startChildWorkflow(parameters, startCallback, completionCallback);
1✔
220
    return (exception) -> cancellationHandler.apply();
1✔
221
  }
222

223
  @Override
224
  public Functions.Proc1<Exception> startNexusOperation(
225
      ScheduleNexusOperationCommandAttributes attributes,
226
      Functions.Proc2<Optional<String>, Failure> startedCallback,
227
      Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
228
    Functions.Proc cancellationHandler =
1✔
229
        workflowStateMachines.startNexusOperation(attributes, startedCallback, completionCallback);
1✔
230
    return (exception) -> cancellationHandler.apply();
1✔
231
  }
232

233
  @Override
234
  public Functions.Proc1<Exception> signalExternalWorkflowExecution(
235
      SignalExternalWorkflowExecutionCommandAttributes.Builder attributes,
236
      Functions.Proc2<Void, Failure> callback) {
237
    Functions.Proc cancellationHandler =
1✔
238
        workflowStateMachines.signalExternalWorkflowExecution(attributes.build(), callback);
1✔
239
    return (e) -> cancellationHandler.apply();
1✔
240
  }
241

242
  @Override
243
  public void requestCancelExternalWorkflowExecution(
244
      WorkflowExecution execution, Functions.Proc2<Void, RuntimeException> callback) {
245
    RequestCancelExternalWorkflowExecutionCommandAttributes attributes =
246
        RequestCancelExternalWorkflowExecutionCommandAttributes.newBuilder()
×
247
            .setWorkflowId(execution.getWorkflowId())
×
248
            .setRunId(execution.getRunId())
×
249
            .build();
×
250
    workflowStateMachines.requestCancelExternalWorkflowExecution(attributes, callback);
×
251
  }
×
252

253
  @Override
254
  public boolean isReplaying() {
255
    return workflowStateMachines.isReplaying();
1✔
256
  }
257

258
  @Override
259
  public boolean tryUseSdkFlag(SdkFlag flag) {
260
    return workflowStateMachines.tryUseSdkFlag(flag);
1✔
261
  }
262

263
  @Override
264
  public Optional<String> getCurrentBuildId() {
265
    String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
×
266
    // The current task started id == 0 check is to avoid setting the build id to this worker's ID
267
    // in the event we're
268
    // servicing a query, in which case we do want to use the ID from history.
269
    if (!workflowStateMachines.isReplaying()
×
270
        && workflowStateMachines.getCurrentWFTStartedEventId() != 0) {
×
271
      curTaskBID = workerOptions.getBuildId();
×
272
    }
273
    return Optional.ofNullable(curTaskBID);
×
274
  }
275

276
  @Override
277
  public Functions.Proc1<RuntimeException> newTimer(
278
      Duration delay, UserMetadata metadata, Functions.Proc1<RuntimeException> callback) {
279
    if (delay.compareTo(Duration.ZERO) <= 0) {
1✔
280
      callback.apply(null);
1✔
281
      return (e) -> {};
1✔
282
    }
283
    StartTimerCommandAttributes attributes =
284
        StartTimerCommandAttributes.newBuilder()
1✔
285
            .setStartToFireTimeout(ProtobufTimeUtils.toProtoDuration(delay))
1✔
286
            .setTimerId(workflowStateMachines.randomUUID().toString())
1✔
287
            .build();
1✔
288
    Functions.Proc cancellationHandler =
1✔
289
        workflowStateMachines.newTimer(
1✔
290
            attributes, metadata, (event) -> handleTimerCallback(callback, event));
1✔
291
    return (e) -> cancellationHandler.apply();
1✔
292
  }
293

294
  private void handleTimerCallback(Functions.Proc1<RuntimeException> callback, HistoryEvent event) {
295
    switch (event.getEventType()) {
1✔
296
      case EVENT_TYPE_TIMER_FIRED:
297
        {
298
          callback.apply(null);
1✔
299
          return;
1✔
300
        }
301
      case EVENT_TYPE_TIMER_CANCELED:
302
        {
303
          CanceledFailure exception = new CanceledFailure("Canceled by request");
1✔
304
          callback.apply(exception);
1✔
305
          return;
1✔
306
        }
307
      default:
308
        throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
×
309
    }
310
  }
311

312
  @Override
313
  public void sideEffect(
314
      Func<Optional<Payloads>> func, Functions.Proc1<Optional<Payloads>> callback) {
315
    workflowStateMachines.sideEffect(func, callback);
1✔
316
  }
1✔
317

318
  @Override
319
  public void mutableSideEffect(
320
      String id,
321
      Func1<Optional<Payloads>, Optional<Payloads>> func,
322
      Functions.Proc1<Optional<Payloads>> callback) {
323
    workflowStateMachines.mutableSideEffect(id, func, callback);
1✔
324
  }
1✔
325

326
  @Override
327
  public boolean getVersion(
328
      String changeId,
329
      int minSupported,
330
      int maxSupported,
331
      Functions.Proc2<Integer, RuntimeException> callback) {
332
    return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
1✔
333
  }
334

335
  @Override
336
  public long currentTimeMillis() {
337
    return workflowStateMachines.currentTimeMillis();
1✔
338
  }
339

340
  @Override
341
  public void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes) {
342
    workflowStateMachines.upsertSearchAttributes(searchAttributes);
1✔
343
    mutableState.upsertSearchAttributes(searchAttributes);
1✔
344
  }
1✔
345

346
  @Override
347
  public void upsertMemo(@Nonnull Memo memo) {
348
    workflowStateMachines.upsertMemo(memo);
1✔
349
    mutableState.upsertMemo(memo);
1✔
350
  }
1✔
351

352
  @Override
353
  public int getAttempt() {
354
    return basicWorkflowContext.getAttempt();
1✔
355
  }
356

357
  @Override
358
  public String getCronSchedule() {
359
    return basicWorkflowContext.getCronSchedule();
1✔
360
  }
361

362
  @Override
363
  @Nullable
364
  public Payloads getLastCompletionResult() {
365
    return basicWorkflowContext.getLastCompletionResult();
1✔
366
  }
367

368
  @Override
369
  @Nullable
370
  public Failure getPreviousRunFailure() {
371
    return basicWorkflowContext.getPreviousRunFailure();
1✔
372
  }
373

374
  @Nullable
375
  @Override
376
  public String getFullReplayDirectQueryName() {
377
    return fullReplayDirectQueryName;
1✔
378
  }
379

380
  @Override
381
  public Map<String, Payload> getHeader() {
382
    return basicWorkflowContext.getHeader();
1✔
383
  }
384

385
  @Override
386
  public long getLastWorkflowTaskStartedEventId() {
387
    return workflowStateMachines.getLastWFTStartedEventId();
1✔
388
  }
389

390
  @Override
391
  public long getHistorySize() {
392
    return workflowStateMachines.getHistorySize();
1✔
393
  }
394

395
  @Override
396
  public boolean isContinueAsNewSuggested() {
397
    return workflowStateMachines.isContinueAsNewSuggested();
1✔
398
  }
399

400
  /*
401
   * MUTABLE STATE OPERATIONS
402
   */
403

404
  @Override
405
  public boolean isCancelRequested() {
406
    return mutableState.isCancelRequested();
1✔
407
  }
408

409
  @Override
410
  public void setCancelRequested() {
411
    mutableState.setCancelRequested();
1✔
412
  }
1✔
413

414
  public boolean isWorkflowMethodCompleted() {
415
    return mutableState.isWorkflowMethodCompleted();
1✔
416
  }
417

418
  @Override
419
  public void setWorkflowMethodCompleted() {
420
    this.mutableState.setWorkflowMethodCompleted();
1✔
421
  }
1✔
422

423
  @Override
424
  public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() {
425
    return mutableState.getContinueAsNewOnCompletion();
1✔
426
  }
427

428
  @Override
429
  public void continueAsNewOnCompletion(
430
      ContinueAsNewWorkflowExecutionCommandAttributes attributes) {
431
    mutableState.continueAsNewOnCompletion(attributes);
1✔
432
  }
1✔
433

434
  @Override
435
  public Throwable getWorkflowTaskFailure() {
436
    return mutableState.getWorkflowTaskFailure();
1✔
437
  }
438

439
  @Override
440
  public void failWorkflowTask(Throwable failure) {
441
    mutableState.failWorkflowTask(failure);
×
442
  }
×
443
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc