• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import static com.uber.cadence.internal.errors.ErrorType.UNKNOWN_WORKFLOW_TYPE;
21

22
import com.uber.cadence.EventType;
23
import com.uber.cadence.HistoryEvent;
24
import com.uber.cadence.WorkflowQuery;
25
import com.uber.cadence.WorkflowType;
26
import com.uber.cadence.client.WorkflowClient;
27
import com.uber.cadence.context.ContextPropagator;
28
import com.uber.cadence.converter.DataConverter;
29
import com.uber.cadence.internal.replay.DeciderCache;
30
import com.uber.cadence.internal.replay.DecisionContext;
31
import com.uber.cadence.internal.replay.ReplayWorkflow;
32
import com.uber.cadence.internal.worker.WorkflowExecutionException;
33
import com.uber.cadence.worker.WorkflowImplementationOptions;
34
import com.uber.cadence.workflow.WorkflowInterceptor;
35
import io.opentracing.Tracer;
36
import java.util.List;
37
import java.util.Objects;
38
import java.util.concurrent.ExecutorService;
39
import java.util.function.Function;
40

41
/**
42
 * SyncWorkflow supports workflows that use synchronous blocking code. An instance is created per
43
 * decision.
44
 */
45
class SyncWorkflow implements ReplayWorkflow {
46

47
  private final DataConverter dataConverter;
48
  private final List<ContextPropagator> contextPropagators;
49
  private final ExecutorService threadPool;
50
  private final SyncWorkflowDefinition workflow;
51
  WorkflowImplementationOptions workflowImplementationOptions;
52
  private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
53
  private final Tracer tracer;
54
  private DeciderCache cache;
55
  private WorkflowRunnable workflowProc;
56
  private DeterministicRunner runner;
57

58
  public SyncWorkflow(
59
      SyncWorkflowDefinition workflow,
60
      WorkflowImplementationOptions workflowImplementationOptions,
61
      DataConverter dataConverter,
62
      ExecutorService threadPool,
63
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
64
      DeciderCache cache,
65
      List<ContextPropagator> contextPropagators,
66
      Tracer tracer) {
1✔
67
    this.workflow = Objects.requireNonNull(workflow);
1✔
68
    this.workflowImplementationOptions =
1✔
69
        workflowImplementationOptions == null
1✔
70
            ? new WorkflowImplementationOptions.Builder().build()
×
71
            : workflowImplementationOptions;
1✔
72
    this.dataConverter = Objects.requireNonNull(dataConverter);
1✔
73
    this.threadPool = Objects.requireNonNull(threadPool);
1✔
74
    this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
1✔
75
    this.cache = cache;
1✔
76
    this.contextPropagators = contextPropagators;
1✔
77
    this.tracer = tracer;
1✔
78
  }
1✔
79

80
  @Override
81
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
82
    return workflowImplementationOptions;
1✔
83
  }
84

85
  @Override
86
  public void start(HistoryEvent event, DecisionContext context) {
87
    WorkflowType workflowType =
1✔
88
        event.getWorkflowExecutionStartedEventAttributes().getWorkflowType();
1✔
89
    if (workflow == null) {
1✔
90
      throw new IllegalArgumentException(UNKNOWN_WORKFLOW_TYPE + ": " + workflowType);
×
91
    }
92
    if (event.getEventType() != EventType.WorkflowExecutionStarted) {
1✔
93
      throw new IllegalArgumentException(
×
94
          "first event is not WorkflowExecutionStarted, but " + event.getEventType());
×
95
    }
96

97
    SyncDecisionContext syncContext =
1✔
98
        new SyncDecisionContext(
99
            context,
100
            dataConverter,
101
            contextPropagators,
102
            interceptorFactory,
103
            event.getWorkflowExecutionStartedEventAttributes().getLastCompletionResult(),
1✔
104
            workflowImplementationOptions,
105
            tracer);
106

107
    workflowProc =
1✔
108
        new WorkflowRunnable(
109
            syncContext, workflow, event.getWorkflowExecutionStartedEventAttributes());
1✔
110
    runner =
1✔
111
        DeterministicRunner.newRunner(
1✔
112
            threadPool, syncContext, context::currentTimeMillis, workflowProc, cache);
1✔
113
    syncContext.setRunner(runner);
1✔
114
  }
1✔
115

116
  @Override
117
  public void handleSignal(String signalName, byte[] input, long eventId) {
118
    String threadName = "\"" + signalName + "\" signal handler";
1✔
119
    runner.executeInWorkflowThread(
1✔
120
        threadName, () -> workflowProc.processSignal(signalName, input, eventId));
1✔
121
  }
1✔
122

123
  @Override
124
  public boolean eventLoop() throws Throwable {
125
    if (runner == null) {
1✔
126
      return false;
1✔
127
    }
128
    workflowProc.fireTimers();
1✔
129
    runner.runUntilAllBlocked();
1✔
130
    return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
1✔
131
  }
132

133
  @Override
134
  public byte[] getOutput() {
135
    return workflowProc.getOutput();
1✔
136
  }
137

138
  @Override
139
  public void cancel(String reason) {
140
    runner.cancel(reason);
1✔
141
  }
1✔
142

143
  @Override
144
  public void close() {
145
    if (runner != null) {
1✔
146
      runner.close();
1✔
147
    }
148
  }
1✔
149

150
  @Override
151
  public long getNextWakeUpTime() {
152
    return runner.getNextWakeUpTime();
1✔
153
  }
154

155
  @Override
156
  public byte[] query(WorkflowQuery query) {
157
    if (WorkflowClient.QUERY_TYPE_REPLAY_ONLY.equals(query.getQueryType())) {
1✔
158
      return new byte[] {};
1✔
159
    }
160
    if (WorkflowClient.QUERY_TYPE_STACK_TRACE.equals(query.getQueryType())) {
1✔
161
      return dataConverter.toData(runner.stackTrace());
1✔
162
    }
163
    return workflowProc.query(query.getQueryType(), query.getQueryArgs());
1✔
164
  }
165

166
  @Override
167
  public WorkflowExecutionException mapUnexpectedException(Exception failure) {
168
    return POJOWorkflowImplementationFactory.mapToWorkflowExecutionException(
1✔
169
        failure, dataConverter);
170
  }
171

172
  @Override
173
  public WorkflowExecutionException mapError(Error failure) {
174
    return POJOWorkflowImplementationFactory.mapError(failure, dataConverter);
1✔
175
  }
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc