• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.74
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
24
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
25

26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.util.ImmutableMap;
28
import io.temporal.api.command.v1.Command;
29
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
30
import io.temporal.api.common.v1.MeteringMetadata;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.common.v1.WorkflowType;
33
import io.temporal.api.enums.v1.CommandType;
34
import io.temporal.api.enums.v1.QueryResultType;
35
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
36
import io.temporal.api.failure.v1.Failure;
37
import io.temporal.api.history.v1.HistoryEvent;
38
import io.temporal.api.query.v1.WorkflowQuery;
39
import io.temporal.api.sdk.v1.WorkflowTaskCompletedMetadata;
40
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
41
import io.temporal.api.taskqueue.v1.TaskQueue;
42
import io.temporal.api.workflowservice.v1.*;
43
import io.temporal.internal.common.ProtobufTimeUtils;
44
import io.temporal.internal.common.WorkflowExecutionUtils;
45
import io.temporal.internal.worker.*;
46
import io.temporal.serviceclient.MetricsTag;
47
import io.temporal.serviceclient.WorkflowServiceStubs;
48
import io.temporal.worker.NonDeterministicException;
49
import io.temporal.workflow.Functions;
50
import java.io.PrintWriter;
51
import java.io.StringWriter;
52
import java.time.Duration;
53
import java.util.List;
54
import java.util.Objects;
55
import java.util.concurrent.atomic.AtomicBoolean;
56
import java.util.stream.Collectors;
57
import org.slf4j.Logger;
58
import org.slf4j.LoggerFactory;
59

60
public final class ReplayWorkflowTaskHandler implements WorkflowTaskHandler {
61

62
  private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowTaskHandler.class);
1✔
63

64
  private final ReplayWorkflowFactory workflowFactory;
65
  private final String namespace;
66
  private final WorkflowExecutorCache cache;
67
  private final SingleWorkerOptions options;
68
  private final Duration stickyTaskQueueScheduleToStartTimeout;
69
  private final WorkflowServiceStubs service;
70
  private final TaskQueue stickyTaskQueue;
71
  private final LocalActivityDispatcher localActivityDispatcher;
72

73
  public ReplayWorkflowTaskHandler(
74
      String namespace,
75
      ReplayWorkflowFactory asyncWorkflowFactory,
76
      WorkflowExecutorCache cache,
77
      SingleWorkerOptions options,
78
      TaskQueue stickyTaskQueue,
79
      Duration stickyTaskQueueScheduleToStartTimeout,
80
      WorkflowServiceStubs service,
81
      LocalActivityDispatcher localActivityDispatcher) {
1✔
82
    this.namespace = namespace;
1✔
83
    this.workflowFactory = asyncWorkflowFactory;
1✔
84
    this.cache = cache;
1✔
85
    this.options = options;
1✔
86
    this.stickyTaskQueue = stickyTaskQueue;
1✔
87
    this.stickyTaskQueueScheduleToStartTimeout = stickyTaskQueueScheduleToStartTimeout;
1✔
88
    this.service = Objects.requireNonNull(service);
1✔
89
    this.localActivityDispatcher = localActivityDispatcher;
1✔
90
  }
1✔
91

92
  @Override
93
  public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueResponse workflowTask)
94
      throws Exception {
95
    String workflowType = workflowTask.getWorkflowType().getName();
1✔
96
    Scope metricsScope =
1✔
97
        options.getMetricsScope().tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
1✔
98
    return handleWorkflowTaskWithQuery(workflowTask.toBuilder(), metricsScope);
1✔
99
  }
100

101
  private Result handleWorkflowTaskWithQuery(
102
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
103
    boolean directQuery = workflowTask.hasQuery();
1✔
104
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
105
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
106
    WorkflowRunTaskHandler workflowRunTaskHandler = null;
1✔
107
    boolean useCache = stickyTaskQueue != null;
1✔
108

109
    try {
110
      workflowRunTaskHandler =
1✔
111
          getOrCreateWorkflowExecutor(useCache, workflowTask, metricsScope, createdNew);
1✔
112
      logWorkflowTaskToBeProcessed(workflowTask, createdNew);
1✔
113

114
      ServiceWorkflowHistoryIterator historyIterator =
1✔
115
          new ServiceWorkflowHistoryIterator(service, namespace, workflowTask, metricsScope);
116
      boolean finalCommand;
117
      Result result;
118

119
      if (directQuery) {
1✔
120
        // Direct query happens when there is no reason (events) to produce a real persisted
121
        // workflow task.
122
        // But Server needs to notify the workflow about the query and get back the query result.
123
        // Server creates a fake non-persisted a PollWorkflowTaskResponse with just the query.
124
        // This WFT has no new events in the history to process
125
        // and the worker response on such a WFT can't contain any new commands either.
126
        QueryResult queryResult =
1✔
127
            workflowRunTaskHandler.handleDirectQueryWorkflowTask(workflowTask, historyIterator);
1✔
128
        finalCommand = queryResult.isWorkflowMethodCompleted();
1✔
129
        result = createDirectQueryResult(workflowTask, queryResult, null);
1✔
130
      } else {
1✔
131
        // main code path, handle workflow task that can have an embedded query
132
        WorkflowTaskResult wftResult =
1✔
133
            workflowRunTaskHandler.handleWorkflowTask(workflowTask, historyIterator);
1✔
134
        finalCommand = wftResult.isFinalCommand();
1✔
135
        result =
1✔
136
            createCompletedWFTRequest(
1✔
137
                workflowTask.getWorkflowType().getName(),
1✔
138
                workflowTask,
139
                wftResult,
140
                workflowRunTaskHandler::setCurrentStartedEvenId);
1✔
141
      }
142

143
      if (useCache) {
1✔
144
        if (finalCommand) {
1✔
145
          // don't invalidate execution from the cache if we were not using cached value here
146
          cache.invalidate(execution, metricsScope, "FinalCommand", null);
1✔
147
        } else if (createdNew.get()) {
1✔
148
          cache.addToCache(execution, workflowRunTaskHandler);
1✔
149
        }
150
      }
151

152
      return result;
1✔
153
    } catch (InterruptedException e) {
1✔
154
      throw e;
1✔
155
    } catch (Throwable e) {
1✔
156
      // Note here that the executor might not be in the cache, even when the caching is on. In that
157
      // case we need to close the executor explicitly. For items in the cache, invalidation
158
      // callback will try to close again, which should be ok.
159
      if (workflowRunTaskHandler != null) {
1✔
160
        workflowRunTaskHandler.close();
1✔
161
      }
162

163
      if (useCache) {
1✔
164
        cache.invalidate(execution, metricsScope, "Exception", e);
1✔
165
        // If history is full and exception occurred then sticky session hasn't been established
166
        // yet, and we can avoid doing a reset.
167
        if (!isFullHistory(workflowTask)) {
1✔
168
          resetStickyTaskQueue(execution);
1✔
169
        }
170
      }
171

172
      if (directQuery) {
1✔
173
        return createDirectQueryResult(workflowTask, null, e);
1✔
174
      } else {
175
        // this call rethrows an exception in some scenarios
176
        return failureToWFTResult(workflowTask, e);
1✔
177
      }
178
    } finally {
179
      if (!useCache && workflowRunTaskHandler != null) {
1✔
180
        // we close the execution in finally only if we don't use cache, otherwise it stays open
181
        workflowRunTaskHandler.close();
1✔
182
      }
183
    }
184
  }
185

186
  private Result createCompletedWFTRequest(
187
      String workflowType,
188
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
189
      WorkflowTaskResult result,
190
      Functions.Proc1<Long> eventIdSetHandle) {
191
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
192
    if (log.isTraceEnabled()) {
1✔
193
      log.trace(
×
194
          "WorkflowTask startedEventId="
195
              + workflowTask.getStartedEventId()
×
196
              + ", WorkflowId="
197
              + execution.getWorkflowId()
×
198
              + ", RunId="
199
              + execution.getRunId()
×
200
              + " completed with \n"
201
              + WorkflowExecutionUtils.prettyPrintCommands(result.getCommands()));
×
202
    } else if (log.isDebugEnabled()) {
1✔
203
      log.debug(
×
204
          "WorkflowTask startedEventId="
205
              + workflowTask.getStartedEventId()
×
206
              + ", WorkflowId="
207
              + execution.getWorkflowId()
×
208
              + ", RunId="
209
              + execution.getRunId()
×
210
              + " completed with "
211
              + result.getCommands().size()
×
212
              + " new commands");
213
    }
214
    RespondWorkflowTaskCompletedRequest.Builder completedRequest =
215
        RespondWorkflowTaskCompletedRequest.newBuilder()
1✔
216
            .setTaskToken(workflowTask.getTaskToken())
1✔
217
            .addAllCommands(result.getCommands())
1✔
218
            .addAllMessages(result.getMessages())
1✔
219
            .putAllQueryResults(result.getQueryResults())
1✔
220
            .setForceCreateNewWorkflowTask(result.isForceWorkflowTask())
1✔
221
            .setMeteringMetadata(
1✔
222
                MeteringMetadata.newBuilder()
1✔
223
                    .setNonfirstLocalActivityExecutionAttempts(
1✔
224
                        result.getNonfirstLocalActivityAttempts())
1✔
225
                    .build())
1✔
226
            .setReturnNewWorkflowTask(result.isForceWorkflowTask());
1✔
227

228
    if (stickyTaskQueue != null
1✔
229
        && (stickyTaskQueueScheduleToStartTimeout == null
230
            || !stickyTaskQueueScheduleToStartTimeout.isZero())) {
1✔
231
      StickyExecutionAttributes.Builder attributes =
232
          StickyExecutionAttributes.newBuilder().setWorkerTaskQueue(stickyTaskQueue);
1✔
233
      if (stickyTaskQueueScheduleToStartTimeout != null) {
1✔
234
        attributes.setScheduleToStartTimeout(
1✔
235
            ProtobufTimeUtils.toProtoDuration(stickyTaskQueueScheduleToStartTimeout));
1✔
236
      }
237
      completedRequest.setStickyAttributes(attributes);
1✔
238
    }
239
    if (!result.getSdkFlags().isEmpty()) {
1✔
240
      completedRequest =
1✔
241
          completedRequest.setSdkMetadata(
1✔
242
              WorkflowTaskCompletedMetadata.newBuilder()
1✔
243
                  .addAllLangUsedFlags(result.getSdkFlags())
1✔
244
                  .build());
1✔
245
    }
246
    return new Result(
1✔
247
        workflowType,
248
        completedRequest.build(),
1✔
249
        null,
250
        null,
251
        null,
252
        result.isFinalCommand(),
1✔
253
        eventIdSetHandle);
254
  }
255

256
  private Result failureToWFTResult(
257
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, Throwable e) throws Exception {
258
    String workflowType = workflowTask.getWorkflowType().getName();
1✔
259
    if (e instanceof WorkflowExecutionException) {
1✔
260
      RespondWorkflowTaskCompletedRequest response =
261
          RespondWorkflowTaskCompletedRequest.newBuilder()
1✔
262
              .setTaskToken(workflowTask.getTaskToken())
1✔
263
              .setIdentity(options.getIdentity())
1✔
264
              .setNamespace(namespace)
1✔
265
              // TODO: Set stamp or not based on capabilities
266
              .setBinaryChecksum(options.getBuildId())
1✔
267
              .addCommands(
1✔
268
                  Command.newBuilder()
1✔
269
                      .setCommandType(CommandType.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION)
1✔
270
                      .setFailWorkflowExecutionCommandAttributes(
1✔
271
                          FailWorkflowExecutionCommandAttributes.newBuilder()
1✔
272
                              .setFailure(((WorkflowExecutionException) e).getFailure()))
1✔
273
                      .build())
1✔
274
              .build();
1✔
275
      return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false, null);
1✔
276
    }
277

278
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
279
    log.warn(
1✔
280
        "Workflow task processing failure. startedEventId={}, WorkflowId={}, RunId={}. If seen continuously the workflow might be stuck.",
281
        workflowTask.getStartedEventId(),
1✔
282
        execution.getWorkflowId(),
1✔
283
        execution.getRunId(),
1✔
284
        e);
285

286
    // Only fail workflow task on the first attempt, subsequent failures of the same workflow task
287
    // should timeout. This is to avoid spin on the failed workflow task as the service doesn't
288
    // yet increase the retry interval.
289
    if (workflowTask.getAttempt() > 1) {
1✔
290
      /*
291
       * TODO we shouldn't swallow Error even if workflowTask.getAttempt() == 1.
292
       *  But leaving as it is for now, because a trivial change to rethrow
293
       *  will leave us without reporting Errors as WorkflowTaskFailure to the server,
294
       *  which we probably should at least attempt to do for visibility that the Error occurs.
295
       */
296
      if (e instanceof Error) {
1✔
297
        throw (Error) e;
1✔
298
      }
299
      throw (Exception) e;
1✔
300
    }
301

302
    Failure failure = options.getDataConverter().exceptionToFailure(e);
1✔
303
    RespondWorkflowTaskFailedRequest.Builder failedRequest =
304
        RespondWorkflowTaskFailedRequest.newBuilder()
1✔
305
            .setTaskToken(workflowTask.getTaskToken())
1✔
306
            .setFailure(failure);
1✔
307
    if (e instanceof NonDeterministicException) {
1✔
308
      failedRequest.setCause(
1✔
309
          WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR);
310
    }
311
    return new WorkflowTaskHandler.Result(
1✔
312
        workflowType, null, failedRequest.build(), null, null, false, null);
1✔
313
  }
314

315
  private Result createDirectQueryResult(
316
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, QueryResult queryResult, Throwable e) {
317
    RespondQueryTaskCompletedRequest.Builder queryCompletedRequest =
318
        RespondQueryTaskCompletedRequest.newBuilder()
1✔
319
            .setTaskToken(workflowTask.getTaskToken())
1✔
320
            .setNamespace(namespace);
1✔
321

322
    if (e == null) {
1✔
323
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
1✔
324
      queryResult.getResponsePayloads().ifPresent(queryCompletedRequest::setQueryResult);
1✔
325
    } else {
326
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED);
1✔
327
      // TODO: Appropriate exception serialization.
328
      StringWriter sw = new StringWriter();
1✔
329
      PrintWriter pw = new PrintWriter(sw);
1✔
330
      e.printStackTrace(pw);
1✔
331

332
      queryCompletedRequest.setErrorMessage(sw.toString());
1✔
333
    }
334

335
    return new Result(
1✔
336
        workflowTask.getWorkflowType().getName(),
1✔
337
        null,
338
        null,
339
        queryCompletedRequest.build(),
1✔
340
        null,
341
        false,
342
        null);
343
  }
344

345
  @Override
346
  public boolean isAnyTypeSupported() {
347
    return workflowFactory.isAnyTypeSupported();
1✔
348
  }
349

350
  private WorkflowRunTaskHandler getOrCreateWorkflowExecutor(
351
      boolean useCache,
352
      PollWorkflowTaskQueueResponse.Builder workflowTask,
353
      Scope metricsScope,
354
      AtomicBoolean createdNew)
355
      throws Exception {
356
    if (useCache) {
1✔
357
      return cache.getOrCreate(
1✔
358
          workflowTask,
359
          metricsScope,
360
          () -> {
361
            createdNew.set(true);
1✔
362
            return createStatefulHandler(workflowTask, metricsScope);
1✔
363
          });
364
    } else {
365
      createdNew.set(true);
1✔
366
      return createStatefulHandler(workflowTask, metricsScope);
1✔
367
    }
368
  }
369

370
  // TODO(maxim): Consider refactoring that avoids mutating workflow task.
371
  private WorkflowRunTaskHandler createStatefulHandler(
372
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
373
    WorkflowType workflowType = workflowTask.getWorkflowType();
1✔
374
    WorkflowExecution workflowExecution = workflowTask.getWorkflowExecution();
1✔
375
    List<HistoryEvent> events = workflowTask.getHistory().getEventsList();
1✔
376
    // Sticky workflow task with partial history.
377
    if (events.isEmpty() || events.get(0).getEventId() > 1) {
1✔
378
      GetWorkflowExecutionHistoryRequest getHistoryRequest =
379
          GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
380
              .setNamespace(namespace)
1✔
381
              .setExecution(workflowTask.getWorkflowExecution())
1✔
382
              .build();
1✔
383
      GetWorkflowExecutionHistoryResponse getHistoryResponse =
1✔
384
          service
385
              .blockingStub()
1✔
386
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
387
              .getWorkflowExecutionHistory(getHistoryRequest);
1✔
388
      workflowTask
1✔
389
          .setHistory(getHistoryResponse.getHistory())
1✔
390
          .setNextPageToken(getHistoryResponse.getNextPageToken());
1✔
391
    }
392
    ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
1✔
393
    return new ReplayWorkflowRunTaskHandler(
1✔
394
        namespace,
395
        workflow,
396
        workflowTask,
397
        options,
398
        metricsScope,
399
        localActivityDispatcher,
400
        service.getServerCapabilities().get());
1✔
401
  }
402

403
  private void resetStickyTaskQueue(WorkflowExecution execution) {
404
    service
1✔
405
        .futureStub()
1✔
406
        .resetStickyTaskQueue(
1✔
407
            ResetStickyTaskQueueRequest.newBuilder()
1✔
408
                .setNamespace(namespace)
1✔
409
                .setExecution(execution)
1✔
410
                .build());
1✔
411
  }
1✔
412

413
  private void logWorkflowTaskToBeProcessed(
414
      PollWorkflowTaskQueueResponse.Builder workflowTask, AtomicBoolean createdNew) {
415
    if (log.isDebugEnabled()) {
1✔
416
      boolean directQuery = workflowTask.hasQuery();
×
417
      WorkflowExecution execution = workflowTask.getWorkflowExecution();
×
418
      if (directQuery) {
×
419
        log.debug(
×
420
            "Handle Direct Query {}. WorkflowId='{}', RunId='{}', queryType='{}', startedEventId={}, previousStartedEventId={}",
421
            createdNew.get() ? "with new executor" : "with existing executor",
×
422
            execution.getWorkflowId(),
×
423
            execution.getRunId(),
×
424
            workflowTask.getQuery().getQueryType(),
×
425
            workflowTask.getStartedEventId(),
×
426
            workflowTask.getPreviousStartedEventId());
×
427
      } else {
428
        log.debug(
×
429
            "Handle Workflow Task {}. {}WorkflowId='{}', RunId='{}', TaskQueue='{}', startedEventId='{}', previousStartedEventId:{}",
430
            createdNew.get() ? "with new executor" : "with existing executor",
×
431
            workflowTask.getQueriesMap().isEmpty()
×
432
                ? ""
×
433
                : "With queries: "
434
                    + workflowTask.getQueriesMap().values().stream()
×
435
                        .map(WorkflowQuery::getQueryType)
×
436
                        .collect(Collectors.toList())
×
437
                    + ". ",
×
438
            execution.getWorkflowId(),
×
439
            execution.getRunId(),
×
440
            workflowTask.getWorkflowExecutionTaskQueue().getName(),
×
441
            workflowTask.getStartedEventId(),
×
442
            workflowTask.getPreviousStartedEventId());
×
443
      }
444
    }
445
  }
1✔
446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc