• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.ProtobufTimeUtils.toJavaDuration;
24
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Throwables;
29
import com.google.protobuf.util.Durations;
30
import com.google.protobuf.util.Timestamps;
31
import com.uber.m3.tally.Scope;
32
import com.uber.m3.tally.Stopwatch;
33
import io.grpc.Deadline;
34
import io.temporal.api.command.v1.Command;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.enums.v1.QueryResultType;
37
import io.temporal.api.history.v1.HistoryEvent;
38
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.api.query.v1.WorkflowQuery;
41
import io.temporal.api.query.v1.WorkflowQueryResult;
42
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
43
import io.temporal.internal.Config;
44
import io.temporal.internal.common.UpdateMessage;
45
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
46
import io.temporal.internal.statemachines.StatesMachinesCallback;
47
import io.temporal.internal.statemachines.WorkflowStateMachines;
48
import io.temporal.internal.worker.*;
49
import io.temporal.worker.MetricsType;
50
import io.temporal.worker.WorkflowImplementationOptions;
51
import io.temporal.workflow.Functions;
52
import java.time.Duration;
53
import java.util.*;
54
import java.util.concurrent.BlockingQueue;
55
import java.util.concurrent.LinkedBlockingDeque;
56
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.atomic.AtomicInteger;
58
import java.util.concurrent.locks.Lock;
59
import java.util.concurrent.locks.ReentrantLock;
60

61
/**
62
 * Implements workflow executor that relies on replay of a workflow code. An instance of this class
63
 * is created per cached workflow run.
64
 */
65
class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
66
  private final Scope metricsScope;
67

68
  private final WorkflowExecutionStartedEventAttributes startedEvent;
69

70
  private final Lock lock = new ReentrantLock();
1✔
71

72
  private final Functions.Proc1<LocalActivityResult> localActivityCompletionSink;
73

74
  private final BlockingQueue<LocalActivityResult> localActivityCompletionQueue =
1✔
75
      new LinkedBlockingDeque<>();
76

77
  private final LocalActivityDispatcher localActivityDispatcher;
78

79
  private final LocalActivityMeteringHelper localActivityMeteringHelper;
80

81
  private final ReplayWorkflow workflow;
82

83
  private final WorkflowStateMachines workflowStateMachines;
84

85
  /** Number of non completed local activity tasks */
86
  // TODO move and maintain this counter inside workflowStateMachines
87
  private int localActivityTaskCount;
88

89
  private final ReplayWorkflowContextImpl context;
90

91
  private final ReplayWorkflowExecutor replayWorkflowExecutor;
92

93
  ReplayWorkflowRunTaskHandler(
94
      String namespace,
95
      ReplayWorkflow workflow,
96
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
97
      SingleWorkerOptions workerOptions,
98
      Scope metricsScope,
99
      LocalActivityDispatcher localActivityDispatcher) {
1✔
100
    HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
1✔
101
    if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
1✔
102
      throw new IllegalArgumentException(
1✔
103
          "First event in the history is not WorkflowExecutionStarted");
104
    }
105
    this.startedEvent = startedEvent.getWorkflowExecutionStartedEventAttributes();
1✔
106
    this.metricsScope = metricsScope;
1✔
107
    this.localActivityDispatcher = localActivityDispatcher;
1✔
108
    this.workflow = workflow;
1✔
109

110
    this.workflowStateMachines = new WorkflowStateMachines(new StatesMachinesCallbackImpl());
1✔
111
    String fullReplayDirectQueryType =
112
        workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
1✔
113
    this.context =
1✔
114
        new ReplayWorkflowContextImpl(
115
            workflowStateMachines,
116
            namespace,
117
            this.startedEvent,
118
            workflowTask.getWorkflowExecution(),
1✔
119
            Timestamps.toMillis(startedEvent.getEventTime()),
1✔
120
            fullReplayDirectQueryType,
121
            workerOptions,
122
            metricsScope);
123

124
    this.replayWorkflowExecutor =
1✔
125
        new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
126
    this.localActivityCompletionSink = localActivityCompletionQueue::add;
1✔
127
    this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
1✔
128
  }
1✔
129

130
  @Override
131
  public WorkflowTaskResult handleWorkflowTask(
132
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
133
      throws Throwable {
134
    lock.lock();
1✔
135
    try {
136
      localActivityMeteringHelper.newWFTStarting();
1✔
137

138
      Deadline wftHearbeatDeadline =
1✔
139
          Deadline.after(
1✔
140
              (long)
141
                  (Durations.toNanos(startedEvent.getWorkflowTaskTimeout())
1✔
142
                      * Config.WORKFLOW_TAK_HEARTBEAT_COEFFICIENT),
143
              TimeUnit.NANOSECONDS);
144

145
      if (workflowTask.getPreviousStartedEventId()
1✔
146
          < workflowStateMachines.getCurrentStartedEventId()) {
1✔
147
        // if previousStartedEventId < currentStartedEventId - the last workflow task handled by
148
        // these state machines is ahead of the last handled workflow task known by the server.
149
        // Something is off, the server lost progress.
150
        // If the fact that we error out here becomes undesirable, because we fail the workflow
151
        // task,
152
        // we always can rework it to graceful invalidation of the cache entity and a full replay
153
        // from the server
154
        throw new IllegalStateException(
×
155
            "Server history for the workflow is below the progress of the workflow on the worker, the progress needs to be discarded");
156
      }
157

158
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
159
      processLocalActivityRequests(wftHearbeatDeadline);
1✔
160
      List<Command> commands = workflowStateMachines.takeCommands();
1✔
161
      List<Message> messages = workflowStateMachines.takeMessages();
1✔
162
      if (context.isWorkflowMethodCompleted()) {
1✔
163
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
164
        // and invalidation
165
        close();
1✔
166
      }
167
      if (context.getWorkflowTaskFailure() != null) {
1✔
168
        throw context.getWorkflowTaskFailure();
×
169
      }
170
      Map<String, WorkflowQueryResult> queryResults = executeQueries(workflowTask.getQueriesMap());
1✔
171
      return WorkflowTaskResult.newBuilder()
1✔
172
          .setCommands(commands)
1✔
173
          .setMessages(messages)
1✔
174
          .setQueryResults(queryResults)
1✔
175
          .setFinalCommand(context.isWorkflowMethodCompleted())
1✔
176
          .setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
1✔
177
          .setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
1✔
178
          .build();
1✔
179
    } finally {
180
      lock.unlock();
1✔
181
    }
182
  }
183

184
  @Override
185
  public QueryResult handleDirectQueryWorkflowTask(
186
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
187
      throws Throwable {
188
    WorkflowQuery query = workflowTask.getQuery();
1✔
189
    lock.lock();
1✔
190
    try {
191
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
192
      if (context.isWorkflowMethodCompleted()) {
1✔
193
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
194
        // and invalidation
195
        close();
1✔
196
      }
197
      if (context.getWorkflowTaskFailure() != null) {
1✔
198
        throw context.getWorkflowTaskFailure();
×
199
      }
200
      Optional<Payloads> resultPayloads = replayWorkflowExecutor.query(query);
1✔
201
      return new QueryResult(resultPayloads, context.isWorkflowMethodCompleted());
1✔
202
    } finally {
203
      lock.unlock();
1✔
204
    }
205
  }
206

207
  @Override
208
  public void setCurrentStartedEvenId(Long eventId) {
209
    workflowStateMachines.setCurrentStartedEventId(eventId);
×
210
  }
×
211

212
  private void handleWorkflowTaskImpl(
213
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
214
      WorkflowHistoryIterator historyIterator) {
215
    workflowStateMachines.setWorklfowStartedEventId(workflowTask.getStartedEventId());
1✔
216
    workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
1✔
217
    workflowStateMachines.setMessages(workflowTask.getMessagesList());
1✔
218
    applyServerHistory(historyIterator);
1✔
219
  }
1✔
220

221
  private void applyServerHistory(WorkflowHistoryIterator historyIterator) {
222
    Duration expiration = toJavaDuration(startedEvent.getWorkflowTaskTimeout());
1✔
223
    historyIterator.initDeadline(Deadline.after(expiration.toMillis(), TimeUnit.MILLISECONDS));
1✔
224

225
    boolean timerStopped = false;
1✔
226
    Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_TASK_REPLAY_LATENCY).start();
1✔
227
    try {
228
      while (historyIterator.hasNext()) {
1✔
229
        // iteration itself is intentionally left outside the try-catch below,
230
        // as gRPC exception happened during history iteration should never ever fail the workflow
231
        HistoryEvent event = historyIterator.next();
1✔
232
        boolean hasNext = historyIterator.hasNext();
1✔
233
        try {
234
          workflowStateMachines.handleEvent(event, hasNext);
1✔
235
        } catch (Throwable e) {
1✔
236
          // Fail workflow if exception is of the specified type
237
          WorkflowImplementationOptions implementationOptions =
1✔
238
              workflow.getWorkflowContext().getWorkflowImplementationOptions();
1✔
239
          Class<? extends Throwable>[] failTypes =
1✔
240
              implementationOptions.getFailWorkflowExceptionTypes();
1✔
241
          for (Class<? extends Throwable> failType : failTypes) {
1✔
242
            if (failType.isAssignableFrom(e.getClass())) {
1✔
243
              throw new WorkflowExecutionException(
1✔
244
                  workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
1✔
245
            }
246
          }
247
          throw wrap(e);
1✔
248
        }
1✔
249
        if (!timerStopped && !workflowStateMachines.isReplaying()) {
1✔
250
          sw.stop();
1✔
251
          timerStopped = true;
1✔
252
        }
253
      }
1✔
254
    } finally {
255
      if (!timerStopped) {
1✔
256
        sw.stop();
1✔
257
      }
258
    }
259
  }
1✔
260

261
  private Map<String, WorkflowQueryResult> executeQueries(Map<String, WorkflowQuery> queries) {
262
    Map<String, WorkflowQueryResult> queryResults = new HashMap<>();
1✔
263
    for (Map.Entry<String, WorkflowQuery> entry : queries.entrySet()) {
1✔
264
      WorkflowQuery query = entry.getValue();
1✔
265
      try {
266
        Optional<Payloads> queryResult = replayWorkflowExecutor.query(query);
1✔
267
        WorkflowQueryResult.Builder result =
268
            WorkflowQueryResult.newBuilder()
1✔
269
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
1✔
270
        if (queryResult.isPresent()) {
1✔
271
          result.setAnswer(queryResult.get());
1✔
272
        }
273
        queryResults.put(entry.getKey(), result.build());
1✔
274
      } catch (Exception e) {
1✔
275
        String stackTrace = Throwables.getStackTraceAsString(e);
1✔
276
        queryResults.put(
1✔
277
            entry.getKey(),
1✔
278
            WorkflowQueryResult.newBuilder()
1✔
279
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
1✔
280
                .setErrorMessage(e + "\n" + stackTrace)
1✔
281
                .build());
1✔
282
      }
1✔
283
    }
1✔
284
    return queryResults;
1✔
285
  }
286

287
  @Override
288
  public void close() {
289
    lock.lock();
1✔
290
    try {
291
      replayWorkflowExecutor.close();
1✔
292
    } finally {
293
      lock.unlock();
1✔
294
    }
295
  }
1✔
296

297
  private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
298
      throws InterruptedException, Throwable {
299

300
    while (true) {
301
      List<ExecuteLocalActivityParameters> laRequests =
1✔
302
          workflowStateMachines.takeLocalActivityRequests();
1✔
303
      localActivityTaskCount += laRequests.size();
1✔
304

305
      for (ExecuteLocalActivityParameters laRequest : laRequests) {
1✔
306
        boolean accepted =
1✔
307
            localActivityDispatcher.dispatch(
1✔
308
                laRequest, localActivityCompletionSink, wftHeartbeatDeadline);
309
        // TODO do we have to fail? if we didn't fit in a potentially tight timeout left until
310
        // wftHeartbeatDeadline,
311
        //  maybe we can return control, heartbeat and try again with fresh timeout one more time?
312
        Preconditions.checkState(
1✔
313
            accepted,
314
            "Unable to schedule local activity for execution, "
315
                + "no more slots available and local activity task queue is full");
316

317
        localActivityMeteringHelper.addNewLocalActivity(laRequest);
1✔
318
      }
1✔
319

320
      if (localActivityTaskCount == 0) {
1✔
321
        // No outstanding local activity requests
322
        break;
1✔
323
      }
324

325
      long maxWaitTimeTillHeartbeatNs = wftHeartbeatDeadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
326
      LocalActivityResult laCompletion =
1✔
327
          localActivityCompletionQueue.poll(maxWaitTimeTillHeartbeatNs, TimeUnit.NANOSECONDS);
1✔
328
      if (laCompletion == null) {
1✔
329
        // Need to force a new task as we are out of time
330
        break;
1✔
331
      }
332

333
      localActivityTaskCount--;
1✔
334
      localActivityMeteringHelper.markLocalActivityComplete(laCompletion.getActivityId());
1✔
335

336
      if (laCompletion.getProcessingError() != null) {
1✔
337
        throw laCompletion.getProcessingError().getThrowable();
1✔
338
      }
339

340
      workflowStateMachines.handleLocalActivityCompletion(laCompletion);
1✔
341
      // handleLocalActivityCompletion triggers eventLoop.
342
      // After this call, there may be new local activity requests available in
343
      // workflowStateMachines.takeLocalActivityRequests()
344
      // These requests need to be processed and accounted for, otherwise we may end up not
345
      // heartbeating and completing workflow task instead. So we have to make another iteration.
346
    }
1✔
347

348
    // it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's
349
    // not empty - we are in trouble anyway
350
    Preconditions.checkState(
1✔
351
        workflowStateMachines.takeLocalActivityRequests().isEmpty(),
1✔
352
        "[BUG] Local activities requests from the last event loop were not drained "
353
            + "and accounted in the outstanding local activities counter");
354
  }
1✔
355

356
  @VisibleForTesting
357
  WorkflowStateMachines getWorkflowStateMachines() {
358
    return workflowStateMachines;
1✔
359
  }
360

361
  private class StatesMachinesCallbackImpl implements StatesMachinesCallback {
1✔
362

363
    @Override
364
    public void start(HistoryEvent startWorkflowEvent) {
365
      replayWorkflowExecutor.start(startWorkflowEvent);
1✔
366
    }
1✔
367

368
    @Override
369
    public void eventLoop() {
370
      replayWorkflowExecutor.eventLoop();
1✔
371
    }
1✔
372

373
    @Override
374
    public void signal(HistoryEvent signalEvent) {
375
      replayWorkflowExecutor.handleWorkflowExecutionSignaled(signalEvent);
1✔
376
    }
1✔
377

378
    @Override
379
    public void update(UpdateMessage message) {
380
      replayWorkflowExecutor.handleWorkflowExecutionUpdated(message);
×
381
    }
×
382

383
    @Override
384
    public void cancel(HistoryEvent cancelEvent) {
385
      replayWorkflowExecutor.handleWorkflowExecutionCancelRequested(cancelEvent);
1✔
386
    }
1✔
387
  }
388

389
  private static class LocalActivityMeteringHelper {
1✔
390
    private final Map<String, AtomicInteger> firstWftActivities = new HashMap<>();
1✔
391
    private final Map<String, AtomicInteger> nonFirstWftActivities = new HashMap<>();
1✔
392
    private final Set<String> completed = new HashSet<>();
1✔
393

394
    private void newWFTStarting() {
395
      for (String activityId : firstWftActivities.keySet()) {
1✔
396
        AtomicInteger removed = firstWftActivities.remove(activityId);
1✔
397
        removed.set(0);
1✔
398
        nonFirstWftActivities.put(activityId, removed);
1✔
399
      }
1✔
400
    }
1✔
401

402
    private void addNewLocalActivity(ExecuteLocalActivityParameters params) {
403
      AtomicInteger attemptsDuringWFTCounter = new AtomicInteger(0);
1✔
404
      params.setOnNewAttemptCallback(attemptsDuringWFTCounter::incrementAndGet);
1✔
405
      firstWftActivities.put(params.getActivityId(), attemptsDuringWFTCounter);
1✔
406
    }
1✔
407

408
    private void markLocalActivityComplete(String activityId) {
409
      completed.add(activityId);
1✔
410
    }
1✔
411

412
    private int getNonfirstAttempts() {
413
      int result =
1✔
414
          nonFirstWftActivities.values().stream()
1✔
415
              .map(ai -> ai.getAndSet(0))
1✔
416
              .reduce(0, Integer::sum);
1✔
417
      for (String activityId : completed) {
1✔
418
        firstWftActivities.remove(activityId);
1✔
419
        nonFirstWftActivities.remove(activityId);
1✔
420
      }
1✔
421
      completed.clear();
1✔
422
      return result;
1✔
423
    }
424
  }
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc