• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1737

pending completion
1737

push

buildkite

GitHub
[TestEnv] populate cronschedule in history start event and correct isCron in the list workflow response (#790)

Issue

TestEnv doesn't populate cronSchedule in the history of cron workflows. This blocks testing of interceptors.

Changes

populate cronschedule on the start event
correct isCron in list workflows

5 of 5 new or added lines in 2 files covered. (100.0%)

11119 of 18396 relevant lines covered (60.44%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.81
/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.testservice;
19

20
import com.uber.cadence.BadRequestError;
21
import com.uber.cadence.DataBlob;
22
import com.uber.cadence.EntityNotExistsError;
23
import com.uber.cadence.EventType;
24
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
25
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
26
import com.uber.cadence.History;
27
import com.uber.cadence.HistoryEvent;
28
import com.uber.cadence.HistoryEventFilterType;
29
import com.uber.cadence.InternalServiceError;
30
import com.uber.cadence.PollForActivityTaskRequest;
31
import com.uber.cadence.PollForActivityTaskResponse;
32
import com.uber.cadence.PollForDecisionTaskRequest;
33
import com.uber.cadence.PollForDecisionTaskResponse;
34
import com.uber.cadence.StickyExecutionAttributes;
35
import com.uber.cadence.WorkflowExecution;
36
import com.uber.cadence.WorkflowExecutionInfo;
37
import com.uber.cadence.internal.common.InternalUtils;
38
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
39
import com.uber.cadence.internal.testservice.RequestContext.Timer;
40
import java.time.Duration;
41
import java.util.ArrayList;
42
import java.util.HashMap;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Map.Entry;
46
import java.util.Optional;
47
import java.util.concurrent.BlockingQueue;
48
import java.util.concurrent.LinkedBlockingQueue;
49
import java.util.concurrent.locks.Condition;
50
import java.util.concurrent.locks.Lock;
51
import java.util.concurrent.locks.ReentrantLock;
52

53
class TestWorkflowStoreImpl implements TestWorkflowStore {
54

55
  private static class HistoryStore {
56

57
    private final Lock lock;
58
    private final Condition newEventsCondition;
59
    private final ExecutionId id;
60
    private final List<HistoryEvent> history = new ArrayList<>();
1✔
61
    private boolean completed;
62

63
    private HistoryStore(ExecutionId id, Lock lock) {
1✔
64
      this.id = id;
1✔
65
      this.lock = lock;
1✔
66
      this.newEventsCondition = lock.newCondition();
1✔
67
    }
1✔
68

69
    public boolean isCompleted() {
70
      return completed;
1✔
71
    }
72

73
    public List<HistoryEvent> getHistory() {
74
      return history;
1✔
75
    }
76

77
    private void checkNextEventId(long nextEventId) {
78
      if (nextEventId != history.size() + 1L && (nextEventId != 0 && history.size() != 0)) {
1✔
79
        throw new IllegalStateException(
×
80
            "NextEventId=" + nextEventId + ", historySize=" + history.size() + " for " + id);
×
81
      }
82
    }
1✔
83

84
    void addAllLocked(List<HistoryEvent> events, long timeInNanos) throws EntityNotExistsError {
85
      for (HistoryEvent event : events) {
1✔
86
        if (completed) {
1✔
87
          throw new EntityNotExistsError(
×
88
              "Attempt to add an event after a completion event: "
89
                  + WorkflowExecutionUtils.prettyPrintHistoryEvent(event));
×
90
        }
91
        event.setEventId(history.size() + 1L);
1✔
92
        // It can be set in StateMachines.startActivityTask
93
        if (!event.isSetTimestamp()) {
1✔
94
          event.setTimestamp(timeInNanos);
1✔
95
        }
96
        history.add(event);
1✔
97
        completed = completed || WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(event);
1✔
98
      }
1✔
99
      newEventsCondition.signal();
1✔
100
    }
1✔
101

102
    long getNextEventIdLocked() {
103
      return history.size() + 1L;
1✔
104
    }
105

106
    List<HistoryEvent> getEventsLocked() {
107
      return history;
1✔
108
    }
109

110
    List<HistoryEvent> waitForNewEvents(
111
        long expectedNextEventId, HistoryEventFilterType filterType) {
112
      lock.lock();
1✔
113
      try {
114
        while (true) {
115
          if (completed || getNextEventIdLocked() > expectedNextEventId) {
1✔
116
            if (filterType == HistoryEventFilterType.CLOSE_EVENT) {
1✔
117
              if (completed) {
1✔
118
                List<HistoryEvent> result = new ArrayList<>(1);
1✔
119
                result.add(history.get(history.size() - 1));
1✔
120
                return result;
1✔
121
              }
122
              expectedNextEventId = getNextEventIdLocked();
1✔
123
              continue;
1✔
124
            }
125
            List<HistoryEvent> result =
×
126
                new ArrayList<>(((int) (getNextEventIdLocked() - expectedNextEventId)));
×
127
            for (int i = (int) expectedNextEventId; i < getNextEventIdLocked(); i++) {
×
128
              result.add(history.get(i));
×
129
            }
130
            return result;
×
131
          }
132
          try {
133
            newEventsCondition.await();
1✔
134
          } catch (InterruptedException e) {
×
135
            return null;
×
136
          }
1✔
137
        }
138
      } finally {
139
        lock.unlock();
1✔
140
      }
141
    }
142
  }
143

144
  // Taken around accesses to the maps below in most methods of this class; the same lock
  // instance is also handed to every HistoryStore created in save().
  private final Lock lock = new ReentrantLock();
1✔
145

146
  // Event history of every workflow execution saved so far, keyed by execution id.
  private final Map<ExecutionId, HistoryStore> histories = new HashMap<>();
1✔
147

148
  // Queue of pending activity tasks per task list; queues are created on first use by
  // getActivityTaskListQueue.
  private final Map<TaskListId, BlockingQueue<PollForActivityTaskResponse>> activityTaskLists =
1✔
149
      new HashMap<>();
150

151
  // Queue of pending decision (and query) tasks per task list; queues are created on first use
  // by getDecisionTaskListQueue.
  private final Map<TaskListId, BlockingQueue<PollForDecisionTaskResponse>> decisionTaskLists =
1✔
152
      new HashMap<>();
153

154
  // Timer created with the wall-clock time at construction. Time skipping is locked in the
  // constructor and unlocked by the first save() that makes `histories` non-empty.
  private final SelfAdvancingTimer timerService =
1✔
155
      new SelfAdvancingTimerImpl(System.currentTimeMillis());
1✔
156

157
  /**
   * Creates an empty store. Time skipping on the timer service is locked here and stays locked
   * until the first successful {@code save} stores a history.
   */
  public TestWorkflowStoreImpl() {
    timerService.lockTimeSkipping("TestWorkflowStoreImpl constructor");
  }
1✔
161

162
  @Override
163
  public SelfAdvancingTimer getTimer() {
164
    return timerService;
1✔
165
  }
166

167
  @Override
168
  public long currentTimeMillis() {
169
    return timerService.getClock().getAsLong();
1✔
170
  }
171

172
  @Override
173
  public long save(RequestContext ctx)
174
      throws InternalServiceError, EntityNotExistsError, BadRequestError {
175
    long result;
176
    lock.lock();
1✔
177
    boolean historiesEmpty = histories.isEmpty();
1✔
178
    try {
179
      ExecutionId executionId = ctx.getExecutionId();
1✔
180
      HistoryStore history = histories.get(executionId);
1✔
181
      List<HistoryEvent> events = ctx.getEvents();
1✔
182
      if (history == null) {
1✔
183
        if (events.isEmpty()
1✔
184
            || events.get(0).getEventType() != EventType.WorkflowExecutionStarted) {
1✔
185
          throw new IllegalStateException("No history found for " + executionId);
×
186
        }
187
        history = new HistoryStore(executionId, lock);
1✔
188
        histories.put(executionId, history);
1✔
189
      }
190
      history.checkNextEventId(ctx.getInitialEventId());
1✔
191
      history.addAllLocked(events, ctx.currentTimeInNanoseconds());
1✔
192
      result = history.getNextEventIdLocked();
1✔
193
      timerService.updateLocks(ctx.getTimerLocks(), "TestWorkflowStoreImpl save");
1✔
194
      ctx.fireCallbacks(history.getEventsLocked().size());
1✔
195
    } finally {
196
      if (historiesEmpty && !histories.isEmpty()) {
1✔
197
        timerService.unlockTimeSkipping(
1✔
198
            "TestWorkflowStoreImpl save"); // Initially locked in the constructor
199
      }
200
      lock.unlock();
1✔
201
    }
202
    // Push tasks to the queues out of locks
203
    DecisionTask decisionTask = ctx.getDecisionTask();
1✔
204

205
    if (decisionTask != null) {
1✔
206
      StickyExecutionAttributes attributes =
1✔
207
          ctx.getWorkflowMutableState().getStickyExecutionAttributes();
1✔
208
      TaskListId id =
1✔
209
          new TaskListId(
210
              decisionTask.getTaskListId().getDomain(),
1✔
211
              attributes == null
1✔
212
                  ? decisionTask.getTaskListId().getTaskListName()
1✔
213
                  : attributes.getWorkerTaskList().getName());
1✔
214

215
      BlockingQueue<PollForDecisionTaskResponse> decisionsQueue = getDecisionTaskListQueue(id);
1✔
216
      decisionsQueue.add(decisionTask.getTask());
1✔
217
    }
218

219
    List<ActivityTask> activityTasks = ctx.getActivityTasks();
1✔
220
    if (activityTasks != null) {
1✔
221
      for (ActivityTask activityTask : activityTasks) {
1✔
222
        BlockingQueue<PollForActivityTaskResponse> activitiesQueue =
1✔
223
            getActivityTaskListQueue(activityTask.getTaskListId());
1✔
224
        activitiesQueue.add(activityTask.getTask());
1✔
225
      }
1✔
226
    }
227

228
    List<Timer> timers = ctx.getTimers();
1✔
229
    if (timers != null) {
1✔
230
      for (Timer t : timers) {
1✔
231
        timerService.schedule(
1✔
232
            Duration.ofSeconds(t.getDelaySeconds()), t.getCallback(), t.getTaskInfo());
1✔
233
      }
1✔
234
    }
235
    return result;
1✔
236
  }
237

238
  @Override
239
  public void applyTimersAndLocks(RequestContext ctx) {
240
    lock.lock();
1✔
241
    try {
242
      timerService.updateLocks(ctx.getTimerLocks(), "TestWorkflowStoreImpl applyTimersAndLocks");
1✔
243
    } finally {
244
      lock.unlock();
1✔
245
    }
246

247
    List<Timer> timers = ctx.getTimers();
1✔
248
    if (timers != null) {
1✔
249
      for (Timer t : timers) {
1✔
250
        timerService.schedule(
1✔
251
            Duration.ofSeconds(t.getDelaySeconds()), t.getCallback(), t.getTaskInfo());
1✔
252
      }
1✔
253
    }
254

255
    ctx.clearTimersAndLocks();
1✔
256
  }
1✔
257

258
  @Override
259
  public void registerDelayedCallback(Duration delay, Runnable r) {
260
    timerService.schedule(delay, r, "registerDelayedCallback");
1✔
261
  }
1✔
262

263
  private BlockingQueue<PollForActivityTaskResponse> getActivityTaskListQueue(
264
      TaskListId taskListId) {
265
    lock.lock();
1✔
266
    try {
267
      {
268
        BlockingQueue<PollForActivityTaskResponse> activitiesQueue =
1✔
269
            activityTaskLists.get(taskListId);
1✔
270
        if (activitiesQueue == null) {
1✔
271
          activitiesQueue = new LinkedBlockingQueue<>();
1✔
272
          activityTaskLists.put(taskListId, activitiesQueue);
1✔
273
        }
274
        return activitiesQueue;
1✔
275
      }
276
    } finally {
277
      lock.unlock();
1✔
278
    }
279
  }
280

281
  private BlockingQueue<PollForDecisionTaskResponse> getDecisionTaskListQueue(
282
      TaskListId taskListId) {
283
    lock.lock();
1✔
284
    try {
285
      BlockingQueue<PollForDecisionTaskResponse> decisionsQueue = decisionTaskLists.get(taskListId);
1✔
286
      if (decisionsQueue == null) {
1✔
287
        decisionsQueue = new LinkedBlockingQueue<>();
1✔
288
        decisionTaskLists.put(taskListId, decisionsQueue);
1✔
289
      }
290
      return decisionsQueue;
1✔
291
    } finally {
292
      lock.unlock();
1✔
293
    }
294
  }
295

296
  @Override
297
  public PollForDecisionTaskResponse pollForDecisionTask(PollForDecisionTaskRequest pollRequest)
298
      throws InterruptedException {
299
    TaskListId taskListId =
1✔
300
        new TaskListId(pollRequest.getDomain(), pollRequest.getTaskList().getName());
1✔
301
    BlockingQueue<PollForDecisionTaskResponse> decisionsQueue =
1✔
302
        getDecisionTaskListQueue(taskListId);
1✔
303
    return decisionsQueue.take();
1✔
304
  }
305

306
  @Override
307
  public PollForActivityTaskResponse pollForActivityTask(PollForActivityTaskRequest pollRequest)
308
      throws InterruptedException {
309
    TaskListId taskListId =
1✔
310
        new TaskListId(pollRequest.getDomain(), pollRequest.getTaskList().getName());
1✔
311
    BlockingQueue<PollForActivityTaskResponse> activityTaskQueue =
1✔
312
        getActivityTaskListQueue(taskListId);
1✔
313
    return activityTaskQueue.take();
1✔
314
  }
315

316
  @Override
317
  public void sendQueryTask(
318
      ExecutionId executionId, TaskListId taskList, PollForDecisionTaskResponse task)
319
      throws EntityNotExistsError {
320
    lock.lock();
1✔
321
    try {
322
      HistoryStore historyStore = getHistoryStore(executionId);
1✔
323
      List<HistoryEvent> events = new ArrayList<>(historyStore.getEventsLocked());
1✔
324
      History history = new History();
1✔
325
      if (taskList.getTaskListName().equals(task.getWorkflowExecutionTaskList().getName())) {
1✔
326
        history.setEvents(events);
1✔
327
      } else {
328
        history.setEvents(new ArrayList<>());
1✔
329
      }
330
      task.setHistory(history);
1✔
331
    } finally {
332
      lock.unlock();
1✔
333
    }
334
    BlockingQueue<PollForDecisionTaskResponse> decisionsQueue = getDecisionTaskListQueue(taskList);
1✔
335
    decisionsQueue.add(task);
1✔
336
  }
1✔
337

338
  @Override
339
  public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
340
      ExecutionId executionId, GetWorkflowExecutionHistoryRequest getRequest)
341
      throws EntityNotExistsError {
342
    HistoryStore history;
343
    // Used to eliminate the race condition on waitForNewEvents
344
    long expectedNextEventId;
345
    lock.lock();
1✔
346
    try {
347
      history = getHistoryStore(executionId);
1✔
348
      if (!getRequest.isWaitForNewEvent()
1✔
349
          && getRequest.getHistoryEventFilterType() != HistoryEventFilterType.CLOSE_EVENT) {
1✔
350
        List<HistoryEvent> events = history.getEventsLocked();
1✔
351
        List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
1✔
352
        // Copy the list as it is mutable. Individual events assumed immutable.
353
        ArrayList<HistoryEvent> eventsCopy = new ArrayList<>(events);
1✔
354
        return new GetWorkflowExecutionHistoryResponse()
1✔
355
            .setHistory(new History().setEvents(eventsCopy))
1✔
356
            .setRawHistory(blobs);
1✔
357
      }
358
      expectedNextEventId = history.getNextEventIdLocked();
1✔
359
    } finally {
360
      lock.unlock();
1✔
361
    }
362
    List<HistoryEvent> events =
1✔
363
        history.waitForNewEvents(expectedNextEventId, getRequest.getHistoryEventFilterType());
1✔
364
    List<DataBlob> blobs = InternalUtils.SerializeFromHistoryEventToBlobData(events);
1✔
365
    GetWorkflowExecutionHistoryResponse result = new GetWorkflowExecutionHistoryResponse();
1✔
366
    if (events != null) {
1✔
367
      result.setHistory(new History().setEvents(events));
1✔
368
      result.setRawHistory(blobs);
1✔
369
    }
370
    return result;
1✔
371
  }
372

373
  private HistoryStore getHistoryStore(ExecutionId executionId) throws EntityNotExistsError {
374
    HistoryStore result = histories.get(executionId);
1✔
375
    if (result == null) {
1✔
376
      WorkflowExecution execution = executionId.getExecution();
×
377
      throw new EntityNotExistsError(
×
378
          String.format(
×
379
              "Workflow execution result not found.  " + "WorkflowId: %s, RunId: %s",
380
              execution.getWorkflowId(), execution.getRunId()));
×
381
    }
382
    return result;
1✔
383
  }
384

385
  @Override
386
  public void getDiagnostics(StringBuilder result) {
387
    result.append("Stored Workflows:\n");
1✔
388
    lock.lock();
1✔
389
    try {
390
      {
391
        for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
1✔
392
          result.append(entry.getKey());
1✔
393
          result.append("\n");
1✔
394
          result.append(
1✔
395
              WorkflowExecutionUtils.prettyPrintHistory(
1✔
396
                  entry.getValue().getEventsLocked().iterator(), true));
1✔
397
          result.append("\n");
1✔
398
        }
1✔
399
      }
400
    } finally {
401
      lock.unlock();
1✔
402
    }
403
    // Uncomment to troubleshoot time skipping issues.
404
    timerService.getDiagnostics(result);
1✔
405
  }
1✔
406

407
  @Override
408
  public List<WorkflowExecutionInfo> listWorkflows(
409
      WorkflowState state, Optional<String> filterWorkflowId) {
410
    List<WorkflowExecutionInfo> result = new ArrayList<>();
1✔
411
    for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
1✔
412
      if (state == WorkflowState.OPEN) {
1✔
413
        if (entry.getValue().isCompleted()) {
1✔
414
          continue;
×
415
        }
416
        ExecutionId executionId = entry.getKey();
1✔
417
        String workflowId = executionId.getWorkflowId().getWorkflowId();
1✔
418
        if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
1✔
419
          continue;
×
420
        }
421
        List<HistoryEvent> history = entry.getValue().getHistory();
1✔
422
        WorkflowExecutionInfo info =
1✔
423
            new WorkflowExecutionInfo()
424
                .setExecution(executionId.getExecution())
1✔
425
                .setHistoryLength(history.size())
1✔
426
                .setStartTime(history.get(0).getTimestamp())
1✔
427
                .setIsCron(
1✔
428
                    history.get(0).getWorkflowExecutionStartedEventAttributes().isSetCronSchedule())
1✔
429
                .setType(
1✔
430
                    history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType());
1✔
431
        result.add(info);
1✔
432
      } else {
1✔
433
        if (!entry.getValue().isCompleted()) {
1✔
434
          continue;
1✔
435
        }
436
        ExecutionId executionId = entry.getKey();
1✔
437
        String workflowId = executionId.getWorkflowId().getWorkflowId();
1✔
438
        if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
1✔
439
          continue;
×
440
        }
441
        List<HistoryEvent> history = entry.getValue().getHistory();
1✔
442
        WorkflowExecutionInfo info =
1✔
443
            new WorkflowExecutionInfo()
444
                .setExecution(executionId.getExecution())
1✔
445
                .setHistoryLength(history.size())
1✔
446
                .setStartTime(history.get(0).getTimestamp())
1✔
447
                .setIsCron(
1✔
448
                    history.get(0).getWorkflowExecutionStartedEventAttributes().isSetCronSchedule())
1✔
449
                .setType(
1✔
450
                    history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType())
1✔
451
                .setCloseStatus(
1✔
452
                    WorkflowExecutionUtils.getCloseStatus(history.get(history.size() - 1)));
1✔
453
        result.add(info);
1✔
454
      }
455
    }
1✔
456
    return result;
1✔
457
  }
458

459
  @Override
460
  public void close() {
461
    timerService.shutdown();
1✔
462
  }
1✔
463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc