• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #281

09 Jul 2024 07:21PM UTC coverage: 77.556% (-0.02%) from 77.577%
#281

push

github

web-flow
Make sure workflow_failed is incremented on NonDeterministicException (#2141)

Make sure workflow_failed is incremented on NonDeterministicException

2 of 3 new or added lines in 1 file covered. (66.67%)

5 existing lines in 1 file now uncovered.

19088 of 24612 relevant lines covered (77.56%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.97
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.ProtobufTimeUtils.toJavaDuration;
24
import static io.temporal.serviceclient.CheckedExceptionWrapper.wrap;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Throwables;
29
import com.google.protobuf.util.Durations;
30
import com.google.protobuf.util.Timestamps;
31
import com.uber.m3.tally.Scope;
32
import com.uber.m3.tally.Stopwatch;
33
import io.grpc.Deadline;
34
import io.temporal.api.command.v1.Command;
35
import io.temporal.api.common.v1.Payloads;
36
import io.temporal.api.enums.v1.QueryResultType;
37
import io.temporal.api.history.v1.HistoryEvent;
38
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
39
import io.temporal.api.protocol.v1.Message;
40
import io.temporal.api.query.v1.WorkflowQuery;
41
import io.temporal.api.query.v1.WorkflowQueryResult;
42
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
43
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
44
import io.temporal.internal.Config;
45
import io.temporal.internal.common.SdkFlag;
46
import io.temporal.internal.common.UpdateMessage;
47
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
48
import io.temporal.internal.statemachines.StatesMachinesCallback;
49
import io.temporal.internal.statemachines.WorkflowStateMachines;
50
import io.temporal.internal.worker.*;
51
import io.temporal.worker.MetricsType;
52
import io.temporal.worker.WorkflowImplementationOptions;
53
import io.temporal.workflow.Functions;
54
import java.time.Duration;
55
import java.util.*;
56
import java.util.concurrent.BlockingQueue;
57
import java.util.concurrent.LinkedBlockingDeque;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicInteger;
60
import java.util.concurrent.locks.Lock;
61
import java.util.concurrent.locks.ReentrantLock;
62

63
/**
64
 * Implements a workflow executor that relies on replaying the workflow code. An instance of this class
65
 * is created per cached workflow run.
66
 */
67
class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
68
  private final Scope metricsScope;
69

70
  private final WorkflowExecutionStartedEventAttributes startedEvent;
71

72
  private final Lock lock = new ReentrantLock();
1✔
73

74
  private final Functions.Proc1<LocalActivityResult> localActivityCompletionSink;
75

76
  private final BlockingQueue<LocalActivityResult> localActivityCompletionQueue =
1✔
77
      new LinkedBlockingDeque<>();
78

79
  private final LocalActivityDispatcher localActivityDispatcher;
80

81
  private final LocalActivityMeteringHelper localActivityMeteringHelper;
82

83
  private final ReplayWorkflow workflow;
84

85
  private final WorkflowStateMachines workflowStateMachines;
86

87
  /** Number of non completed local activity tasks */
88
  // TODO move and maintain this counter inside workflowStateMachines
89
  private int localActivityTaskCount;
90

91
  private final ReplayWorkflowContextImpl context;
92

93
  private final ReplayWorkflowExecutor replayWorkflowExecutor;
94

95
  private final GetSystemInfoResponse.Capabilities capabilities;
96

97
  ReplayWorkflowRunTaskHandler(
98
      String namespace,
99
      ReplayWorkflow workflow,
100
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
101
      SingleWorkerOptions workerOptions,
102
      Scope metricsScope,
103
      LocalActivityDispatcher localActivityDispatcher,
104
      GetSystemInfoResponse.Capabilities capabilities) {
1✔
105
    HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
1✔
106
    if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
1✔
107
      throw new IllegalArgumentException(
1✔
108
          "First event in the history is not WorkflowExecutionStarted");
109
    }
110
    this.startedEvent = startedEvent.getWorkflowExecutionStartedEventAttributes();
1✔
111
    this.metricsScope = metricsScope;
1✔
112
    this.localActivityDispatcher = localActivityDispatcher;
1✔
113
    this.workflow = workflow;
1✔
114

115
    this.workflowStateMachines =
1✔
116
        new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
117
    String fullReplayDirectQueryType =
118
        workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
1✔
119
    this.context =
1✔
120
        new ReplayWorkflowContextImpl(
121
            workflowStateMachines,
122
            namespace,
123
            this.startedEvent,
124
            workflowTask.getWorkflowExecution(),
1✔
125
            Timestamps.toMillis(startedEvent.getEventTime()),
1✔
126
            fullReplayDirectQueryType,
127
            workerOptions,
128
            metricsScope);
129

130
    this.replayWorkflowExecutor =
1✔
131
        new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
132
    this.localActivityCompletionSink = localActivityCompletionQueue::add;
1✔
133
    this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
1✔
134
    this.capabilities = capabilities;
1✔
135
  }
1✔
136

137
  @Override
138
  public WorkflowTaskResult handleWorkflowTask(
139
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
140
      throws Throwable {
141
    lock.lock();
1✔
142
    try {
143
      localActivityMeteringHelper.newWFTStarting();
1✔
144

145
      Deadline wftHearbeatDeadline =
1✔
146
          Deadline.after(
1✔
147
              (long)
148
                  (Durations.toNanos(startedEvent.getWorkflowTaskTimeout())
1✔
149
                      * Config.WORKFLOW_TAK_HEARTBEAT_COEFFICIENT),
150
              TimeUnit.NANOSECONDS);
151

152
      if (workflowTask.getPreviousStartedEventId()
1✔
153
          < workflowStateMachines.getLastWFTStartedEventId()) {
1✔
154
        // if previousStartedEventId < currentStartedEventId - the last workflow task handled by
155
        // these state machines is ahead of the last handled workflow task known by the server.
156
        // Something is off, the server lost progress.
157
        // If the fact that we error out here becomes undesirable, because we fail the workflow
158
        // task,
159
        // we always can rework it to graceful invalidation of the cache entity and a full replay
160
        // from the server
161
        throw new IllegalStateException(
×
162
            "Server history for the workflow is below the progress of the workflow on the worker, the progress needs to be discarded");
163
      }
164

165
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
166
      processLocalActivityRequests(wftHearbeatDeadline);
1✔
167
      List<Command> commands = workflowStateMachines.takeCommands();
1✔
168
      List<Message> messages = workflowStateMachines.takeMessages();
1✔
169
      EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
1✔
170
      List<Integer> newSdkFlags = new ArrayList<>(newFlags.size());
1✔
171
      for (SdkFlag flag : newFlags) {
1✔
172
        newSdkFlags.add(flag.getValue());
1✔
173
      }
1✔
174
      if (context.isWorkflowMethodCompleted()) {
1✔
175
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
176
        // and invalidation
177
        close();
1✔
178
      }
179
      if (context.getWorkflowTaskFailure() != null) {
1✔
180
        throw context.getWorkflowTaskFailure();
×
181
      }
182
      Map<String, WorkflowQueryResult> queryResults = executeQueries(workflowTask.getQueriesMap());
1✔
183
      return WorkflowTaskResult.newBuilder()
1✔
184
          .setCommands(commands)
1✔
185
          .setMessages(messages)
1✔
186
          .setQueryResults(queryResults)
1✔
187
          .setFinalCommand(context.isWorkflowMethodCompleted())
1✔
188
          .setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
1✔
189
          .setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
1✔
190
          .setSdkFlags(newSdkFlags)
1✔
191
          .build();
1✔
192
    } finally {
193
      lock.unlock();
1✔
194
    }
195
  }
196

197
  @Override
198
  public QueryResult handleDirectQueryWorkflowTask(
199
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, WorkflowHistoryIterator historyIterator)
200
      throws Throwable {
201
    WorkflowQuery query = workflowTask.getQuery();
1✔
202
    lock.lock();
1✔
203
    try {
204
      handleWorkflowTaskImpl(workflowTask, historyIterator);
1✔
205
      if (context.isWorkflowMethodCompleted()) {
1✔
206
        // it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
207
        // and invalidation
208
        close();
1✔
209
      }
210
      if (context.getWorkflowTaskFailure() != null) {
1✔
211
        throw context.getWorkflowTaskFailure();
×
212
      }
213
      Optional<Payloads> resultPayloads = replayWorkflowExecutor.query(query);
1✔
214
      return new QueryResult(resultPayloads, context.isWorkflowMethodCompleted());
1✔
215
    } finally {
216
      lock.unlock();
1✔
217
    }
218
  }
219

220
  @Override
221
  public void resetStartedEvenId(Long eventId) {
222
    workflowStateMachines.resetStartedEvenId(eventId);
1✔
223
  }
1✔
224

225
  private void handleWorkflowTaskImpl(
226
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
227
      WorkflowHistoryIterator historyIterator) {
228
    workflowStateMachines.setWorkflowStartedEventId(workflowTask.getStartedEventId());
1✔
229
    workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
1✔
230
    workflowStateMachines.setMessages(workflowTask.getMessagesList());
1✔
231
    applyServerHistory(workflowTask.getStartedEventId(), historyIterator);
1✔
232
  }
1✔
233

234
  private void applyServerHistory(long lastEventId, WorkflowHistoryIterator historyIterator) {
235
    Duration expiration = toJavaDuration(startedEvent.getWorkflowTaskTimeout());
1✔
236
    historyIterator.initDeadline(Deadline.after(expiration.toMillis(), TimeUnit.MILLISECONDS));
1✔
237

238
    boolean timerStopped = false;
1✔
239
    Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_TASK_REPLAY_LATENCY).start();
1✔
240
    long currentEventId = 0;
1✔
241
    try {
242
      while (historyIterator.hasNext()) {
1✔
243
        // iteration itself is intentionally left outside the try-catch below,
244
        // as gRPC exception happened during history iteration should never ever fail the workflow
245
        HistoryEvent event = historyIterator.next();
1✔
246
        currentEventId = event.getEventId();
1✔
247
        boolean hasNext = historyIterator.hasNext();
1✔
248
        try {
249
          workflowStateMachines.handleEvent(event, hasNext);
1✔
250
        } catch (Throwable e) {
1✔
251
          // Fail workflow if exception is of the specified type
252
          WorkflowImplementationOptions implementationOptions =
1✔
253
              workflow.getWorkflowContext().getWorkflowImplementationOptions();
1✔
254
          Class<? extends Throwable>[] failTypes =
1✔
255
              implementationOptions.getFailWorkflowExceptionTypes();
1✔
256
          for (Class<? extends Throwable> failType : failTypes) {
1✔
257
            if (failType.isAssignableFrom(e.getClass())) {
1✔
258
              metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
259
              throw new WorkflowExecutionException(
1✔
260
                  workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e));
1✔
261
            }
262
          }
263
          if (e instanceof WorkflowExecutionException) {
1✔
NEW
264
            metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
×
265
          }
266
          throw wrap(e);
1✔
267
        }
1✔
268
        if (!timerStopped && !workflowStateMachines.isReplaying()) {
1✔
269
          sw.stop();
1✔
270
          timerStopped = true;
1✔
271
        }
272
      }
1✔
273
      verifyAllEventsProcessed(lastEventId, currentEventId);
1✔
274
    } finally {
275
      if (!timerStopped) {
1✔
276
        sw.stop();
1✔
277
      }
278
    }
279
  }
1✔
280

281
  // Verify the received and processed all events up to the last one we knew about from the polled
282
  // task.
283
  // It is possible for the server to send fewer events than required if we are reading history from
284
  // a stale node.
285
  private void verifyAllEventsProcessed(long lastEventId, long processedEventId) {
286
    if (lastEventId != Long.MAX_VALUE && lastEventId > 0 && processedEventId < lastEventId) {
1✔
287
      throw new IllegalStateException(
1✔
288
          String.format(
1✔
289
              "Premature end of stream, expectedLastEventID=%d but no more events after eventID=%d",
290
              lastEventId, processedEventId));
1✔
291
    }
292
  }
1✔
293

294
  private Map<String, WorkflowQueryResult> executeQueries(Map<String, WorkflowQuery> queries) {
295
    Map<String, WorkflowQueryResult> queryResults = new HashMap<>();
1✔
296
    for (Map.Entry<String, WorkflowQuery> entry : queries.entrySet()) {
1✔
297
      WorkflowQuery query = entry.getValue();
1✔
298
      try {
299
        Optional<Payloads> queryResult = replayWorkflowExecutor.query(query);
1✔
300
        WorkflowQueryResult.Builder result =
301
            WorkflowQueryResult.newBuilder()
1✔
302
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
1✔
303
        if (queryResult.isPresent()) {
1✔
304
          result.setAnswer(queryResult.get());
1✔
305
        }
306
        queryResults.put(entry.getKey(), result.build());
1✔
307
      } catch (Exception e) {
1✔
308
        String stackTrace = Throwables.getStackTraceAsString(e);
1✔
309
        queryResults.put(
1✔
310
            entry.getKey(),
1✔
311
            WorkflowQueryResult.newBuilder()
1✔
312
                .setResultType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
1✔
313
                .setErrorMessage(e + "\n" + stackTrace)
1✔
314
                .build());
1✔
315
      }
1✔
316
    }
1✔
317
    return queryResults;
1✔
318
  }
319

320
  @Override
321
  public void close() {
322
    lock.lock();
1✔
323
    try {
324
      replayWorkflowExecutor.close();
1✔
325
    } finally {
326
      lock.unlock();
1✔
327
    }
328
  }
1✔
329

330
  private void processLocalActivityRequests(Deadline wftHeartbeatDeadline)
331
      throws InterruptedException, Throwable {
332

333
    while (true) {
334
      // Scheduling or handling any local activities after the workflow method has returned
335
      // can result in commands being generated after the CompleteWorkflowExecution command
336
      // which the server does not allow.
337
      if (context.isWorkflowMethodCompleted()) {
1✔
338
        break;
1✔
339
      }
340
      List<ExecuteLocalActivityParameters> laRequests =
1✔
341
          workflowStateMachines.takeLocalActivityRequests();
1✔
342
      localActivityTaskCount += laRequests.size();
1✔
343

344
      for (ExecuteLocalActivityParameters laRequest : laRequests) {
1✔
345
        boolean accepted =
1✔
346
            localActivityDispatcher.dispatch(
1✔
347
                laRequest, localActivityCompletionSink, wftHeartbeatDeadline);
348
        // TODO do we have to fail? if we didn't fit in a potentially tight timeout left until
349
        // wftHeartbeatDeadline,
350
        //  maybe we can return control, heartbeat and try again with fresh timeout one more time?
351
        Preconditions.checkState(
1✔
352
            accepted,
353
            "Unable to schedule local activity for execution, "
354
                + "no more slots available and local activity task queue is full");
355

356
        localActivityMeteringHelper.addNewLocalActivity(laRequest);
1✔
357
      }
1✔
358

359
      if (localActivityTaskCount == 0) {
1✔
360
        // No outstanding local activity requests
361
        break;
1✔
362
      }
363

364
      long maxWaitTimeTillHeartbeatNs = wftHeartbeatDeadline.timeRemaining(TimeUnit.NANOSECONDS);
1✔
365
      LocalActivityResult laCompletion =
1✔
366
          localActivityCompletionQueue.poll(maxWaitTimeTillHeartbeatNs, TimeUnit.NANOSECONDS);
1✔
367
      if (laCompletion == null) {
1✔
368
        // Need to force a new task as we are out of time
369
        break;
1✔
370
      }
371

372
      localActivityTaskCount--;
1✔
373
      localActivityMeteringHelper.markLocalActivityComplete(laCompletion.getActivityId());
1✔
374

375
      if (laCompletion.getProcessingError() != null) {
1✔
376
        throw laCompletion.getProcessingError().getThrowable();
1✔
377
      }
378

379
      workflowStateMachines.handleLocalActivityCompletion(laCompletion);
1✔
380
      // handleLocalActivityCompletion triggers eventLoop.
381
      // After this call, there may be new local activity requests available in
382
      // workflowStateMachines.takeLocalActivityRequests()
383
      // These requests need to be processed and accounted for, otherwise we may end up not
384
      // heartbeating and completing workflow task instead. So we have to make another iteration.
385
    }
1✔
386

387
    // it's safe to call and discard the result of takeLocalActivityRequests() here, because if it's
388
    // not empty - we are in trouble anyway
389
    Preconditions.checkState(
1✔
390
        workflowStateMachines.takeLocalActivityRequests().isEmpty()
1✔
391
            || context.isWorkflowMethodCompleted(),
1✔
392
        "[BUG] Local activities requests from the last event loop were not drained "
393
            + "and accounted in the outstanding local activities counter");
394
  }
1✔
395

396
  @VisibleForTesting
397
  WorkflowStateMachines getWorkflowStateMachines() {
398
    return workflowStateMachines;
1✔
399
  }
400

401
  private class StatesMachinesCallbackImpl implements StatesMachinesCallback {
1✔
402

403
    @Override
404
    public void start(HistoryEvent startWorkflowEvent) {
405
      replayWorkflowExecutor.start(startWorkflowEvent);
1✔
406
    }
1✔
407

408
    @Override
409
    public void eventLoop() {
410
      replayWorkflowExecutor.eventLoop();
1✔
411
    }
1✔
412

413
    @Override
414
    public void signal(HistoryEvent signalEvent) {
415
      replayWorkflowExecutor.handleWorkflowExecutionSignaled(signalEvent);
1✔
416
    }
1✔
417

418
    @Override
419
    public void update(UpdateMessage message) {
420
      replayWorkflowExecutor.handleWorkflowExecutionUpdated(message);
1✔
421
    }
1✔
422

423
    @Override
424
    public void cancel(HistoryEvent cancelEvent) {
425
      replayWorkflowExecutor.handleWorkflowExecutionCancelRequested(cancelEvent);
1✔
426
    }
1✔
427
  }
428

429
  @VisibleForTesting
430
  static class LocalActivityMeteringHelper {
1✔
431
    private final Map<String, AtomicInteger> firstWftActivities = new HashMap<>();
1✔
432
    private final Map<String, AtomicInteger> nonFirstWftActivities = new HashMap<>();
1✔
433
    private final Set<String> completed = new HashSet<>();
1✔
434

435
    void newWFTStarting() {
436
      for (String activityId : firstWftActivities.keySet()) {
1✔
437
        AtomicInteger attemptCount = firstWftActivities.get(activityId);
1✔
438
        attemptCount.set(0);
1✔
439
        nonFirstWftActivities.put(activityId, attemptCount);
1✔
440
      }
1✔
441
      firstWftActivities.clear();
1✔
442
    }
1✔
443

444
    void addNewLocalActivity(ExecuteLocalActivityParameters params) {
445
      AtomicInteger attemptsDuringWFTCounter = new AtomicInteger(0);
1✔
446
      params.setOnNewAttemptCallback(attemptsDuringWFTCounter::incrementAndGet);
1✔
447
      firstWftActivities.put(params.getActivityId(), attemptsDuringWFTCounter);
1✔
448
    }
1✔
449

450
    void markLocalActivityComplete(String activityId) {
451
      completed.add(activityId);
1✔
452
    }
1✔
453

454
    int getNonfirstAttempts() {
455
      int result =
1✔
456
          nonFirstWftActivities.values().stream()
1✔
457
              .map(ai -> ai.getAndSet(0))
1✔
458
              .reduce(0, Integer::sum);
1✔
459
      for (String activityId : completed) {
1✔
460
        firstWftActivities.remove(activityId);
1✔
461
        nonFirstWftActivities.remove(activityId);
1✔
462
      }
1✔
463
      completed.clear();
1✔
464
      return result;
1✔
465
    }
466
  }
467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc