• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #174

pending completion
#174

push

github-actions

web-flow
Add schedules API (#1776)

Add schedules API

1143 of 1143 new or added lines in 35 files covered. (100.0%)

18101 of 23284 relevant lines covered (77.74%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.54
/temporal-sdk/src/main/java/io/temporal/internal/statemachines/UpdateProtocolStateMachine.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.statemachines;
22

23
import com.google.protobuf.Any;
24
import com.google.protobuf.InvalidProtocolBufferException;
25
import io.temporal.api.command.v1.Command;
26
import io.temporal.api.command.v1.ProtocolMessageCommandAttributes;
27
import io.temporal.api.common.v1.Payloads;
28
import io.temporal.api.enums.v1.CommandType;
29
import io.temporal.api.enums.v1.EventType;
30
import io.temporal.api.failure.v1.Failure;
31
import io.temporal.api.protocol.v1.Message;
32
import io.temporal.api.update.v1.Acceptance;
33
import io.temporal.api.update.v1.Outcome;
34
import io.temporal.api.update.v1.Rejection;
35
import io.temporal.api.update.v1.Request;
36
import io.temporal.api.update.v1.Response;
37
import io.temporal.internal.common.ProtocolType;
38
import io.temporal.internal.common.UpdateMessage;
39
import io.temporal.workflow.Functions;
40
import java.util.Optional;
41
import org.slf4j.Logger;
42
import org.slf4j.LoggerFactory;
43

44
final class UpdateProtocolStateMachine
45
    extends EntityStateMachineInitialCommand<
46
        UpdateProtocolStateMachine.State,
47
        UpdateProtocolStateMachine.ExplicitEvent,
48
        UpdateProtocolStateMachine> {
49

50
  /**
   * Events that this state machine raises on itself through {@code explicitEvent(...)}, as opposed
   * to the command, history-event and protocol-message triggers used elsewhere in the transition
   * table.
   */
  enum ExplicitEvent {
    /** Raised at the end of {@code reject(Failure)}. */
    REJECT,

    /** Raised at the end of {@code accept()}. */
    ACCEPT,

    /** Raised at the end of {@code complete(Optional, Failure)}. */
    COMPLETE
  }
55

56
  /**
   * States of the update protocol state machine. The notes on each constant summarise the
   * transitions registered in {@code STATE_MACHINE_DEFINITION}.
   */
  enum State {
    /** State the definition is created with; the only state in which messages are forwarded. */
    NEW,

    /** Reached from {@link #NEW} on a {@code ProtocolType.UPDATE_V1} message. */
    REQUEST_INITIATED,

    /** Reached from {@link #REQUEST_INITIATED} on {@code ExplicitEvent.ACCEPT}. */
    ACCEPTED,

    /** Reached from {@link #ACCEPTED} on a {@code COMMAND_TYPE_PROTOCOL_MESSAGE} command. */
    ACCEPTED_COMMAND_CREATED,

    /**
     * Reached from {@link #ACCEPTED_COMMAND_CREATED} on {@code
     * EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED}.
     */
    ACCEPTED_COMMAND_RECORDED,

    /** Reached from {@link #ACCEPTED_COMMAND_RECORDED} on {@code ExplicitEvent.COMPLETE}. */
    COMPLETED,

    /**
     * Reached from {@link #COMPLETED} on a {@code COMMAND_TYPE_PROTOCOL_MESSAGE} command, or from
     * {@link #COMPLETED_IMMEDIATELY_COMMAND_RECORDED} on {@code
     * EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED}.
     */
    COMPLETED_COMMAND_CREATED,

    /**
     * Reached from {@link #COMPLETED_COMMAND_CREATED} on {@code
     * EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED}, or directly from {@link #REQUEST_INITIATED}
     * on {@code ExplicitEvent.REJECT}.
     */
    COMPLETED_COMMAND_RECORDED,

    /** Reached from {@link #ACCEPTED} on {@code ExplicitEvent.COMPLETE}. */
    COMPLETED_IMMEDIATELY,

    /**
     * Reached from {@link #COMPLETED_IMMEDIATELY} on a {@code COMMAND_TYPE_PROTOCOL_MESSAGE}
     * command, or from {@link #ACCEPTED_COMMAND_CREATED} on {@code ExplicitEvent.COMPLETE}.
     */
    COMPLETED_IMMEDIATELY_COMMAND_CREATED,

    /**
     * Reached from {@link #COMPLETED_IMMEDIATELY_COMMAND_CREATED} on a second {@code
     * COMMAND_TYPE_PROTOCOL_MESSAGE} command.
     */
    COMPLETED_IMMEDIATELY_COMMAND_RECORDED
  }
69

70
  private static final Logger log = LoggerFactory.getLogger(UpdateProtocolStateMachine.class);
1✔
71

72
  private final Functions.Func<Boolean> replaying;
73

74
  private final Functions.Proc1<UpdateMessage> updateHandle;
75
  private final Functions.Proc1<Message> sendHandle;
76

77
  private String protoInstanceID;
78
  private String requestMsgId;
79
  private long requestSeqID;
80
  private Request initialRequest;
81
  private String messageId;
82

83
  public static final StateMachineDefinition<State, ExplicitEvent, UpdateProtocolStateMachine>
84
      STATE_MACHINE_DEFINITION =
1✔
85
          StateMachineDefinition.<State, ExplicitEvent, UpdateProtocolStateMachine>newInstance(
1✔
86
                  "Update", State.NEW, State.COMPLETED_COMMAND_RECORDED)
87
              .add(
1✔
88
                  State.NEW,
89
                  ProtocolType.UPDATE_V1,
90
                  State.REQUEST_INITIATED,
91
                  UpdateProtocolStateMachine::triggerUpdate)
92
              .add(
1✔
93
                  State.REQUEST_INITIATED,
94
                  ExplicitEvent.ACCEPT,
95
                  State.ACCEPTED,
96
                  UpdateProtocolStateMachine::sendCommandMessage)
97
              .add(
1✔
98
                  State.ACCEPTED,
99
                  CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
100
                  State.ACCEPTED_COMMAND_CREATED)
101
              .add(
1✔
102
                  State.ACCEPTED_COMMAND_CREATED,
103
                  EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
104
                  State.ACCEPTED_COMMAND_RECORDED)
105
              .add(
1✔
106
                  State.ACCEPTED_COMMAND_RECORDED,
107
                  ExplicitEvent.COMPLETE,
108
                  State.COMPLETED,
109
                  UpdateProtocolStateMachine::sendCommandMessage)
110
              .add(
1✔
111
                  State.COMPLETED,
112
                  CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
113
                  State.COMPLETED_COMMAND_CREATED)
114
              .add(
1✔
115
                  State.COMPLETED_COMMAND_CREATED,
116
                  EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED,
117
                  State.COMPLETED_COMMAND_RECORDED)
118
              // Handle the validation failure case
119
              .add(State.REQUEST_INITIATED, ExplicitEvent.REJECT, State.COMPLETED_COMMAND_RECORDED)
1✔
120
              // Handle an edge case when the update handle completes immediately. The state machine
121
              // should then expect
122
              // to see two protocol command messages back to back then two update events.
123
              .add(
1✔
124
                  State.ACCEPTED,
125
                  ExplicitEvent.COMPLETE,
126
                  State.COMPLETED_IMMEDIATELY,
127
                  UpdateProtocolStateMachine::sendCommandMessage)
128
              .add(
1✔
129
                  State.COMPLETED_IMMEDIATELY,
130
                  CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
131
                  State.COMPLETED_IMMEDIATELY_COMMAND_CREATED)
132
              .add(
1✔
133
                  State.COMPLETED_IMMEDIATELY_COMMAND_CREATED,
134
                  CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
135
                  State.COMPLETED_IMMEDIATELY_COMMAND_RECORDED)
136
              .add(
1✔
137
                  State.COMPLETED_IMMEDIATELY_COMMAND_RECORDED,
138
                  EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
139
                  State.COMPLETED_COMMAND_CREATED)
140
              // Handle an edge case when an update handle completes after it has sent the protocol
141
              // message command
142
              // but has not seen the corresponding event. This can happen if the update handle runs
143
              // a local activity
144
              .add(
1✔
145
                  State.ACCEPTED_COMMAND_CREATED,
146
                  ExplicitEvent.COMPLETE,
147
                  State.COMPLETED_IMMEDIATELY_COMMAND_CREATED,
148
                  UpdateProtocolStateMachine::sendCommandMessage);
149

150
  public static UpdateProtocolStateMachine newInstance(
151
      Functions.Func<Boolean> replaying,
152
      Functions.Proc1<UpdateMessage> updateHandle,
153
      Functions.Proc1<Message> sendHandle,
154
      Functions.Proc1<CancellableCommand> commandSink,
155
      Functions.Proc1<StateMachine> stateMachineSink) {
156
    return new UpdateProtocolStateMachine(
1✔
157
        replaying, updateHandle, sendHandle, commandSink, stateMachineSink);
158
  }
159

160
  /**
   * Private constructor; instances are obtained through {@code newInstance(...)}. Passes {@code
   * STATE_MACHINE_DEFINITION} and both sinks to the superclass and stores the three handles.
   */
  private UpdateProtocolStateMachine(
      Functions.Func<Boolean> replaying,
      Functions.Proc1<UpdateMessage> updateHandle,
      Functions.Proc1<Message> sendHandle,
      Functions.Proc1<CancellableCommand> commandSink,
      Functions.Proc1<StateMachine> stateMachineSink) {
    super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
    this.sendHandle = sendHandle;
    this.updateHandle = updateHandle;
    this.replaying = replaying;
  }
1✔
171

172
  void triggerUpdate() {
173
    protoInstanceID = this.currentMessage.getProtocolInstanceId();
1✔
174
    requestMsgId = this.currentMessage.getId();
1✔
175
    requestSeqID = this.currentMessage.getEventId();
1✔
176
    try {
177
      initialRequest = this.currentMessage.getBody().unpack(Request.class);
1✔
178
    } catch (InvalidProtocolBufferException e) {
×
179
      throw new IllegalArgumentException("Current message not an update:" + this.currentMessage);
×
180
    }
1✔
181
    UpdateMessage updateMessage =
1✔
182
        new UpdateMessage(this.currentMessage, new UpdateProtocolCallbackImpl());
183

184
    updateHandle.apply(updateMessage);
1✔
185
  }
1✔
186

187
  void sendCommandMessage() {
188
    addCommand(
1✔
189
        Command.newBuilder()
1✔
190
            .setCommandType(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE)
1✔
191
            .setProtocolMessageCommandAttributes(
1✔
192
                ProtocolMessageCommandAttributes.newBuilder().setMessageId(messageId))
1✔
193
            .build());
1✔
194
  }
1✔
195

196
  public void accept() {
197
    Acceptance acceptResponse =
198
        Acceptance.newBuilder()
1✔
199
            .setAcceptedRequestMessageId(requestMsgId)
1✔
200
            .setAcceptedRequestSequencingEventId(requestSeqID)
1✔
201
            .setAcceptedRequest(initialRequest)
1✔
202
            .build();
1✔
203

204
    messageId = requestMsgId + "/accept";
1✔
205
    sendHandle.apply(
1✔
206
        Message.newBuilder()
1✔
207
            .setId(messageId)
1✔
208
            .setProtocolInstanceId(protoInstanceID)
1✔
209
            .setBody(Any.pack(acceptResponse))
1✔
210
            .build());
1✔
211
    explicitEvent(ExplicitEvent.ACCEPT);
1✔
212
  }
1✔
213

214
  public void reject(Failure failure) {
215
    Rejection rejectResponse =
216
        Rejection.newBuilder()
1✔
217
            .setRejectedRequestMessageId(requestMsgId)
1✔
218
            .setRejectedRequestSequencingEventId(requestSeqID)
1✔
219
            .setRejectedRequest(initialRequest)
1✔
220
            .setFailure(failure)
1✔
221
            .build();
1✔
222

223
    String messageId = requestMsgId + "/reject";
1✔
224
    sendHandle.apply(
1✔
225
        Message.newBuilder()
1✔
226
            .setId(messageId)
1✔
227
            .setProtocolInstanceId(protoInstanceID)
1✔
228
            .setBody(Any.pack(rejectResponse))
1✔
229
            .build());
1✔
230
    explicitEvent(ExplicitEvent.REJECT);
1✔
231
  }
1✔
232

233
  public void complete(Optional<Payloads> payload, Failure failure) {
234
    Outcome.Builder outcome = Outcome.newBuilder();
1✔
235
    if (failure != null) {
1✔
236
      outcome = outcome.setFailure(failure);
1✔
237
    } else {
238
      outcome = outcome.setSuccess(payload.isPresent() ? payload.get() : null);
1✔
239
    }
240

241
    Response outcomeResponse =
242
        Response.newBuilder().setOutcome(outcome).setMeta(initialRequest.getMeta()).build();
1✔
243

244
    messageId = requestMsgId + "/complete";
1✔
245
    sendHandle.apply(
1✔
246
        Message.newBuilder()
1✔
247
            .setId(messageId)
1✔
248
            .setProtocolInstanceId(protoInstanceID)
1✔
249
            .setBody(Any.pack(outcomeResponse))
1✔
250
            .build());
1✔
251
    explicitEvent(ExplicitEvent.COMPLETE);
1✔
252
  }
1✔
253

254
  @Override
255
  public void handleMessage(Message message) {
256
    // The right sequence of failures on the server right now may lead to duplicate request messages
257
    // for the same protocolInstanceID being sent to the worker. To work around this ignore
258
    // subsequent
259
    // messages if we are not in the NEW state.
260
    if (getState() == State.NEW) {
1✔
261
      super.handleMessage(message);
1✔
262
    } else if (log.isWarnEnabled()) {
×
263
      log.warn(
×
264
          "Received duplicate update messages for protocol instance: "
265
              + message.getProtocolInstanceId());
×
266
    }
267
  }
1✔
268

269
  private class UpdateProtocolCallbackImpl implements UpdateProtocolCallback {
1✔
270

271
    @Override
272
    public void accept() {
273
      UpdateProtocolStateMachine.this.accept();
1✔
274
    }
1✔
275

276
    @Override
277
    public void reject(Failure failure) {
278
      UpdateProtocolStateMachine.this.reject(failure);
1✔
279
    }
1✔
280

281
    @Override
282
    public void complete(Optional<Payloads> result, Failure failure) {
283
      UpdateProtocolStateMachine.this.complete(result, failure);
1✔
284
    }
1✔
285

286
    @Override
287
    public boolean isReplaying() {
288
      return UpdateProtocolStateMachine.this.replaying.apply();
1✔
289
    }
290
  }
291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc