• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.99
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
25
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
26

27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.util.ImmutableMap;
29
import io.temporal.api.command.v1.Command;
30
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
31
import io.temporal.api.common.v1.MeteringMetadata;
32
import io.temporal.api.common.v1.WorkflowExecution;
33
import io.temporal.api.common.v1.WorkflowType;
34
import io.temporal.api.enums.v1.CommandType;
35
import io.temporal.api.enums.v1.QueryResultType;
36
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
37
import io.temporal.api.failure.v1.Failure;
38
import io.temporal.api.history.v1.HistoryEvent;
39
import io.temporal.api.query.v1.WorkflowQuery;
40
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
41
import io.temporal.api.workflowservice.v1.*;
42
import io.temporal.internal.common.ProtobufTimeUtils;
43
import io.temporal.internal.common.WorkflowExecutionUtils;
44
import io.temporal.internal.worker.*;
45
import io.temporal.serviceclient.MetricsTag;
46
import io.temporal.serviceclient.WorkflowServiceStubs;
47
import io.temporal.worker.NonDeterministicException;
48
import java.io.PrintWriter;
49
import java.io.StringWriter;
50
import java.time.Duration;
51
import java.util.List;
52
import java.util.Objects;
53
import java.util.concurrent.atomic.AtomicBoolean;
54
import java.util.stream.Collectors;
55
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
57

58
public final class ReplayWorkflowTaskHandler implements WorkflowTaskHandler {
59

60
  // Class-wide SLF4J logger.
  private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowTaskHandler.class);
1✔
61

62
  // Supplies the ReplayWorkflow for a task's workflow type and execution; also answers
  // isAnyTypeSupported().
  private final ReplayWorkflowFactory workflowFactory;
63
  // Namespace set on the requests this class builds (history fetch, sticky queue reset,
  // query completion, workflow failure completion).
  private final String namespace;
64
  // Cache of run handlers; only touched when stickyTaskQueueName is non-null.
  private final WorkflowExecutorCache cache;
65
  // Source of the metrics scope, identity, binary checksum and data converter used below.
  private final SingleWorkerOptions options;
66
  // May be null. A zero value suppresses sticky attributes on completed task requests.
  private final Duration stickyTaskQueueScheduleToStartTimeout;
67
  // Never null: enforced with Objects.requireNonNull in the constructor.
  private final WorkflowServiceStubs service;
68
  // null turns off both the executor cache and sticky attributes.
  private final String stickyTaskQueueName;
69
  // Handed unchanged to every ReplayWorkflowRunTaskHandler created by this class.
  private final LocalActivityDispatcher localActivityDispatcher;
70

71
  /**
   * Creates a workflow task handler.
   *
   * @param namespace namespace set on the requests this handler builds
   * @param asyncWorkflowFactory factory that supplies the workflow for a task
   * @param cache run handler cache; only used when {@code stickyTaskQueueName} is not null
   * @param options worker options (metrics scope, identity, binary checksum, data converter)
   * @param stickyTaskQueueName sticky task queue name; {@code null} disables caching
   * @param stickyTaskQueueScheduleToStartTimeout timeout sent with sticky attributes; may be
   *     {@code null}
   * @param service service stubs; must not be {@code null}
   * @param localActivityDispatcher dispatcher passed to every newly created run handler
   * @throws NullPointerException if {@code service} is {@code null}
   */
  public ReplayWorkflowTaskHandler(
      String namespace,
      ReplayWorkflowFactory asyncWorkflowFactory,
      WorkflowExecutorCache cache,
      SingleWorkerOptions options,
      String stickyTaskQueueName,
      Duration stickyTaskQueueScheduleToStartTimeout,
      WorkflowServiceStubs service,
      LocalActivityDispatcher localActivityDispatcher) {
    // The service stubs are the only argument that is validated.
    this.service = Objects.requireNonNull(service);

    // Workflow creation and execution collaborators.
    this.workflowFactory = asyncWorkflowFactory;
    this.localActivityDispatcher = localActivityDispatcher;
    this.options = options;
    this.namespace = namespace;

    // Sticky execution / caching configuration.
    this.cache = cache;
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.stickyTaskQueueScheduleToStartTimeout = stickyTaskQueueScheduleToStartTimeout;
  }
1✔
89

90
  @Override
91
  public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueResponse workflowTask)
92
      throws Exception {
93
    String workflowType = workflowTask.getWorkflowType().getName();
1✔
94
    Scope metricsScope =
1✔
95
        options.getMetricsScope().tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
1✔
96
    return handleWorkflowTaskWithQuery(workflowTask.toBuilder(), metricsScope);
1✔
97
  }
98

99
  private Result handleWorkflowTaskWithQuery(
100
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
101
    boolean directQuery = workflowTask.hasQuery();
1✔
102
    AtomicBoolean createdNew = new AtomicBoolean();
1✔
103
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
104
    WorkflowRunTaskHandler workflowRunTaskHandler = null;
1✔
105
    boolean useCache = stickyTaskQueueName != null;
1✔
106

107
    try {
108
      workflowRunTaskHandler =
1✔
109
          getOrCreateWorkflowExecutor(useCache, workflowTask, metricsScope, createdNew);
1✔
110
      logWorkflowTaskToBeProcessed(workflowTask, createdNew);
1✔
111

112
      ServiceWorkflowHistoryIterator historyIterator =
1✔
113
          new ServiceWorkflowHistoryIterator(service, namespace, workflowTask, metricsScope);
114
      boolean finalCommand;
115
      Result result;
116

117
      if (directQuery) {
1✔
118
        // Direct query happens when there is no reason (events) to produce a real persisted
119
        // workflow task.
120
        // But Server needs to notify the workflow about the query and get back the query result.
121
        // Server creates a fake non-persisted a PollWorkflowTaskResponse with just the query.
122
        // This WFT has no new events in the history to process
123
        // and the worker response on such a WFT can't contain any new commands either.
124
        QueryResult queryResult =
1✔
125
            workflowRunTaskHandler.handleDirectQueryWorkflowTask(workflowTask, historyIterator);
1✔
126
        finalCommand = queryResult.isWorkflowMethodCompleted();
1✔
127
        result = createDirectQueryResult(workflowTask, queryResult, null);
1✔
128
      } else {
1✔
129
        // main code path, handle workflow task that can have an embedded query
130
        WorkflowTaskResult wftResult =
1✔
131
            workflowRunTaskHandler.handleWorkflowTask(workflowTask, historyIterator);
1✔
132
        finalCommand = wftResult.isFinalCommand();
1✔
133
        result =
1✔
134
            createCompletedWFTRequest(
1✔
135
                workflowTask.getWorkflowType().getName(), workflowTask, wftResult);
1✔
136
      }
137

138
      if (useCache) {
1✔
139
        if (finalCommand) {
1✔
140
          // don't invalidate execution from the cache if we were not using cached value here
141
          cache.invalidate(execution, metricsScope, "FinalCommand", null);
1✔
142
        } else if (createdNew.get()) {
1✔
143
          cache.addToCache(execution, workflowRunTaskHandler);
1✔
144
        }
145
      }
146

147
      return result;
1✔
148
    } catch (InterruptedException e) {
1✔
149
      throw e;
1✔
150
    } catch (Throwable e) {
1✔
151
      // Note here that the executor might not be in the cache, even when the caching is on. In that
152
      // case we need to close the executor explicitly. For items in the cache, invalidation
153
      // callback will try to close again, which should be ok.
154
      if (workflowRunTaskHandler != null) {
1✔
155
        workflowRunTaskHandler.close();
1✔
156
      }
157

158
      if (useCache) {
1✔
159
        cache.invalidate(execution, metricsScope, "Exception", e);
1✔
160
        // If history is full and exception occurred then sticky session hasn't been established
161
        // yet, and we can avoid doing a reset.
162
        if (!isFullHistory(workflowTask)) {
1✔
163
          resetStickyTaskQueue(execution);
1✔
164
        }
165
      }
166

167
      if (directQuery) {
1✔
168
        return createDirectQueryResult(workflowTask, null, e);
1✔
169
      } else {
170
        // this call rethrows an exception in some scenarios
171
        return failureToWFTResult(workflowTask, e);
1✔
172
      }
173
    } finally {
174
      if (!useCache && workflowRunTaskHandler != null) {
1✔
175
        // we close the execution in finally only if we don't use cache, otherwise it stays open
176
        workflowRunTaskHandler.close();
1✔
177
      }
178
    }
179
  }
180

181
  private Result createCompletedWFTRequest(
182
      String workflowType,
183
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
184
      WorkflowTaskResult result) {
185
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
186
    if (log.isTraceEnabled()) {
1✔
187
      log.trace(
×
188
          "WorkflowTask startedEventId="
189
              + workflowTask.getStartedEventId()
×
190
              + ", WorkflowId="
191
              + execution.getWorkflowId()
×
192
              + ", RunId="
193
              + execution.getRunId()
×
194
              + " completed with \n"
195
              + WorkflowExecutionUtils.prettyPrintCommands(result.getCommands()));
×
196
    } else if (log.isDebugEnabled()) {
1✔
197
      log.debug(
×
198
          "WorkflowTask startedEventId="
199
              + workflowTask.getStartedEventId()
×
200
              + ", WorkflowId="
201
              + execution.getWorkflowId()
×
202
              + ", RunId="
203
              + execution.getRunId()
×
204
              + " completed with "
205
              + result.getCommands().size()
×
206
              + " new commands");
207
    }
208
    RespondWorkflowTaskCompletedRequest.Builder completedRequest =
209
        RespondWorkflowTaskCompletedRequest.newBuilder()
1✔
210
            .setTaskToken(workflowTask.getTaskToken())
1✔
211
            .addAllCommands(result.getCommands())
1✔
212
            .putAllQueryResults(result.getQueryResults())
1✔
213
            .setForceCreateNewWorkflowTask(result.isForceWorkflowTask())
1✔
214
            .setMeteringMetadata(
1✔
215
                MeteringMetadata.newBuilder()
1✔
216
                    .setNonfirstLocalActivityExecutionAttempts(
1✔
217
                        result.getNonfirstLocalActivityAttempts())
1✔
218
                    .build())
1✔
219
            .setReturnNewWorkflowTask(result.isForceWorkflowTask());
1✔
220

221
    if (stickyTaskQueueName != null
1✔
222
        && (stickyTaskQueueScheduleToStartTimeout == null
223
            || !stickyTaskQueueScheduleToStartTimeout.isZero())) {
1✔
224
      StickyExecutionAttributes.Builder attributes =
225
          StickyExecutionAttributes.newBuilder()
1✔
226
              .setWorkerTaskQueue(createStickyTaskQueue(stickyTaskQueueName));
1✔
227
      if (stickyTaskQueueScheduleToStartTimeout != null) {
1✔
228
        attributes.setScheduleToStartTimeout(
1✔
229
            ProtobufTimeUtils.toProtoDuration(stickyTaskQueueScheduleToStartTimeout));
1✔
230
      }
231
      completedRequest.setStickyAttributes(attributes);
1✔
232
    }
233
    return new Result(
1✔
234
        workflowType, completedRequest.build(), null, null, null, result.isFinalCommand());
1✔
235
  }
236

237
  private Result failureToWFTResult(
238
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, Throwable e) throws Exception {
239
    String workflowType = workflowTask.getWorkflowType().getName();
1✔
240
    if (e instanceof WorkflowExecutionException) {
1✔
241
      RespondWorkflowTaskCompletedRequest response =
242
          RespondWorkflowTaskCompletedRequest.newBuilder()
1✔
243
              .setTaskToken(workflowTask.getTaskToken())
1✔
244
              .setIdentity(options.getIdentity())
1✔
245
              .setNamespace(namespace)
1✔
246
              .setBinaryChecksum(options.getBinaryChecksum())
1✔
247
              .addCommands(
1✔
248
                  Command.newBuilder()
1✔
249
                      .setCommandType(CommandType.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION)
1✔
250
                      .setFailWorkflowExecutionCommandAttributes(
1✔
251
                          FailWorkflowExecutionCommandAttributes.newBuilder()
1✔
252
                              .setFailure(((WorkflowExecutionException) e).getFailure()))
1✔
253
                      .build())
1✔
254
              .build();
1✔
255
      return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false);
1✔
256
    }
257

258
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
259
    log.warn(
1✔
260
        "Workflow task processing failure. startedEventId={}, WorkflowId={}, RunId={}. If seen continuously the workflow might be stuck.",
261
        workflowTask.getStartedEventId(),
1✔
262
        execution.getWorkflowId(),
1✔
263
        execution.getRunId(),
1✔
264
        e);
265

266
    // Only fail workflow task on the first attempt, subsequent failures of the same workflow task
267
    // should timeout. This is to avoid spin on the failed workflow task as the service doesn't
268
    // yet increase the retry interval.
269
    if (workflowTask.getAttempt() > 1) {
1✔
270
      /*
271
       * TODO we shouldn't swallow Error even if workflowTask.getAttempt() == 1.
272
       *  But leaving as it is for now, because a trivial change to rethrow
273
       *  will leave us without reporting Errors as WorkflowTaskFailure to the server,
274
       *  which we probably should at least attempt to do for visibility that the Error occurs.
275
       */
276
      if (e instanceof Error) {
1✔
277
        throw (Error) e;
1✔
278
      }
279
      throw (Exception) e;
1✔
280
    }
281

282
    Failure failure = options.getDataConverter().exceptionToFailure(e);
1✔
283
    RespondWorkflowTaskFailedRequest.Builder failedRequest =
284
        RespondWorkflowTaskFailedRequest.newBuilder()
1✔
285
            .setTaskToken(workflowTask.getTaskToken())
1✔
286
            .setFailure(failure);
1✔
287
    if (e instanceof NonDeterministicException) {
1✔
288
      failedRequest.setCause(
1✔
289
          WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR);
290
    }
291
    return new WorkflowTaskHandler.Result(
1✔
292
        workflowType, null, failedRequest.build(), null, null, false);
1✔
293
  }
294

295
  private Result createDirectQueryResult(
296
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, QueryResult queryResult, Throwable e) {
297
    RespondQueryTaskCompletedRequest.Builder queryCompletedRequest =
298
        RespondQueryTaskCompletedRequest.newBuilder()
1✔
299
            .setTaskToken(workflowTask.getTaskToken())
1✔
300
            .setNamespace(namespace);
1✔
301

302
    if (e == null) {
1✔
303
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
1✔
304
      queryResult.getResponsePayloads().ifPresent(queryCompletedRequest::setQueryResult);
1✔
305
    } else {
306
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED);
1✔
307
      // TODO: Appropriate exception serialization.
308
      StringWriter sw = new StringWriter();
1✔
309
      PrintWriter pw = new PrintWriter(sw);
1✔
310
      e.printStackTrace(pw);
1✔
311

312
      queryCompletedRequest.setErrorMessage(sw.toString());
1✔
313
    }
314

315
    return new Result(
1✔
316
        workflowTask.getWorkflowType().getName(),
1✔
317
        null,
318
        null,
319
        queryCompletedRequest.build(),
1✔
320
        null,
321
        false);
322
  }
323

324
  /**
   * Reports whether any workflow type is supported.
   *
   * @return the answer of the underlying workflow factory, returned unchanged
   */
  @Override
325
  public boolean isAnyTypeSupported() {
326
    return workflowFactory.isAnyTypeSupported();
1✔
327
  }
328

329
  private WorkflowRunTaskHandler getOrCreateWorkflowExecutor(
330
      boolean useCache,
331
      PollWorkflowTaskQueueResponse.Builder workflowTask,
332
      Scope metricsScope,
333
      AtomicBoolean createdNew)
334
      throws Exception {
335
    if (useCache) {
1✔
336
      return cache.getOrCreate(
1✔
337
          workflowTask,
338
          metricsScope,
339
          () -> {
340
            createdNew.set(true);
1✔
341
            return createStatefulHandler(workflowTask, metricsScope);
1✔
342
          });
343
    } else {
344
      createdNew.set(true);
1✔
345
      return createStatefulHandler(workflowTask, metricsScope);
1✔
346
    }
347
  }
348

349
  // TODO(maxim): Consider refactoring that avoids mutating workflow task.
350
  private WorkflowRunTaskHandler createStatefulHandler(
351
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
352
    WorkflowType workflowType = workflowTask.getWorkflowType();
1✔
353
    WorkflowExecution workflowExecution = workflowTask.getWorkflowExecution();
1✔
354
    List<HistoryEvent> events = workflowTask.getHistory().getEventsList();
1✔
355
    // Sticky workflow task with partial history.
356
    if (events.isEmpty() || events.get(0).getEventId() > 1) {
1✔
357
      GetWorkflowExecutionHistoryRequest getHistoryRequest =
358
          GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
359
              .setNamespace(namespace)
1✔
360
              .setExecution(workflowTask.getWorkflowExecution())
1✔
361
              .build();
1✔
362
      GetWorkflowExecutionHistoryResponse getHistoryResponse =
1✔
363
          service
364
              .blockingStub()
1✔
365
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
366
              .getWorkflowExecutionHistory(getHistoryRequest);
1✔
367
      workflowTask
1✔
368
          .setHistory(getHistoryResponse.getHistory())
1✔
369
          .setNextPageToken(getHistoryResponse.getNextPageToken());
1✔
370
    }
371
    ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
1✔
372
    return new ReplayWorkflowRunTaskHandler(
1✔
373
        namespace, workflow, workflowTask, options, metricsScope, localActivityDispatcher);
374
  }
375

376
  private void resetStickyTaskQueue(WorkflowExecution execution) {
377
    service
1✔
378
        .futureStub()
1✔
379
        .resetStickyTaskQueue(
1✔
380
            ResetStickyTaskQueueRequest.newBuilder()
1✔
381
                .setNamespace(namespace)
1✔
382
                .setExecution(execution)
1✔
383
                .build());
1✔
384
  }
1✔
385

386
  private void logWorkflowTaskToBeProcessed(
387
      PollWorkflowTaskQueueResponse.Builder workflowTask, AtomicBoolean createdNew) {
388
    if (log.isDebugEnabled()) {
1✔
389
      boolean directQuery = workflowTask.hasQuery();
×
390
      WorkflowExecution execution = workflowTask.getWorkflowExecution();
×
391
      if (directQuery) {
×
392
        log.debug(
×
393
            "Handle Direct Query {}. WorkflowId='{}', RunId='{}', queryType='{}', startedEventId={}, previousStartedEventId={}",
394
            createdNew.get() ? "with new executor" : "with existing executor",
×
395
            execution.getWorkflowId(),
×
396
            execution.getRunId(),
×
397
            workflowTask.getQuery().getQueryType(),
×
398
            workflowTask.getStartedEventId(),
×
399
            workflowTask.getPreviousStartedEventId());
×
400
      } else {
401
        log.debug(
×
402
            "Handle Workflow Task {}. {}WorkflowId='{}', RunId='{}', TaskQueue='{}', startedEventId='{}', previousStartedEventId:{}",
403
            createdNew.get() ? "with new executor" : "with existing executor",
×
404
            workflowTask.getQueriesMap().isEmpty()
×
405
                ? ""
×
406
                : "With queries: "
407
                    + workflowTask.getQueriesMap().values().stream()
×
408
                        .map(WorkflowQuery::getQueryType)
×
409
                        .collect(Collectors.toList())
×
410
                    + ". ",
×
411
            execution.getWorkflowId(),
×
412
            execution.getRunId(),
×
413
            workflowTask.getWorkflowExecutionTaskQueue().getName(),
×
414
            workflowTask.getStartedEventId(),
×
415
            workflowTask.getPreviousStartedEventId());
×
416
      }
417
    }
418
  }
1✔
419
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc