• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #299

12 Aug 2024 06:16AM UTC coverage: 77.696% (+0.005%) from 77.691%
#299

push

github

web-flow
Warn on dangling handlers and add method to help await on all handlers. (#2144)

Warn on dangling handlers and add method to help
await on all handlers

96 of 108 new or added lines in 14 files covered. (88.89%)

9 existing lines in 3 files now uncovered.

19961 of 25691 relevant lines covered (77.7%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.25
/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.common.converter.DataConverter;
25
import io.temporal.common.converter.DataConverterException;
26
import io.temporal.common.converter.EncodedValues;
27
import io.temporal.common.interceptors.Header;
28
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
29
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
30
import io.temporal.worker.MetricsType;
31
import io.temporal.workflow.DynamicSignalHandler;
32
import io.temporal.workflow.HandlerUnfinishedPolicy;
33
import io.temporal.workflow.Workflow;
34
import java.util.*;
35
import org.slf4j.Logger;
36
import org.slf4j.LoggerFactory;
37

38
class SignalDispatcher {
39
  private static final Logger log = LoggerFactory.getLogger(SignalDispatcher.class);
1✔
40

41
  private final DataConverter dataConverterWithWorkflowContext;
42
  private final Map<String, WorkflowOutboundCallsInterceptor.SignalRegistrationRequest>
1✔
43
      signalCallbacks = new HashMap<>();
44

45
  private WorkflowInboundCallsInterceptor inboundCallsInterceptor;
46
  private DynamicSignalHandler dynamicSignalHandler;
47

48
  /** Buffers signals which don't have a registered listener. */
49
  private final Queue<SignalData> signalBuffer = new ArrayDeque<>();
1✔
50

51
  private Map<Long, SignalHandlerInfo> runningSignalHandlers = new LinkedHashMap<>();
1✔
52

53
  public SignalDispatcher(DataConverter dataConverterWithWorkflowContext) {
1✔
54
    this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
55
  }
1✔
56

57
  public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCallsInterceptor) {
58
    this.inboundCallsInterceptor = inboundCallsInterceptor;
1✔
59
  }
1✔
60

61
  /** Called from the interceptor tail */
62
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
63
    String signalName = input.getSignalName();
1✔
64
    Object[] args = input.getArguments();
1✔
65
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
66
        signalCallbacks.get(signalName);
1✔
67
    if (handler == null) {
1✔
68
      if (dynamicSignalHandler != null) {
×
69
        dynamicSignalHandler.handle(signalName, (EncodedValues) args[0]);
×
70
        return;
×
71
      }
72
      throw new IllegalStateException("Unknown signal type: " + signalName);
×
73
    } else {
74
      handler.getCallback().apply(args);
1✔
75
    }
76
  }
1✔
77

78
  public Map<Long, SignalHandlerInfo> getRunningSignalHandlers() {
79
    return runningSignalHandlers;
1✔
80
  }
81

82
  public void handleSignal(
83
      String signalName, Optional<Payloads> input, long eventId, Header header) {
84
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
85
        signalCallbacks.get(signalName);
1✔
86
    Object[] args;
87
    HandlerUnfinishedPolicy policy;
88
    if (handler == null) {
1✔
89
      if (dynamicSignalHandler == null) {
1✔
90
        signalBuffer.add(new SignalData(signalName, input, eventId, header));
1✔
91
        return;
1✔
92
      }
93
      args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
×
NEW
94
      policy = dynamicSignalHandler.getUnfinishedPolicy(signalName);
×
95
    } else {
96
      try {
97
        args =
1✔
98
            dataConverterWithWorkflowContext.fromPayloads(
1✔
99
                input, handler.getArgTypes(), handler.getGenericArgTypes());
1✔
100
      } catch (DataConverterException e) {
1✔
101
        logSerializationException(signalName, eventId, e);
1✔
102
        return;
1✔
103
      }
1✔
104
      policy = handler.getUnfinishedPolicy();
1✔
105
    }
106
    // Track the signal handler
107
    boolean threadDestroyed = false;
1✔
108
    runningSignalHandlers.put(eventId, new SignalHandlerInfo(eventId, signalName, policy));
1✔
109
    try {
110
      inboundCallsInterceptor.handleSignal(
1✔
111
          new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header));
112
    } catch (DestroyWorkflowThreadError e) {
1✔
113
      threadDestroyed = true;
1✔
114
      throw e;
1✔
115
    } finally {
116
      // If the thread was destroyed the user did not finish the handler
117
      if (!threadDestroyed) {
1✔
118
        runningSignalHandlers.remove(eventId);
1✔
119
      }
120
    }
121
  }
1✔
122

123
  public void registerSignalHandlers(
124
      WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput input) {
125
    for (WorkflowOutboundCallsInterceptor.SignalRegistrationRequest request : input.getRequests()) {
1✔
126
      String signalType = request.getSignalType();
1✔
127
      if (signalCallbacks.containsKey(signalType)) {
1✔
128
        throw new IllegalStateException("Signal \"" + signalType + "\" is already registered");
×
129
      }
130
      signalCallbacks.put(signalType, request);
1✔
131
    }
1✔
132
    for (SignalData signalData : signalBuffer) {
1✔
133
      handleSignal(
1✔
134
          signalData.getSignalName(),
1✔
135
          signalData.getPayload(),
1✔
136
          signalData.getEventId(),
1✔
137
          signalData.getHeader());
1✔
138
    }
1✔
139
  }
1✔
140

141
  public void registerDynamicSignalHandler(
142
      WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput input) {
143
    dynamicSignalHandler = input.getHandler();
1✔
144
    for (SignalData signalData : signalBuffer) {
1✔
145
      dynamicSignalHandler.handle(
1✔
146
          signalData.getSignalName(),
1✔
147
          new EncodedValues(signalData.getPayload(), dataConverterWithWorkflowContext));
1✔
148
    }
1✔
149
  }
1✔
150

151
  private void logSerializationException(
152
      String signalName, Long eventId, DataConverterException exception) {
153
    log.error(
1✔
154
        "Failure deserializing signal input for \""
155
            + signalName
156
            + "\" at eventId "
157
            + eventId
158
            + ". Dropping it.",
159
        exception);
160
    Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
1✔
161
  }
1✔
162

163
  private static class SignalData {
164
    private final String signalName;
165
    private final Optional<Payloads> payload;
166
    private final long eventId;
167
    private final Header header;
168

169
    private SignalData(String signalName, Optional<Payloads> payload, long eventId, Header header) {
1✔
170
      this.signalName = Objects.requireNonNull(signalName);
1✔
171
      this.payload = Objects.requireNonNull(payload);
1✔
172
      this.eventId = eventId;
1✔
173
      this.header = header;
1✔
174
    }
1✔
175

176
    public String getSignalName() {
177
      return signalName;
1✔
178
    }
179

180
    public Optional<Payloads> getPayload() {
181
      return payload;
1✔
182
    }
183

184
    public long getEventId() {
185
      return eventId;
1✔
186
    }
187

188
    public Header getHeader() {
189
      return header;
1✔
190
    }
191
  }
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc