• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.62
/temporal-test-server/src/main/java/io/temporal/internal/testservice/RequestContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import com.google.common.base.MoreObjects;
24
import com.google.protobuf.Timestamp;
25
import com.google.protobuf.util.Timestamps;
26
import io.grpc.Status;
27
import io.temporal.api.common.v1.WorkflowExecution;
28
import io.temporal.api.history.v1.HistoryEvent;
29
import io.temporal.internal.common.WorkflowExecutionUtils;
30
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
31
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
32
import io.temporal.workflow.Functions;
33
import java.time.Duration;
34
import java.util.ArrayList;
35
import java.util.List;
36
import java.util.Objects;
37
import java.util.function.LongSupplier;
38
import javax.annotation.Nonnull;
39
import javax.annotation.Nullable;
40

41
final class RequestContext {
42

43
  @FunctionalInterface
44
  interface CommitCallback {
45

46
    void apply(int historySize);
47
  }
48

49
  static final class Timer {
50

51
    private final Duration delay;
52
    private final Runnable callback;
53
    private final String taskInfo;
54
    private Functions.Proc cancellationHandle;
55
    private final Functions.Proc cancellationHandleWrapper;
56

57
    Timer(Duration delay, Runnable callback, String taskInfo) {
1✔
58
      this.delay = delay;
1✔
59
      this.callback = callback;
1✔
60
      this.taskInfo = taskInfo;
1✔
61
      this.cancellationHandleWrapper =
1✔
62
          () -> {
63
            if (this.cancellationHandle != null) {
1✔
64
              this.cancellationHandle.apply();
1✔
65
            }
66
          };
1✔
67
    }
1✔
68

69
    Duration getDelay() {
70
      return delay;
1✔
71
    }
72

73
    Runnable getCallback() {
74
      return callback;
1✔
75
    }
76

77
    String getTaskInfo() {
78
      return taskInfo;
1✔
79
    }
80

81
    public void setCancellationHandle(Functions.Proc cancellationHandle) {
82
      this.cancellationHandle = cancellationHandle;
1✔
83
    }
1✔
84

85
    public Functions.Proc getCancellationHandle() {
86
      return cancellationHandleWrapper;
1✔
87
    }
88
  }
89

90
  static final class TimerLockChange {
91
    private final String caller;
92
    /** +1 or -1 */
93
    private final int change;
94

95
    TimerLockChange(String caller, int change) {
1✔
96
      this.caller = Objects.requireNonNull(caller);
1✔
97
      if (change != -1 && change != 1) {
1✔
98
        throw new IllegalArgumentException("Invalid change: " + change);
×
99
      }
100
      this.change = change;
1✔
101
    }
1✔
102

103
    public String getCaller() {
104
      return caller;
1✔
105
    }
106

107
    public int getChange() {
108
      return change;
1✔
109
    }
110
  }
111

112
  private final LongSupplier clock;
113

114
  private final ExecutionId executionId;
115

116
  private final TestWorkflowMutableState workflowMutableState;
117

118
  private final long initialEventId;
119

120
  private final List<HistoryEvent> events = new ArrayList<>();
1✔
121
  private final List<CommitCallback> commitCallbacks = new ArrayList<>();
1✔
122
  // Contains a workflow task created by the updater that needs to be persisted into a task queue on
123
  // a commit.
124
  // If an eager dispatch was performed, it should be reset to null
125
  private WorkflowTask workflowTaskForMatching;
126
  private final List<ActivityTask> activityTasks = new ArrayList<>();
1✔
127
  private final List<Timer> timers = new ArrayList<>();
1✔
128
  private long workflowCompletedAtEventId = -1;
1✔
129
  private boolean needWorkflowTask;
130
  // How many times call SelfAdvancedTimer#lockTimeSkipping.
131
  // Negative means how many times to call SelfAdvancedTimer#unlockTimeSkipping.
132
  private final List<TimerLockChange> timerLocks = new ArrayList<>();
1✔
133

134
  // Contains an exception that may be published by the updater in case if updater needs to perform
135
  // and commit the changes.
136
  // The updater can't just throw the exception because it will prevent the changes to be committed.
137
  // This exception should be thrown at the very end after performing all the commit actions
138
  private RuntimeException exception;
139

140
  /**
141
   * Creates an instance of the RequestContext
142
   *
143
   * @param clock clock used to timestamp events and schedule timers.
144
   * @param workflowMutableState state of the execution being updated
145
   * @param initialEventId expected id of the next event added to the history
146
   */
147
  RequestContext(
148
      LongSupplier clock, TestWorkflowMutableState workflowMutableState, long initialEventId) {
1✔
149
    this.clock = Objects.requireNonNull(clock);
1✔
150
    this.workflowMutableState = Objects.requireNonNull(workflowMutableState);
1✔
151
    this.executionId = Objects.requireNonNull(workflowMutableState.getExecutionId());
1✔
152
    if (initialEventId <= 0) {
1✔
153
      throw new IllegalArgumentException("Invalid initialEventId: " + initialEventId);
×
154
    }
155
    this.initialEventId = initialEventId;
1✔
156
  }
1✔
157

158
  void add(RequestContext ctx) {
159
    this.activityTasks.addAll(ctx.getActivityTasks());
1✔
160
    this.timers.addAll(ctx.getTimers());
1✔
161
    this.events.addAll(ctx.getEvents());
1✔
162
  }
1✔
163

164
  void lockTimer(String caller) {
165
    timerLocks.add(new TimerLockChange(caller, +1));
1✔
166
  }
1✔
167

168
  void unlockTimer(String caller) {
169
    timerLocks.add(new TimerLockChange(caller, -1));
1✔
170
  }
1✔
171

172
  List<TimerLockChange> getTimerLocks() {
173
    return timerLocks;
1✔
174
  }
175

176
  void clearTimersAndLocks() {
177
    timerLocks.clear();
1✔
178
    timers.clear();
1✔
179
  }
1✔
180

181
  Timestamp currentTime() {
182
    return Timestamps.fromMillis(clock.getAsLong());
1✔
183
  }
184

185
  /** Returns eventId of the added event; */
186
  long addEvent(HistoryEvent event) {
187
    if (workflowMutableState.isTerminalState()) {
1✔
188
      throw Status.NOT_FOUND
×
189
          .withDescription("workflow execution already completed")
×
190
          .asRuntimeException();
×
191
    }
192
    long eventId = initialEventId + events.size();
1✔
193
    if (WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(event)) {
1✔
194
      workflowCompletedAtEventId = eventId;
1✔
195
    } else {
196
      if (workflowCompletedAtEventId > 0 && workflowCompletedAtEventId < eventId) {
1✔
197
        throw new IllegalStateException("Event added after the workflow completion event");
×
198
      }
199
    }
200
    events.add(event);
1✔
201
    return eventId;
1✔
202
  }
203

204
  WorkflowExecution getExecution() {
205
    return executionId.getExecution();
1✔
206
  }
207

208
  public TestWorkflowMutableState getWorkflowMutableState() {
209
    return workflowMutableState;
1✔
210
  }
211

212
  String getNamespace() {
213
    return executionId.getNamespace();
1✔
214
  }
215

216
  public long getInitialEventId() {
217
    return initialEventId;
1✔
218
  }
219

220
  public long getNextEventId() {
221
    return initialEventId + events.size();
1✔
222
  }
223

224
  /**
225
   * Command needed, but there is one already running. So initiate another one as soon as it
226
   * completes.
227
   */
228
  void setNeedWorkflowTask(boolean needWorkflowTask) {
229
    this.needWorkflowTask = needWorkflowTask;
1✔
230
  }
1✔
231

232
  boolean isNeedWorkflowTask() {
233
    return needWorkflowTask;
1✔
234
  }
235

236
  void setWorkflowTaskForMatching(@Nonnull WorkflowTask workflowTaskForMatching) {
237
    this.workflowTaskForMatching = Objects.requireNonNull(workflowTaskForMatching);
1✔
238
  }
1✔
239

240
  @Nullable
241
  WorkflowTask resetWorkflowTaskForMatching() {
242
    WorkflowTask existingTask = this.workflowTaskForMatching;
1✔
243
    this.workflowTaskForMatching = null;
1✔
244
    return existingTask;
1✔
245
  }
246

247
  WorkflowTask getWorkflowTaskForMatching() {
248
    return workflowTaskForMatching;
1✔
249
  }
250

251
  void addActivityTask(ActivityTask activityTask) {
252
    this.activityTasks.add(activityTask);
1✔
253
  }
1✔
254

255
  /**
256
   * @return cancellation handle
257
   */
258
  Functions.Proc addTimer(Duration delay, Runnable callback, String name) {
259
    Timer timer = new Timer(delay, callback, name);
1✔
260
    this.timers.add(timer);
1✔
261
    return timer.getCancellationHandle();
1✔
262
  }
263

264
  public List<Timer> getTimers() {
265
    return timers;
1✔
266
  }
267

268
  List<ActivityTask> getActivityTasks() {
269
    return activityTasks;
1✔
270
  }
271

272
  List<HistoryEvent> getEvents() {
273
    return events;
1✔
274
  }
275

276
  void onCommit(CommitCallback callback) {
277
    commitCallbacks.add(callback);
1✔
278
  }
1✔
279

280
  /**
281
   * @return nextEventId
282
   */
283
  long commitChanges(TestWorkflowStore store) {
284
    return store.save(this);
1✔
285
  }
286

287
  /** Called by {@link TestWorkflowStore#save(RequestContext)} */
288
  void fireCallbacks(int historySize) {
289
    for (CommitCallback callback : commitCallbacks) {
1✔
290
      callback.apply(historySize);
1✔
291
    }
1✔
292
  }
1✔
293

294
  ExecutionId getExecutionId() {
295
    return executionId;
1✔
296
  }
297

298
  public RuntimeException getException() {
299
    return exception;
1✔
300
  }
301

302
  public void setExceptionIfEmpty(RuntimeException exception) {
303
    this.exception = MoreObjects.firstNonNull(this.exception, exception);
1✔
304
  }
1✔
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc