• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #299

12 Aug 2024 06:16AM UTC coverage: 77.696% (+0.005%) from 77.691%
#299

push

github

web-flow
Warn on dangling handlers and add method to help await on all handlers. (#2144)

Warn on dangling handlers and add method to help
await on all handlers

96 of 108 new or added lines in 14 files covered. (88.89%)

9 existing lines in 3 files now uncovered.

19961 of 25691 relevant lines covered (77.7%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.67
/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.common.converter.DataConverter;
25
import io.temporal.common.converter.EncodedValues;
26
import io.temporal.common.interceptors.Header;
27
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
28
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateInput;
29
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateOutput;
30
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
31
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest;
32
import io.temporal.workflow.DynamicUpdateHandler;
33
import io.temporal.workflow.HandlerUnfinishedPolicy;
34
import java.util.*;
35
import org.slf4j.Logger;
36
import org.slf4j.LoggerFactory;
37

38
class UpdateDispatcher {
39
  private static final Logger log = LoggerFactory.getLogger(UpdateDispatcher.class);
1✔
40

41
  private final DataConverter dataConverterWithWorkflowContext;
42
  private final Map<String, WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest>
1✔
43
      updateCallbacks = new HashMap<>();
44

45
  private DynamicUpdateHandler dynamicUpdateHandler;
46
  private WorkflowInboundCallsInterceptor inboundCallsInterceptor;
47
  private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new TreeMap<>();
1✔
48

49
  public UpdateDispatcher(DataConverter dataConverterWithWorkflowContext) {
1✔
50
    this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
51
  }
1✔
52

53
  public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCallsInterceptor) {
54
    this.inboundCallsInterceptor = inboundCallsInterceptor;
1✔
55
  }
1✔
56

57
  public void handleValidateUpdate(
58
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
59
    WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler =
1✔
60
        updateCallbacks.get(updateName);
1✔
61
    Object[] args;
62
    HandlerUnfinishedPolicy policy;
63
    if (handler == null) {
1✔
64
      if (dynamicUpdateHandler == null) {
1✔
65
        throw new IllegalArgumentException(
1✔
66
            "Unknown update name: " + updateName + ", knownTypes=" + updateCallbacks.keySet());
1✔
67
      }
68
      args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
×
NEW
69
      policy = dynamicUpdateHandler.getUnfinishedPolicy(updateName);
×
70
    } else {
71
      args =
1✔
72
          dataConverterWithWorkflowContext.fromPayloads(
1✔
73
              input, handler.getArgTypes(), handler.getGenericArgTypes());
1✔
74
      policy = handler.getUnfinishedPolicy();
1✔
75
    }
76
    runningUpdateHandlers.put(updateId, new UpdateHandlerInfo(updateId, updateName, policy));
1✔
77
    try {
78
      inboundCallsInterceptor.validateUpdate(
1✔
79
          new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args));
80
    } finally {
81
      runningUpdateHandlers.remove(updateId);
1✔
82
    }
83
  }
1✔
84

85
  public Optional<Payloads> handleExecuteUpdate(
86
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
87
    WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler =
1✔
88
        updateCallbacks.get(updateName);
1✔
89
    Object[] args;
90
    HandlerUnfinishedPolicy policy;
91
    if (handler == null) {
1✔
92
      if (dynamicUpdateHandler == null) {
×
93
        throw new IllegalArgumentException(
×
94
            "Unknown update name: " + updateName + ", knownTypes=" + updateCallbacks.keySet());
×
95
      }
96
      args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
×
NEW
97
      policy = dynamicUpdateHandler.getUnfinishedPolicy(updateName);
×
98
    } else {
99
      args =
1✔
100
          dataConverterWithWorkflowContext.fromPayloads(
1✔
101
              input, handler.getArgTypes(), handler.getGenericArgTypes());
1✔
102
      policy = handler.getUnfinishedPolicy();
1✔
103
    }
104

105
    runningUpdateHandlers.put(updateId, new UpdateHandlerInfo(updateId, updateName, policy));
1✔
106
    boolean threadDestroyed = false;
1✔
107
    try {
108
      Object result =
1✔
109
          inboundCallsInterceptor
110
              .executeUpdate(
1✔
111
                  new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args))
112
              .getResult();
1✔
113
      return dataConverterWithWorkflowContext.toPayloads(result);
1✔
114
    } catch (DestroyWorkflowThreadError e) {
1✔
115
      threadDestroyed = true;
1✔
116
      throw e;
1✔
117
    } finally {
118
      // If the thread was destroyed the user did not finish the handler
119
      if (!threadDestroyed) {
1✔
120
        runningUpdateHandlers.remove(updateId);
1✔
121
      }
122
    }
123
  }
124

125
  public void registerUpdateHandlers(
126
      WorkflowOutboundCallsInterceptor.RegisterUpdateHandlersInput input) {
127
    for (WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest request : input.getRequests()) {
1✔
128
      String updateName = request.getUpdateName();
1✔
129
      if (updateCallbacks.containsKey(updateName)) {
1✔
130
        throw new IllegalStateException("Update \"" + updateName + "\" is already registered");
×
131
      }
132
      updateCallbacks.put(updateName, request);
1✔
133
    }
1✔
134
  }
1✔
135

136
  public void registerDynamicUpdateHandler(
137
      WorkflowOutboundCallsInterceptor.RegisterDynamicUpdateHandlerInput input) {
138
    dynamicUpdateHandler = input.getHandler();
×
139
  }
×
140

141
  public void handleInterceptedValidateUpdate(UpdateInput input) {
142
    String updateName = input.getUpdateName();
1✔
143
    Object[] args = input.getArguments();
1✔
144
    UpdateRegistrationRequest handler = updateCallbacks.get(updateName);
1✔
145
    if (handler == null) {
1✔
146
      if (dynamicUpdateHandler != null) {
×
147
        dynamicUpdateHandler.handleValidate(updateName, (EncodedValues) args[0]);
×
148
      } else {
149
        throw new IllegalStateException("Unknown update name: " + updateName);
×
150
      }
151
    } else {
152
      handler.getValidateCallback().apply(args);
1✔
153
    }
154
  }
1✔
155

156
  public UpdateOutput handleInterceptedExecuteUpdate(UpdateInput input) {
157
    String updateName = input.getUpdateName();
1✔
158
    Object[] args = input.getArguments();
1✔
159
    UpdateRegistrationRequest handler = updateCallbacks.get(updateName);
1✔
160
    Object result;
161
    if (handler == null) {
1✔
162
      if (dynamicUpdateHandler != null) {
×
163
        result = dynamicUpdateHandler.handleExecute(updateName, (EncodedValues) args[0]);
×
164
      } else {
165
        throw new IllegalStateException("Unknown update name: " + updateName);
×
166
      }
167
    } else {
168
      result = handler.getExecuteCallback().apply(args);
1✔
169
    }
170
    return new WorkflowInboundCallsInterceptor.UpdateOutput(result);
1✔
171
  }
172

173
  public Map<String, UpdateHandlerInfo> getRunningUpdateHandlers() {
174
    return runningUpdateHandlers;
1✔
175
  }
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc