• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.25
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.ProtobufTimeUtils.toJavaDuration;
24
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Throwables;
29
import com.google.protobuf.util.Durations;
30
import com.google.protobuf.util.Timestamps;
31
import com.uber.m3.tally.Scope;
32
import com.uber.m3.tally.Stopwatch;
33
import io.grpc.Deadline;
34
import io.temporal.api.command.v1.Command;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.enums.v1.QueryResultType;
37
import io.temporal.api.history.v1.HistoryEvent;
38
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.api.query.v1.WorkflowQuery;
41
import io.temporal.api.query.v1.WorkflowQueryResult;
42
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
43
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
44
import io.temporal.internal.Config;
45
import io.temporal.internal.common.SdkFlag;
46
import io.temporal.internal.common.UpdateMessage;
47
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
48
import io.temporal.internal.statemachines.StatesMachinesCallback;
49
import io.temporal.internal.statemachines.WorkflowStateMachines;
50
import io.temporal.internal.worker.*;
51
import io.temporal.worker.MetricsType;
52
import io.temporal.worker.WorkflowImplementationOptions;
53
import io.temporal.workflow.Functions;
54
import java.time.Duration;
55
import java.util.*;
56
import java.util.concurrent.BlockingQueue;
57
import java.util.concurrent.LinkedBlockingDeque;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62

63
/**
64
 * Implements workflow executor that relies on replay of a workflow code. An instance of this class
65
 * is created per cached workflow run.
66
 */
67
class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
68
  private final Scope metricsScope;
69

70
  private final WorkflowExecutionStartedEventAttributes startedEvent;
71

72
  private final Lock lock = new ReentrantLock();
1✔
73

74
  private final Functions.Proc1<LocalActivityResult> localActivityCompletionSink;
75

76
  private final BlockingQueue<LocalActivityResult> localActivityCompletionQueue =
1✔
77
      new LinkedBlockingDeque<>();
78

79
  private final LocalActivityDispatcher localActivityDispatcher;
80

81
  private final LocalActivityMeteringHelper localActivityMeteringHelper;
82

83
  private final ReplayWorkflow workflow;
84

85
  private final WorkflowStateMachines workflowStateMachines;
86

87
  /** Number of non completed local activity tasks */
88
  // TODO move and maintain this counter inside workflowStateMachines
89
  private int localActivityTaskCount;
90

91
  private final ReplayWorkflowContextImpl context;
92

93
  private final ReplayWorkflowExecutor replayWorkflowExecutor;
94

95
  private final GetSystemInfoResponse.Capabilities capabilities;
96

97
  ReplayWorkflowRunTaskHandler(
98
      String namespace,
99
      ReplayWorkflow workflow,
100
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
101
      SingleWorkerOptions workerOptions,
102
      Scope metricsScope,
103
      LocalActivityDispatcher localActivityDispatcher,
104
      GetSystemInfoResponse.Capabilities capabilities) {
1✔
105
    HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
1✔
106
    if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
1✔
107
      throw new IllegalArgumentException(
1✔
108
          "First event in the history is not WorkflowExecutionStarted");
109
    }
110
    this.startedEvent = startedEvent.getWorkflowExecutionStartedEventAttributes();
1✔
111
    this.metricsScope = metricsScope;
1✔
112
    this.localActivityDispatcher = localActivityDispatcher;
1✔
113
    this.workflow = workflow;
1✔
114

115
    this.workflowStateMachines =
1✔
116
        new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
117
    String fullReplayDirectQueryType =
118
        workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
1✔
119
    this.context =
1✔
120
        new ReplayWorkflowContextImpl(
121
            workflowStateMachines,
122
            namespace,
123
            this.startedEvent,
124
            workflowTask.getWorkflowExecution(),
1✔
125
            Timestamps.toMillis(startedEvent.getEventTime()),
1✔
126
            fullReplayDirectQueryType,
127
            workerOptions,
128
            metricsScope);
129

130
    this.replayWorkflowExecutor =
1✔
131
        new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
132
    this.localActivityCompletionSink = localActivityCompletionQueue::add;
1✔
133
    this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
1✔
134
    this.capabilities = capabilities;
1✔
135
  }
1✔
136

137
  @Override
138
  public WorkflowTaskResult handleWorkflowTask(
139
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
140
      throws Throwable {
141
    lock.lock();
1✔
142
    try {
143
      localActivityMeteringHelper.newWFTStarting();
1✔
144

145
      Deadline wftHearbeatDeadline =
1✔
146
          Deadline.after(
1✔
147
              (long)
148
                  (Durations.toNanos(startedEvent.getWorkflowTaskTimeout())
1✔
149
                      * Config.WORKFLOW_TAK_HEARTBEAT_COEFFICIENT),
150
              TimeUnit.NANOSECONDS);
151

152
      if (workflowTask.getPreviousStartedEventId()
1✔
153
          < workflowStateMachines.getCurrentStartedEventId()) {
1✔
154
        // if previousStartedEventId < currentStartedEventId - the last workflow task handled by
155
        // these state machines is ahead of the last handled workflow task known by the server.
156
        // Something is off, the server lost progress.
157
        // If the fact that we error out here becomes undesirable, because we fail the workflow
158
        // task,
159
        // we always can rework it to graceful invalidation of the cache entity and a full replay
160
        // from the server
161
        throw new IllegalStateException(
×
162
            "Server history for the workflow is below the progress of the workflow on the worker, the progress needs to be discarded");
163
      }
164

165
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
166
      processLocalActivityRequests(wftHearbeatDeadline);
1✔
167
      List<Command> commands = workflowStateMachines.takeCommands();
1✔
168
      List<Message> messages = workflowStateMachines.takeMessages();
1✔
169
      EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
1✔
170
      List<Integer> newSdkFlags = new ArrayList<>(newFlags.size());
1✔
171
      for (SdkFlag flag : newFlags) {
1✔
172
        newSdkFlags.add(flag.getValue());
1✔
173
      }
1✔
174
      if (context.isWorkflowMethodCompleted()) {
1✔
175
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
176
        // and invalidation
177
        close();
1✔
178
      }
179
      if (context.getWorkflowTaskFailure() != null) {
1✔
180
        throw context.getWorkflowTaskFailure();
×
181
      }
182
      Map<String, WorkflowQueryResult> queryResults = executeQueries(workflowTask.getQueriesMap());
1✔
183
      return WorkflowTaskResult.newBuilder()
1✔
184
          .setCommands(commands)
1✔
185
          .setMessages(messages)
1✔
186
          .setQueryResults(queryResults)
1✔
187
          .setFinalCommand(context.isWorkflowMethodCompleted())
1✔
188
          .setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
1✔
189
          .setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
1✔
190
          .setSdkFlags(newSdkFlags)
1✔
191
          .build();
1✔
192
    } finally {
193
      lock.unlock();
1✔
194
    }
195
  }
196

197
  @Override
198
  public QueryResult handleDirectQueryWorkflowTask(
199
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
200
      throws Throwable {
201
    WorkflowQuery query = workflowTask.getQuery();
1✔
202
    lock.lock();
1✔
203
    try {
204
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
205
      if (context.isWorkflowMethodCompleted()) {
1✔
206
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
207
        // and invalidation
208
        close();
1✔
209
      }
210
      if (context.getWorkflowTaskFailure() != null) {
1✔
211
        throw context.getWorkflowTaskFailure();
×
212
      }
213
      Optional<Payloads> resultPayloads = replayWorkflowExecutor.query(query);
1✔
214
      return new QueryResult(resultPayloads, context.isWorkflowMethodCompleted());
1✔
215
    } finally {
216
      lock.unlock();
1✔
217
    }
218
  }
219

220
  @Override
221
  public void setCurrentStartedEvenId(Long eventId) {
222
    workflowStateMachines.setCurrentStartedEventId(eventId);
×
223
  }
×
224

225
  private void handleWorkflowTaskImpl(
226
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
227
      WorkflowHistoryIterator historyIterator) {
228
    workflowStateMachines.setWorklfowStartedEventId(workflowTask.getStartedEventId());
1✔
229
    workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
1✔
230
    workflowStateMachines.setMessages(workflowTask.getMessagesList());
1✔
231
    applyServerHistory(historyIterator);
1✔
232
  }
1✔
233

234
  private void applyServerHistory(WorkflowHistoryIterator historyIterator) {
235
    Duration expiration = toJavaDuration(startedEvent.getWorkflowTaskTimeout());
1✔
236
    historyIterator.initDeadline(Deadline.after(expiration.toMillis(), TimeUnit.MILLISECONDS));
1✔
237

238
    boolean timerStopped = false;
1✔
239
    Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_TASK_REPLAY_LATENCY).start();
1✔
240
    try {
241
      while (historyIterator.hasNext()) {
1✔
242
        // iteration itself is intentionally left outside the try-catch below,
243
        // as gRPC exception happened during history iteration should never ever fail the workflow
244
        HistoryEvent event = historyIterator.next();
1✔
245
        boolean hasNext = historyIterator.hasNext();
1✔
246
        try {
247
          workflowStateMachines.handleEvent(event, hasNext);
1✔
248
        } catch (Throwable e) {
1✔
249
          // Fail workflow if exception is of the specified type
250
          WorkflowImplementationOptions implementationOptions =
1✔
251
              workflow.getWorkflowContext().getWorkflowImplementationOptions();
1✔
252
          Class<? extends Throwable>[] failTypes =
1✔
253
              implementationOptions.getFailWorkflowExceptionTypes();
1✔
254
          for (Class<? extends Throwable> failType : failTypes) {
1✔
255
            if (failType.isAssignableFrom(e.getClass())) {
1✔
256
              throw new WorkflowExecutionException(
1✔
257
                  workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
1✔
258
            }
259
          }
260
          throw wrap(e);
1✔
261
        }
1✔
262
        if (!timerStopped && !workflowStateMachines.isReplaying()) {
1✔
263
          sw.stop();
1✔
264
          timerStopped = true;
1✔
265
        }
266
      }
1✔
267
    } finally {
268
      if (!timerStopped) {
1✔
269
        sw.stop();
1✔
270
      }
271
    }
272
  }
1✔
273

274
  private Map<String, WorkflowQueryResult> executeQueries(Map<String, WorkflowQuery> queries) {
275
    Map<String, WorkflowQueryResult> queryResults = new HashMap<>();
1✔
276
    for (Map.Entry<String, WorkflowQuery> entry : queries.entrySet()) {
1✔
277
      WorkflowQuery query = entry.getValue();
1✔
278
      try {
279
        Optional<Payloads> queryResult = replayWorkflowExecutor.query(query);
1✔
280
        WorkflowQueryResult.Builder result =
281
            WorkflowQueryResult.newBuilder()
1✔
282
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
1✔
283
        if (queryResult.isPresent()) {
1✔
284
          result.setAnswer(queryResult.get());
1✔
285
        }
286
        queryResults.put(entry.getKey(), result.build());
1✔
287
      } catch (Exception e) {
1✔
288
        String stackTrace = Throwables.getStackTraceAsString(e);
1✔
289
        queryResults.put(
1✔
290
            entry.getKey(),
1✔
291
            WorkflowQueryResult.newBuilder()
1✔
292
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
1✔
293
                .setErrorMessage(e + "\n" + stackTrace)
1✔
294
                .build());
1✔
295
      }
1✔
296
    }
1✔
297
    return queryResults;
1✔
298
  }
299

300
  @Override
301
  public void close() {
302
    lock.lock();
1✔
303
    try {
304
      replayWorkflowExecutor.close();
1✔
305
    } finally {
306
      lock.unlock();
1✔
307
    }
308
  }
1✔
309

310
  private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
311
      throws InterruptedException, Throwable {
312

313
    while (true) {
314
      List<ExecuteLocalActivityParameters> laRequests =
1✔
315
          workflowStateMachines.takeLocalActivityRequests();
1✔
316
      localActivityTaskCount += laRequests.size();
1✔
317

318
      for (ExecuteLocalActivityParameters laRequest : laRequests) {
1✔
319
        boolean accepted =
1✔
320
            localActivityDispatcher.dispatch(
1✔
321
                laRequest, localActivityCompletionSink, wftHeartbeatDeadline);
322
        // TODO do we have to fail? if we didn't fit in a potentially tight timeout left until
323
        // wftHeartbeatDeadline,
324
        //  maybe we can return control, heartbeat and try again with fresh timeout one more time?
325
        Preconditions.checkState(
1✔
326
            accepted,
327
            "Unable to schedule local activity for execution, "
328
                + "no more slots available and local activity task queue is full");
329

330
        localActivityMeteringHelper.addNewLocalActivity(laRequest);
1✔
331
      }
1✔
332

333
      if (localActivityTaskCount == 0) {
1✔
334
        // No outstanding local activity requests
335
        break;
1✔
336
      }
337

338
      long maxWaitTimeTillHeartbeatNs = wftHeartbeatDeadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
339
      LocalActivityResult laCompletion =
1✔
340
          localActivityCompletionQueue.poll(maxWaitTimeTillHeartbeatNs, TimeUnit.NANOSECONDS);
1✔
341
      if (laCompletion == null) {
1✔
342
        // Need to force a new task as we are out of time
343
        break;
1✔
344
      }
345

346
      localActivityTaskCount--;
1✔
347
      localActivityMeteringHelper.markLocalActivityComplete(laCompletion.getActivityId());
1✔
348

349
      if (laCompletion.getProcessingError() != null) {
1✔
350
        throw laCompletion.getProcessingError().getThrowable();
1✔
351
      }
352

353
      workflowStateMachines.handleLocalActivityCompletion(laCompletion);
1✔
354
      // handleLocalActivityCompletion triggers eventLoop.
355
      // After this call, there may be new local activity requests available in
356
      // workflowStateMachines.takeLocalActivityRequests()
357
      // These requests need to be processed and accounted for, otherwise we may end up not
358
      // heartbeating and completing workflow task instead. So we have to make another iteration.
359
    }
1✔
360

361
    // it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's
362
    // not empty - we are in trouble anyway
363
    Preconditions.checkState(
1✔
364
        workflowStateMachines.takeLocalActivityRequests().isEmpty(),
1✔
365
        "[BUG] Local activities requests from the last event loop were not drained "
366
            + "and accounted in the outstanding local activities counter");
367
  }
1✔
368

369
  @VisibleForTesting
370
  WorkflowStateMachines getWorkflowStateMachines() {
371
    return workflowStateMachines;
1✔
372
  }
373

374
  private class StatesMachinesCallbackImpl implements StatesMachinesCallback {
1✔
375

376
    @Override
377
    public void start(HistoryEvent startWorkflowEvent) {
378
      replayWorkflowExecutor.start(startWorkflowEvent);
1✔
379
    }
1✔
380

381
    @Override
382
    public void eventLoop() {
383
      replayWorkflowExecutor.eventLoop();
1✔
384
    }
1✔
385

386
    @Override
387
    public void signal(HistoryEvent signalEvent) {
388
      replayWorkflowExecutor.handleWorkflowExecutionSignaled(signalEvent);
1✔
389
    }
1✔
390

391
    @Override
392
    public void update(UpdateMessage message) {
393
      replayWorkflowExecutor.handleWorkflowExecutionUpdated(message);
1✔
394
    }
1✔
395

396
    @Override
397
    public void cancel(HistoryEvent cancelEvent) {
398
      replayWorkflowExecutor.handleWorkflowExecutionCancelRequested(cancelEvent);
1✔
399
    }
1✔
400
  }
401

402
  private static class LocalActivityMeteringHelper {
1✔
403
    private final Map<String, AtomicInteger> firstWftActivities = new HashMap<>();
1✔
404
    private final Map<String, AtomicInteger> nonFirstWftActivities = new HashMap<>();
1✔
405
    private final Set<String> completed = new HashSet<>();
1✔
406

407
    private void newWFTStarting() {
408
      for (String activityId : firstWftActivities.keySet()) {
1✔
409
        AtomicInteger removed = firstWftActivities.remove(activityId);
1✔
410
        removed.set(0);
1✔
411
        nonFirstWftActivities.put(activityId, removed);
1✔
412
      }
1✔
413
    }
1✔
414

415
    private void addNewLocalActivity(ExecuteLocalActivityParameters params) {
416
      AtomicInteger attemptsDuringWFTCounter = new AtomicInteger(0);
1✔
417
      params.setOnNewAttemptCallback(attemptsDuringWFTCounter::incrementAndGet);
1✔
418
      firstWftActivities.put(params.getActivityId(), attemptsDuringWFTCounter);
1✔
419
    }
1✔
420

421
    private void markLocalActivityComplete(String activityId) {
422
      completed.add(activityId);
1✔
423
    }
1✔
424

425
    private int getNonfirstAttempts() {
426
      int result =
1✔
427
          nonFirstWftActivities.values().stream()
1✔
428
              .map(ai -> ai.getAndSet(0))
1✔
429
              .reduce(0, Integer::sum);
1✔
430
      for (String activityId : completed) {
1✔
431
        firstWftActivities.remove(activityId);
1✔
432
        nonFirstWftActivities.remove(activityId);
1✔
433
      }
1✔
434
      completed.clear();
1✔
435
      return result;
1✔
436
    }
437
  }
438
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc