• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.25
/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.replay;
22

23
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
24
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
25
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
26

27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.util.ImmutableMap;
29
import io.temporal.api.command.v1.Command;
30
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
31
import io.temporal.api.common.v1.MeteringMetadata;
32
import io.temporal.api.common.v1.WorkflowExecution;
33
import io.temporal.api.common.v1.WorkflowType;
34
import io.temporal.api.enums.v1.CommandType;
35
import io.temporal.api.enums.v1.QueryResultType;
36
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
37
import io.temporal.api.failure.v1.Failure;
38
import io.temporal.api.history.v1.HistoryEvent;
39
import io.temporal.api.query.v1.WorkflowQuery;
40
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
41
import io.temporal.api.workflowservice.v1.*;
42
import io.temporal.internal.common.ProtobufTimeUtils;
43
import io.temporal.internal.common.WorkflowExecutionUtils;
44
import io.temporal.internal.worker.*;
45
import io.temporal.serviceclient.MetricsTag;
46
import io.temporal.serviceclient.WorkflowServiceStubs;
47
import io.temporal.worker.NonDeterministicException;
48
import io.temporal.workflow.Functions;
49
import java.io.PrintWriter;
50
import java.io.StringWriter;
51
import java.time.Duration;
52
import java.util.List;
53
import java.util.Objects;
54
import java.util.concurrent.atomic.AtomicBoolean;
55
import java.util.stream.Collectors;
56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58

59
public final class ReplayWorkflowTaskHandler implements WorkflowTaskHandler {
60

61
  /** Class-wide SLF4J logger. */
  private static final Logger log = LoggerFactory.getLogger(ReplayWorkflowTaskHandler.class);
62

63
  // Supplies the ReplayWorkflow for a (type, execution) pair and answers isAnyTypeSupported().
  private final ReplayWorkflowFactory workflowFactory;
64
  // Namespace stamped on the requests this handler builds (history fetch, sticky reset, query
  // completion, workflow failure).
  private final String namespace;
65
  // Executor cache; only consulted when stickyTaskQueueName is non-null.
  private final WorkflowExecutorCache cache;
66
  // Worker options; this class reads the metrics scope, identity, binary checksum and data
  // converter from it.
  private final SingleWorkerOptions options;
67
  // Sent as the sticky schedule-to-start timeout. null means the timeout attribute is omitted;
  // a zero duration means no sticky attributes are sent at all.
  private final Duration stickyTaskQueueScheduleToStartTimeout;
68
  // Never null (checked in the constructor).
  private final WorkflowServiceStubs service;
69
  // null switches off both executor caching and sticky attributes.
  private final String stickyTaskQueueName;
70
  // Handed to every ReplayWorkflowRunTaskHandler this class creates.
  private final LocalActivityDispatcher localActivityDispatcher;
71

72
  /**
   * Creates a workflow task handler.
   *
   * @param namespace namespace placed on the requests built by this handler
   * @param asyncWorkflowFactory factory used to obtain the {@link ReplayWorkflow} of an execution
   * @param cache executor cache, used only when {@code stickyTaskQueueName} is not {@code null}
   * @param options worker options (metrics scope, identity, binary checksum, data converter)
   * @param stickyTaskQueueName sticky task queue name; {@code null} disables executor caching
   * @param stickyTaskQueueScheduleToStartTimeout timeout reported with the sticky attributes; may
   *     be {@code null}
   * @param service service stubs; must not be {@code null}
   * @param localActivityDispatcher dispatcher passed to every run task handler created here
   * @throws NullPointerException if {@code service} is {@code null}
   */
  public ReplayWorkflowTaskHandler(
      String namespace,
      ReplayWorkflowFactory asyncWorkflowFactory,
      WorkflowExecutorCache cache,
      SingleWorkerOptions options,
      String stickyTaskQueueName,
      Duration stickyTaskQueueScheduleToStartTimeout,
      WorkflowServiceStubs service,
      LocalActivityDispatcher localActivityDispatcher) {
    // The only argument that is validated; fail before anything else is assigned.
    this.service = Objects.requireNonNull(service);

    this.namespace = namespace;
    this.options = options;
    this.workflowFactory = asyncWorkflowFactory;
    this.localActivityDispatcher = localActivityDispatcher;

    // Caching / sticky execution settings.
    this.cache = cache;
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.stickyTaskQueueScheduleToStartTimeout = stickyTaskQueueScheduleToStartTimeout;
  }
90

91
  @Override
92
  public WorkflowTaskHandler.Result handleWorkflowTask(PollWorkflowTaskQueueResponse workflowTask)
93
      throws Exception {
94
    String workflowType = workflowTask.getWorkflowType().getName();
95
    Scope metricsScope =
96
        options.getMetricsScope().tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
97
    return handleWorkflowTaskWithQuery(workflowTask.toBuilder(), metricsScope);
98
  }
99

100
  private Result handleWorkflowTaskWithQuery(
101
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
102
    boolean directQuery = workflowTask.hasQuery();
103
    AtomicBoolean createdNew = new AtomicBoolean();
104
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
105
    WorkflowRunTaskHandler workflowRunTaskHandler = null;
106
    boolean useCache = stickyTaskQueueName != null;
107

108
    try {
109
      workflowRunTaskHandler =
110
          getOrCreateWorkflowExecutor(useCache, workflowTask, metricsScope, createdNew);
111
      logWorkflowTaskToBeProcessed(workflowTask, createdNew);
112

113
      ServiceWorkflowHistoryIterator historyIterator =
114
          new ServiceWorkflowHistoryIterator(service, namespace, workflowTask, metricsScope);
115
      boolean finalCommand;
116
      Result result;
117

118
      if (directQuery) {
119
        // Direct query happens when there is no reason (events) to produce a real persisted
120
        // workflow task.
121
        // But Server needs to notify the workflow about the query and get back the query result.
122
        // Server creates a fake non-persisted a PollWorkflowTaskResponse with just the query.
123
        // This WFT has no new events in the history to process
124
        // and the worker response on such a WFT can't contain any new commands either.
125
        QueryResult queryResult =
126
            workflowRunTaskHandler.handleDirectQueryWorkflowTask(workflowTask, historyIterator);
127
        finalCommand = queryResult.isWorkflowMethodCompleted();
128
        result = createDirectQueryResult(workflowTask, queryResult, null);
129
      } else {
130
        // main code path, handle workflow task that can have an embedded query
131
        WorkflowTaskResult wftResult =
132
            workflowRunTaskHandler.handleWorkflowTask(workflowTask, historyIterator);
133
        finalCommand = wftResult.isFinalCommand();
134
        result =
135
            createCompletedWFTRequest(
136
                workflowTask.getWorkflowType().getName(),
137
                workflowTask,
138
                wftResult,
139
                workflowRunTaskHandler::setCurrentStartedEvenId);
140
      }
141

142
      if (useCache) {
143
        if (finalCommand) {
144
          // don't invalidate execution from the cache if we were not using cached value here
145
          cache.invalidate(execution, metricsScope, "FinalCommand", null);
146
        } else if (createdNew.get()) {
147
          cache.addToCache(execution, workflowRunTaskHandler);
148
        }
149
      }
150

151
      return result;
152
    } catch (InterruptedException e) {
153
      throw e;
154
    } catch (Throwable e) {
155
      // Note here that the executor might not be in the cache, even when the caching is on. In that
156
      // case we need to close the executor explicitly. For items in the cache, invalidation
157
      // callback will try to close again, which should be ok.
158
      if (workflowRunTaskHandler != null) {
159
        workflowRunTaskHandler.close();
160
      }
161

162
      if (useCache) {
163
        cache.invalidate(execution, metricsScope, "Exception", e);
164
        // If history is full and exception occurred then sticky session hasn't been established
165
        // yet, and we can avoid doing a reset.
166
        if (!isFullHistory(workflowTask)) {
167
          resetStickyTaskQueue(execution);
168
        }
169
      }
170

171
      if (directQuery) {
172
        return createDirectQueryResult(workflowTask, null, e);
173
      } else {
174
        // this call rethrows an exception in some scenarios
175
        return failureToWFTResult(workflowTask, e);
176
      }
177
    } finally {
178
      if (!useCache && workflowRunTaskHandler != null) {
179
        // we close the execution in finally only if we don't use cache, otherwise it stays open
180
        workflowRunTaskHandler.close();
181
      }
182
    }
183
  }
184

185
  private Result createCompletedWFTRequest(
186
      String workflowType,
187
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
188
      WorkflowTaskResult result,
189
      Functions.Proc1<Long> eventIdSetHandle) {
190
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
191
    if (log.isTraceEnabled()) {
192
      log.trace(
193
          "WorkflowTask startedEventId="
194
              + workflowTask.getStartedEventId()
195
              + ", WorkflowId="
196
              + execution.getWorkflowId()
197
              + ", RunId="
198
              + execution.getRunId()
199
              + " completed with \n"
200
              + WorkflowExecutionUtils.prettyPrintCommands(result.getCommands()));
201
    } else if (log.isDebugEnabled()) {
202
      log.debug(
203
          "WorkflowTask startedEventId="
204
              + workflowTask.getStartedEventId()
205
              + ", WorkflowId="
206
              + execution.getWorkflowId()
207
              + ", RunId="
208
              + execution.getRunId()
209
              + " completed with "
210
              + result.getCommands().size()
211
              + " new commands");
212
    }
213
    RespondWorkflowTaskCompletedRequest.Builder completedRequest =
214
        RespondWorkflowTaskCompletedRequest.newBuilder()
215
            .setTaskToken(workflowTask.getTaskToken())
216
            .addAllCommands(result.getCommands())
217
            .addAllMessages(result.getMessages())
218
            .putAllQueryResults(result.getQueryResults())
219
            .setForceCreateNewWorkflowTask(result.isForceWorkflowTask())
220
            .setMeteringMetadata(
221
                MeteringMetadata.newBuilder()
222
                    .setNonfirstLocalActivityExecutionAttempts(
223
                        result.getNonfirstLocalActivityAttempts())
224
                    .build())
225
            .setReturnNewWorkflowTask(result.isForceWorkflowTask());
226

227
    if (stickyTaskQueueName != null
228
        && (stickyTaskQueueScheduleToStartTimeout == null
229
            || !stickyTaskQueueScheduleToStartTimeout.isZero())) {
230
      StickyExecutionAttributes.Builder attributes =
231
          StickyExecutionAttributes.newBuilder()
232
              .setWorkerTaskQueue(createStickyTaskQueue(stickyTaskQueueName));
233
      if (stickyTaskQueueScheduleToStartTimeout != null) {
234
        attributes.setScheduleToStartTimeout(
235
            ProtobufTimeUtils.toProtoDuration(stickyTaskQueueScheduleToStartTimeout));
236
      }
237
      completedRequest.setStickyAttributes(attributes);
238
    }
239
    return new Result(
240
        workflowType,
241
        completedRequest.build(),
242
        null,
243
        null,
244
        null,
245
        result.isFinalCommand(),
246
        eventIdSetHandle);
247
  }
248

249
  private Result failureToWFTResult(
250
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, Throwable e) throws Exception {
251
    String workflowType = workflowTask.getWorkflowType().getName();
252
    if (e instanceof WorkflowExecutionException) {
253
      RespondWorkflowTaskCompletedRequest response =
254
          RespondWorkflowTaskCompletedRequest.newBuilder()
255
              .setTaskToken(workflowTask.getTaskToken())
256
              .setIdentity(options.getIdentity())
257
              .setNamespace(namespace)
258
              .setBinaryChecksum(options.getBinaryChecksum())
259
              .addCommands(
260
                  Command.newBuilder()
261
                      .setCommandType(CommandType.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION)
262
                      .setFailWorkflowExecutionCommandAttributes(
263
                          FailWorkflowExecutionCommandAttributes.newBuilder()
264
                              .setFailure(((WorkflowExecutionException) e).getFailure()))
265
                      .build())
266
              .build();
267
      return new WorkflowTaskHandler.Result(workflowType, response, null, null, null, false, null);
268
    }
269

270
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
271
    log.warn(
272
        "Workflow task processing failure. startedEventId={}, WorkflowId={}, RunId={}. If seen continuously the workflow might be stuck.",
273
        workflowTask.getStartedEventId(),
274
        execution.getWorkflowId(),
275
        execution.getRunId(),
276
        e);
277

278
    // Only fail workflow task on the first attempt, subsequent failures of the same workflow task
279
    // should timeout. This is to avoid spin on the failed workflow task as the service doesn't
280
    // yet increase the retry interval.
281
    if (workflowTask.getAttempt() > 1) {
282
      /*
283
       * TODO we shouldn't swallow Error even if workflowTask.getAttempt() == 1.
284
       *  But leaving as it is for now, because a trivial change to rethrow
285
       *  will leave us without reporting Errors as WorkflowTaskFailure to the server,
286
       *  which we probably should at least attempt to do for visibility that the Error occurs.
287
       */
288
      if (e instanceof Error) {
289
        throw (Error) e;
290
      }
291
      throw (Exception) e;
292
    }
293

294
    Failure failure = options.getDataConverter().exceptionToFailure(e);
295
    RespondWorkflowTaskFailedRequest.Builder failedRequest =
296
        RespondWorkflowTaskFailedRequest.newBuilder()
297
            .setTaskToken(workflowTask.getTaskToken())
298
            .setFailure(failure);
299
    if (e instanceof NonDeterministicException) {
300
      failedRequest.setCause(
301
          WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR);
302
    }
303
    return new WorkflowTaskHandler.Result(
304
        workflowType, null, failedRequest.build(), null, null, false, null);
305
  }
306

307
  private Result createDirectQueryResult(
308
      PollWorkflowTaskQueueResponseOrBuilder workflowTask, QueryResult queryResult, Throwable e) {
309
    RespondQueryTaskCompletedRequest.Builder queryCompletedRequest =
310
        RespondQueryTaskCompletedRequest.newBuilder()
311
            .setTaskToken(workflowTask.getTaskToken())
312
            .setNamespace(namespace);
313

314
    if (e == null) {
315
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_ANSWERED);
316
      queryResult.getResponsePayloads().ifPresent(queryCompletedRequest::setQueryResult);
317
    } else {
318
      queryCompletedRequest.setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED);
319
      // TODO: Appropriate exception serialization.
320
      StringWriter sw = new StringWriter();
321
      PrintWriter pw = new PrintWriter(sw);
322
      e.printStackTrace(pw);
323

324
      queryCompletedRequest.setErrorMessage(sw.toString());
325
    }
326

327
    return new Result(
328
        workflowTask.getWorkflowType().getName(),
329
        null,
330
        null,
331
        queryCompletedRequest.build(),
332
        null,
333
        false,
334
        null);
335
  }
336

337
  @Override
338
  public boolean isAnyTypeSupported() {
339
    return workflowFactory.isAnyTypeSupported();
340
  }
341

342
  private WorkflowRunTaskHandler getOrCreateWorkflowExecutor(
343
      boolean useCache,
344
      PollWorkflowTaskQueueResponse.Builder workflowTask,
345
      Scope metricsScope,
346
      AtomicBoolean createdNew)
347
      throws Exception {
348
    if (useCache) {
349
      return cache.getOrCreate(
350
          workflowTask,
351
          metricsScope,
352
          () -> {
353
            createdNew.set(true);
354
            return createStatefulHandler(workflowTask, metricsScope);
355
          });
356
    } else {
357
      createdNew.set(true);
358
      return createStatefulHandler(workflowTask, metricsScope);
359
    }
360
  }
361

362
  // TODO(maxim): Consider refactoring that avoids mutating workflow task.
363
  private WorkflowRunTaskHandler createStatefulHandler(
364
      PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) throws Exception {
365
    WorkflowType workflowType = workflowTask.getWorkflowType();
366
    WorkflowExecution workflowExecution = workflowTask.getWorkflowExecution();
367
    List<HistoryEvent> events = workflowTask.getHistory().getEventsList();
368
    // Sticky workflow task with partial history.
369
    if (events.isEmpty() || events.get(0).getEventId() > 1) {
370
      GetWorkflowExecutionHistoryRequest getHistoryRequest =
371
          GetWorkflowExecutionHistoryRequest.newBuilder()
372
              .setNamespace(namespace)
373
              .setExecution(workflowTask.getWorkflowExecution())
374
              .build();
375
      GetWorkflowExecutionHistoryResponse getHistoryResponse =
376
          service
377
              .blockingStub()
378
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
379
              .getWorkflowExecutionHistory(getHistoryRequest);
380
      workflowTask
381
          .setHistory(getHistoryResponse.getHistory())
382
          .setNextPageToken(getHistoryResponse.getNextPageToken());
383
    }
384
    ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
385
    return new ReplayWorkflowRunTaskHandler(
386
        namespace, workflow, workflowTask, options, metricsScope, localActivityDispatcher);
387
  }
388

389
  private void resetStickyTaskQueue(WorkflowExecution execution) {
390
    service
391
        .futureStub()
392
        .resetStickyTaskQueue(
393
            ResetStickyTaskQueueRequest.newBuilder()
394
                .setNamespace(namespace)
395
                .setExecution(execution)
396
                .build());
397
  }
398

399
  private void logWorkflowTaskToBeProcessed(
400
      PollWorkflowTaskQueueResponse.Builder workflowTask, AtomicBoolean createdNew) {
401
    if (log.isDebugEnabled()) {
402
      boolean directQuery = workflowTask.hasQuery();
403
      WorkflowExecution execution = workflowTask.getWorkflowExecution();
404
      if (directQuery) {
405
        log.debug(
406
            "Handle Direct Query {}. WorkflowId='{}', RunId='{}', queryType='{}', startedEventId={}, previousStartedEventId={}",
407
            createdNew.get() ? "with new executor" : "with existing executor",
408
            execution.getWorkflowId(),
409
            execution.getRunId(),
410
            workflowTask.getQuery().getQueryType(),
411
            workflowTask.getStartedEventId(),
412
            workflowTask.getPreviousStartedEventId());
413
      } else {
414
        log.debug(
415
            "Handle Workflow Task {}. {}WorkflowId='{}', RunId='{}', TaskQueue='{}', startedEventId='{}', previousStartedEventId:{}",
416
            createdNew.get() ? "with new executor" : "with existing executor",
417
            workflowTask.getQueriesMap().isEmpty()
418
                ? ""
419
                : "With queries: "
420
                    + workflowTask.getQueriesMap().values().stream()
421
                        .map(WorkflowQuery::getQueryType)
422
                        .collect(Collectors.toList())
423
                    + ". ",
424
            execution.getWorkflowId(),
425
            execution.getRunId(),
426
            workflowTask.getWorkflowExecutionTaskQueue().getName(),
427
            workflowTask.getStartedEventId(),
428
            workflowTask.getPreviousStartedEventId());
429
      }
430
    }
431
  }
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc