• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #204

30 Oct 2023 06:00PM UTC coverage: 77.389% (+0.001%) from 77.388%
#204

push

github-actions

web-flow
Verify history is replayed up to StartedEventId (#1916)

Verify history is replayed up to StartedEventId

10 of 10 new or added lines in 2 files covered. (100.0%)

18725 of 24196 relevant lines covered (77.39%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.59
/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import com.google.common.collect.Iterators;
24
import com.google.common.collect.PeekingIterator;
25
import com.google.protobuf.Timestamp;
26
import com.google.protobuf.util.Timestamps;
27
import io.grpc.Deadline;
28
import io.grpc.Status;
29
import io.temporal.api.common.v1.WorkflowExecution;
30
import io.temporal.api.enums.v1.EventType;
31
import io.temporal.api.enums.v1.HistoryEventFilterType;
32
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
33
import io.temporal.api.history.v1.History;
34
import io.temporal.api.history.v1.HistoryEvent;
35
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
36
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
37
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
38
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
39
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
40
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
41
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
42
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
43
import io.temporal.common.WorkflowExecutionHistory;
44
import io.temporal.failure.ApplicationFailure;
45
import io.temporal.internal.common.WorkflowExecutionUtils;
46
import io.temporal.internal.testservice.RequestContext.Timer;
47
import io.temporal.workflow.Functions;
48
import java.time.Duration;
49
import java.util.ArrayList;
50
import java.util.HashMap;
51
import java.util.List;
52
import java.util.Map;
53
import java.util.Map.Entry;
54
import java.util.Optional;
55
import java.util.concurrent.Future;
56
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.locks.Condition;
58
import java.util.concurrent.locks.Lock;
59
import java.util.concurrent.locks.ReentrantLock;
60
import java.util.stream.Collectors;
61
import org.slf4j.Logger;
62
import org.slf4j.LoggerFactory;
63

64
class TestWorkflowStoreImpl implements TestWorkflowStore {
65

66
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowStoreImpl.class);
1✔
67

68
  private final Lock lock = new ReentrantLock();
1✔
69
  private final Map<ExecutionId, HistoryStore> histories = new HashMap<>();
1✔
70
  private final Map<TaskQueueId, TaskQueue<PollActivityTaskQueueResponse.Builder>>
1✔
71
      activityTaskQueues = new HashMap<>();
72
  private final Map<TaskQueueId, TaskQueue<PollWorkflowTaskQueueResponse.Builder>>
1✔
73
      workflowTaskQueues = new HashMap<>();
74
  private final SelfAdvancingTimer selfAdvancingTimer;
75

76
  private static class HistoryStore {
77

78
    private final ExecutionId id;
79
    private final Lock lock;
80
    private final Condition newEventsCondition;
81
    private final List<HistoryEvent> history = new ArrayList<>();
1✔
82
    private boolean completed;
83

84
    private HistoryStore(ExecutionId id, Lock lock) {
1✔
85
      this.id = id;
1✔
86
      this.lock = lock;
1✔
87
      this.newEventsCondition = lock.newCondition();
1✔
88
    }
1✔
89

90
    public boolean isCompleted() {
91
      return completed;
1✔
92
    }
93

94
    public List<HistoryEvent> getHistory() {
95
      return history;
1✔
96
    }
97

98
    private void checkNextEventId(long nextEventId) {
99
      if (nextEventId != history.size() + 1L && (nextEventId != 0 && history.size() != 0)) {
1✔
100
        throw new IllegalStateException(
×
101
            "NextEventId=" + nextEventId + ", historySize=" + history.size() + " for " + id);
×
102
      }
103
    }
1✔
104

105
    void addAllLocked(List<HistoryEvent> events, Timestamp eventTime) {
106
      for (HistoryEvent event : events) {
1✔
107
        HistoryEvent.Builder eBuilder = event.toBuilder();
1✔
108
        if (completed) {
1✔
109
          throw ApplicationFailure.newNonRetryableFailure("Workflow execution completed.", "test");
×
110
        }
111
        eBuilder.setEventId(history.size() + 1L);
1✔
112
        // It can be set in StateMachines.startActivityTask
113
        if (Timestamps.toMillis(eBuilder.getEventTime()) == 0) {
1✔
114
          eBuilder.setEventTime(eventTime);
1✔
115
        }
116
        history.add(eBuilder.build());
1✔
117
        completed = completed || WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(eBuilder);
1✔
118
      }
1✔
119
      newEventsCondition.signalAll();
1✔
120
    }
1✔
121

122
    long getNextEventIdLocked() {
123
      return history.size() + 1L;
1✔
124
    }
125

126
    List<HistoryEvent> getEventsLocked() {
127
      return history;
1✔
128
    }
129

130
    List<HistoryEvent> waitForNewEvents(
131
        long expectedNextEventId, HistoryEventFilterType filterType, Deadline deadline) {
132
      lock.lock();
1✔
133
      try {
134
        while (true) {
135
          if (completed || getNextEventIdLocked() > expectedNextEventId) {
1✔
136
            if (filterType == HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) {
1✔
137
              if (completed) {
1✔
138
                List<HistoryEvent> result = new ArrayList<>(1);
1✔
139
                result.add(history.get(history.size() - 1));
1✔
140
                return result;
1✔
141
              }
142
              expectedNextEventId = getNextEventIdLocked();
1✔
143
              continue;
1✔
144
            }
145
            List<HistoryEvent> result =
×
146
                new ArrayList<>(((int) (getNextEventIdLocked() - expectedNextEventId)));
×
147
            for (int i = (int) expectedNextEventId; i < getNextEventIdLocked(); i++) {
×
148
              result.add(history.get(i));
×
149
            }
150
            return result;
×
151
          }
152
          try {
153
            long toWait;
154
            if (deadline != null) {
1✔
155
              toWait = deadline.timeRemaining(TimeUnit.MILLISECONDS);
1✔
156
              if (toWait <= 0) {
1✔
157
                return null;
1✔
158
              }
159
              newEventsCondition.await(toWait, TimeUnit.MILLISECONDS);
1✔
160
            } else {
161
              newEventsCondition.await();
×
162
            }
163
          } catch (InterruptedException e) {
×
164
            Thread.currentThread().interrupt();
×
165
            return null;
×
166
          }
1✔
167
        }
168
      } finally {
169
        lock.unlock();
1✔
170
      }
171
    }
172
  }
173

174
  public TestWorkflowStoreImpl(SelfAdvancingTimer selfAdvancingTimer) {
1✔
175
    this.selfAdvancingTimer = selfAdvancingTimer;
1✔
176
  }
1✔
177

178
  @Override
179
  public Timestamp currentTime() {
180
    return Timestamps.fromMillis(selfAdvancingTimer.getClock().getAsLong());
1✔
181
  }
182

183
  @Override
184
  public long save(RequestContext ctx) {
185
    long result;
186
    lock.lock();
1✔
187
    try {
188
      ExecutionId executionId = ctx.getExecutionId();
1✔
189
      HistoryStore history = histories.get(executionId);
1✔
190
      List<HistoryEvent> events = ctx.getEvents();
1✔
191
      if (history == null) {
1✔
192
        if (events.isEmpty()
1✔
193
            || events.get(0).getEventType() != EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) {
1✔
194
          throw new IllegalStateException("No history found for " + executionId);
×
195
        }
196
        history = new HistoryStore(executionId, lock);
1✔
197
        histories.put(executionId, history);
1✔
198
      }
199
      history.checkNextEventId(ctx.getInitialEventId());
1✔
200
      history.addAllLocked(events, ctx.currentTime());
1✔
201
      result = history.getNextEventIdLocked();
1✔
202
      selfAdvancingTimer.updateLocks(ctx.getTimerLocks());
1✔
203
      ctx.fireCallbacks(history.getEventsLocked().size());
1✔
204
    } finally {
205
      lock.unlock();
1✔
206
    }
207
    // Push tasks to the queues out of locks
208
    WorkflowTask workflowTask = ctx.getWorkflowTaskForMatching();
1✔
209

210
    if (workflowTask != null) {
1✔
211
      StickyExecutionAttributes attributes =
1✔
212
          ctx.getWorkflowMutableState().getStickyExecutionAttributes();
1✔
213
      TaskQueueId id =
1✔
214
          new TaskQueueId(
215
              workflowTask.getTaskQueueId().getNamespace(),
1✔
216
              attributes == null
1✔
217
                  ? workflowTask.getTaskQueueId().getTaskQueueName()
1✔
218
                  : attributes.getWorkerTaskQueue().getName());
1✔
219
      if (id.getTaskQueueName().isEmpty() || id.getNamespace().isEmpty()) {
1✔
220
        throw Status.INTERNAL.withDescription("Invalid TaskQueueId: " + id).asRuntimeException();
×
221
      }
222
      TaskQueue<PollWorkflowTaskQueueResponse.Builder> workflowTaskQueue =
1✔
223
          getWorkflowTaskQueueQueue(id);
1✔
224
      workflowTaskQueue.add(workflowTask.getTask());
1✔
225
    }
226

227
    List<ActivityTask> activityTasks = ctx.getActivityTasks();
1✔
228
    if (activityTasks != null) {
1✔
229
      for (ActivityTask activityTask : activityTasks) {
1✔
230
        TaskQueue<PollActivityTaskQueueResponse.Builder> activityTaskQueue =
1✔
231
            getActivityTaskQueueQueue(activityTask.getTaskQueueId());
1✔
232
        activityTaskQueue.add(activityTask.getTask());
1✔
233
      }
1✔
234
    }
235

236
    List<Timer> timers = ctx.getTimers();
1✔
237
    if (timers != null) {
1✔
238
      for (Timer t : timers) {
1✔
239
        log.trace(
1✔
240
            "scheduling timer with " + t.getDelay() + "delay. Current time=" + this.currentTime());
1✔
241
        Functions.Proc cancellationHandle =
1✔
242
            selfAdvancingTimer.schedule(t.getDelay(), t.getCallback(), t.getTaskInfo());
1✔
243
        t.setCancellationHandle(cancellationHandle);
1✔
244
      }
1✔
245
    }
246
    return result;
1✔
247
  }
248

249
  @Override
250
  public void applyTimersAndLocks(RequestContext ctx) {
251
    lock.lock();
1✔
252
    try {
253
      selfAdvancingTimer.updateLocks(ctx.getTimerLocks());
1✔
254
    } finally {
255
      lock.unlock();
1✔
256
    }
257

258
    List<Timer> timers = ctx.getTimers();
1✔
259
    if (timers != null) {
1✔
260
      for (Timer t : timers) {
1✔
261
        Functions.Proc cancellationHandle =
1✔
262
            selfAdvancingTimer.schedule(t.getDelay(), t.getCallback(), t.getTaskInfo());
1✔
263
        t.setCancellationHandle(cancellationHandle);
1✔
264
      }
1✔
265
    }
266

267
    ctx.clearTimersAndLocks();
1✔
268
  }
1✔
269

270
  @Override
271
  public void registerDelayedCallback(Duration delay, Runnable r) {
272
    selfAdvancingTimer.schedule(delay, r, "registerDelayedCallback");
1✔
273
  }
1✔
274

275
  private TaskQueue<PollActivityTaskQueueResponse.Builder> getActivityTaskQueueQueue(
276
      TaskQueueId taskQueueId) {
277
    lock.lock();
1✔
278
    try {
279
      TaskQueue<PollActivityTaskQueueResponse.Builder> activityTaskQueue =
1✔
280
          activityTaskQueues.get(taskQueueId);
1✔
281
      if (activityTaskQueue == null) {
1✔
282
        activityTaskQueue = new TaskQueue<>();
1✔
283
        activityTaskQueues.put(taskQueueId, activityTaskQueue);
1✔
284
      }
285
      return activityTaskQueue;
1✔
286
    } finally {
287
      lock.unlock();
1✔
288
    }
289
  }
290

291
  private TaskQueue<PollWorkflowTaskQueueResponse.Builder> getWorkflowTaskQueueQueue(
292
      TaskQueueId taskQueueId) {
293
    lock.lock();
1✔
294
    try {
295
      TaskQueue<PollWorkflowTaskQueueResponse.Builder> workflowTaskQueue =
1✔
296
          workflowTaskQueues.get(taskQueueId);
1✔
297
      if (workflowTaskQueue == null) {
1✔
298
        workflowTaskQueue = new TaskQueue<>();
1✔
299
        workflowTaskQueues.put(taskQueueId, workflowTaskQueue);
1✔
300
      }
301
      return workflowTaskQueue;
1✔
302
    } finally {
303
      lock.unlock();
1✔
304
    }
305
  }
306

307
  @Override
308
  public Future<PollWorkflowTaskQueueResponse.Builder> pollWorkflowTaskQueue(
309
      PollWorkflowTaskQueueRequest pollRequest) {
310
    final TaskQueueId taskQueueId =
1✔
311
        new TaskQueueId(pollRequest.getNamespace(), pollRequest.getTaskQueue().getName());
1✔
312
    return getWorkflowTaskQueueQueue(taskQueueId).poll();
1✔
313
  }
314

315
  @Override
316
  public Future<PollActivityTaskQueueResponse.Builder> pollActivityTaskQueue(
317
      PollActivityTaskQueueRequest pollRequest) {
318
    final TaskQueueId taskQueueId =
1✔
319
        new TaskQueueId(pollRequest.getNamespace(), pollRequest.getTaskQueue().getName());
1✔
320
    return getActivityTaskQueueQueue(taskQueueId).poll();
1✔
321
  }
322

323
  @Override
324
  public void sendQueryTask(
325
      ExecutionId executionId, TaskQueueId taskQueue, PollWorkflowTaskQueueResponse.Builder task) {
326
    lock.lock();
1✔
327
    try {
328
      HistoryStore historyStore = getHistoryStore(executionId);
1✔
329
      List<HistoryEvent> events = new ArrayList<>(historyStore.getEventsLocked());
1✔
330
      History.Builder history = History.newBuilder();
1✔
331
      PeekingIterator<HistoryEvent> iterator = Iterators.peekingIterator(events.iterator());
1✔
332
      long previousStaredEventId = 0;
1✔
333
      while (iterator.hasNext()) {
1✔
334
        HistoryEvent event = iterator.next();
1✔
335
        if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED) {
1✔
336
          if (!iterator.hasNext()
1✔
337
              || iterator.peek().getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
1✔
338
            previousStaredEventId = event.getEventId();
1✔
339
          }
340
        } else if (WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(event)) {
1✔
341
          if (iterator.hasNext()) {
1✔
342
            throw Status.INTERNAL
×
343
                .withDescription("Unexpected event after the completion event: " + iterator.peek())
×
344
                .asRuntimeException();
×
345
          }
346
        }
347
      }
1✔
348
      task.setPreviousStartedEventId(previousStaredEventId);
1✔
349
      // it's not a real workflow task and the server sends 0 for startedEventId for such a workflow
350
      // task
351
      task.setStartedEventId(0);
1✔
352
      if (taskQueue.getTaskQueueName().equals(task.getWorkflowExecutionTaskQueue().getName())) {
1✔
353
        history.addAllEvents(events);
1✔
354
      } else {
355
        history.addAllEvents(new ArrayList<>());
1✔
356
      }
357
      task.setHistory(history);
1✔
358
    } finally {
359
      lock.unlock();
1✔
360
    }
361
    TaskQueue<PollWorkflowTaskQueueResponse.Builder> workflowTaskQueue =
1✔
362
        getWorkflowTaskQueueQueue(taskQueue);
1✔
363
    workflowTaskQueue.add(task);
1✔
364
  }
1✔
365

366
  @Override
367
  public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
368
      ExecutionId executionId,
369
      GetWorkflowExecutionHistoryRequest getRequest,
370
      Deadline deadlineToReturnEmptyResponse) {
371
    HistoryStore history;
372
    // Used to eliminate the race condition on waitForNewEvents
373
    long expectedNextEventId;
374
    lock.lock();
1✔
375
    try {
376
      history = getHistoryStore(executionId);
1✔
377
      if (!getRequest.getWaitNewEvent()) {
1✔
378
        List<HistoryEvent> events = history.getEventsLocked();
1✔
379
        // Copy the list as it is mutable. Individual events assumed immutable.
380
        List<HistoryEvent> eventsCopy =
1✔
381
            events.stream()
1✔
382
                .filter(
1✔
383
                    e -> {
384
                      if (getRequest.getHistoryEventFilterType()
1✔
385
                          != HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) {
386
                        return true;
1✔
387
                      }
388

389
                      // They asked for only the close event. There are a variety of ways a workflow
390
                      // can close.
391
                      return WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(e);
×
392
                    })
393
                .collect(Collectors.toList());
1✔
394
        return GetWorkflowExecutionHistoryResponse.newBuilder()
1✔
395
            .setHistory(History.newBuilder().addAllEvents(eventsCopy))
1✔
396
            .build();
1✔
397
      }
398
      expectedNextEventId = history.getNextEventIdLocked();
1✔
399
    } finally {
400
      lock.unlock();
1✔
401
    }
402
    List<HistoryEvent> events =
1✔
403
        history.waitForNewEvents(
1✔
404
            expectedNextEventId,
405
            getRequest.getHistoryEventFilterType(),
1✔
406
            deadlineToReturnEmptyResponse);
407
    GetWorkflowExecutionHistoryResponse.Builder result =
408
        GetWorkflowExecutionHistoryResponse.newBuilder();
1✔
409
    if (events != null) {
1✔
410
      result.setHistory(History.newBuilder().addAllEvents(events));
1✔
411
    }
412
    return result.build();
1✔
413
  }
414

415
  private HistoryStore getHistoryStore(ExecutionId executionId) {
416
    HistoryStore result = histories.get(executionId);
1✔
417
    if (result == null) {
1✔
418
      WorkflowExecution execution = executionId.getExecution();
1✔
419
      throw Status.NOT_FOUND
1✔
420
          .withDescription(
1✔
421
              String.format(
1✔
422
                  "Workflow execution result not found.  " + "WorkflowId: %s, RunId: %s",
423
                  execution.getWorkflowId(), execution.getRunId()))
1✔
424
          .asRuntimeException();
1✔
425
    }
426
    return result;
1✔
427
  }
428

429
  @Override
430
  public void getDiagnostics(StringBuilder result) {
431
    result.append("Stored Workflows:\n");
×
432
    lock.lock();
×
433
    try {
434
      {
435
        for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
×
436
          result.append(entry.getKey());
×
437
          result.append("\n\n");
×
438
          result.append(
×
439
              new WorkflowExecutionHistory(
440
                      History.newBuilder().addAllEvents(entry.getValue().getEventsLocked()).build())
×
441
                  .toProtoText(true));
×
442
          result.append("\n");
×
443
        }
×
444
      }
445
    } finally {
446
      lock.unlock();
×
447
    }
448
    // Uncomment to troubleshoot time skipping issues.
449
    //    timerService.getDiagnostics(result);
450
  }
×
451

452
  @Override
453
  public List<WorkflowExecutionInfo> listWorkflows(
454
      WorkflowState state, Optional<String> filterWorkflowId) {
455
    List<WorkflowExecutionInfo> result = new ArrayList<>();
1✔
456
    for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
1✔
457
      ExecutionId executionId = entry.getKey();
1✔
458
      String workflowId = executionId.getWorkflowId().getWorkflowId();
1✔
459
      if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
1✔
460
        continue;
×
461
      }
462

463
      if (state == WorkflowState.OPEN) {
1✔
464
        if (entry.getValue().isCompleted()) {
1✔
465
          continue;
×
466
        }
467
        result.add(
1✔
468
            constructWorkflowExecutionInfo(
1✔
469
                entry, executionId, WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING));
470
      } else {
471
        if (!entry.getValue().isCompleted()) {
1✔
472
          continue;
×
473
        }
474
        List<HistoryEvent> history = entry.getValue().getHistory();
1✔
475
        WorkflowExecutionStatus status =
1✔
476
            WorkflowExecutionUtils.getCloseStatus(history.get(history.size() - 1));
1✔
477
        result.add(constructWorkflowExecutionInfo(entry, executionId, status));
1✔
478
      }
479
    }
1✔
480
    return result;
1✔
481
  }
482

483
  private WorkflowExecutionInfo constructWorkflowExecutionInfo(
484
      Entry<ExecutionId, HistoryStore> entry,
485
      ExecutionId executionId,
486
      WorkflowExecutionStatus status) {
487
    List<HistoryEvent> history = entry.getValue().getHistory();
1✔
488
    WorkflowExecutionInfo.Builder info =
489
        WorkflowExecutionInfo.newBuilder()
1✔
490
            .setExecution(executionId.getExecution())
1✔
491
            .setHistoryLength(history.size())
1✔
492
            .setStartTime(history.get(0).getEventTime())
1✔
493
            .setType(history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType());
1✔
494
    if (status != null) {
1✔
495
      info.setStatus(status);
1✔
496
    }
497
    return info.build();
1✔
498
  }
499

500
  @Override
501
  public void close() {
502
    selfAdvancingTimer.shutdown();
1✔
503
  }
1✔
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc