• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #188

25 Sep 2023 04:42PM UTC coverage: 77.369% (-0.3%) from 77.663%
#188

push

github-actions

web-flow
Fix null pointer on trigger immediately (#1865)

4 of 4 new or added lines in 1 file covered. (100.0%)

18670 of 24131 relevant lines covered (77.37%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.common.converter.DataConverter;
25
import io.temporal.common.converter.DataConverterException;
26
import io.temporal.common.converter.EncodedValues;
27
import io.temporal.common.interceptors.Header;
28
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
29
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
30
import io.temporal.worker.MetricsType;
31
import io.temporal.workflow.DynamicSignalHandler;
32
import io.temporal.workflow.Workflow;
33
import java.util.ArrayDeque;
34
import java.util.HashMap;
35
import java.util.Map;
36
import java.util.Objects;
37
import java.util.Optional;
38
import java.util.Queue;
39
import org.slf4j.Logger;
40
import org.slf4j.LoggerFactory;
41

42
class SignalDispatcher {
43
  private static final Logger log = LoggerFactory.getLogger(SignalDispatcher.class);
1✔
44

45
  private final DataConverter dataConverterWithWorkflowContext;
46
  private final Map<String, WorkflowOutboundCallsInterceptor.SignalRegistrationRequest>
1✔
47
      signalCallbacks = new HashMap<>();
48

49
  private WorkflowInboundCallsInterceptor inboundCallsInterceptor;
50
  private DynamicSignalHandler dynamicSignalHandler;
51

52
  /** Buffers signals which don't have a registered listener. */
53
  private final Queue<SignalData> signalBuffer = new ArrayDeque<>();
1✔
54

55
  /**
   * Creates a dispatcher with no registered handlers and an empty signal buffer.
   *
   * @param dataConverterWithWorkflowContext converter used by this dispatcher to decode signal
   *     payloads into handler arguments
   */
  public SignalDispatcher(DataConverter dataConverterWithWorkflowContext) {
    this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
  }
1✔
58

59
  public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCallsInterceptor) {
60
    this.inboundCallsInterceptor = inboundCallsInterceptor;
1✔
61
  }
1✔
62

63
  /** Called from the interceptor tail */
64
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
65
    String signalName = input.getSignalName();
1✔
66
    Object[] args = input.getArguments();
1✔
67
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
68
        signalCallbacks.get(signalName);
1✔
69
    if (handler == null) {
1✔
70
      if (dynamicSignalHandler != null) {
×
71
        dynamicSignalHandler.handle(signalName, (EncodedValues) args[0]);
×
72
        return;
×
73
      }
74
      throw new IllegalStateException("Unknown signal type: " + signalName);
×
75
    } else {
76
      handler.getCallback().apply(args);
1✔
77
    }
78
  }
1✔
79

80
  public void handleSignal(
81
      String signalName, Optional<Payloads> input, long eventId, Header header) {
82
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
83
        signalCallbacks.get(signalName);
1✔
84
    Object[] args;
85
    if (handler == null) {
1✔
86
      if (dynamicSignalHandler == null) {
1✔
87
        signalBuffer.add(new SignalData(signalName, input, eventId, header));
1✔
88
        return;
1✔
89
      }
90
      args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
×
91
    } else {
92
      try {
93
        args =
1✔
94
            dataConverterWithWorkflowContext.fromPayloads(
1✔
95
                input, handler.getArgTypes(), handler.getGenericArgTypes());
1✔
96
      } catch (DataConverterException e) {
1✔
97
        logSerializationException(signalName, eventId, e);
1✔
98
        return;
1✔
99
      }
1✔
100
    }
101
    inboundCallsInterceptor.handleSignal(
1✔
102
        new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header));
103
  }
1✔
104

105
  public void registerSignalHandlers(
106
      WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput input) {
107
    for (WorkflowOutboundCallsInterceptor.SignalRegistrationRequest request : input.getRequests()) {
1✔
108
      String signalType = request.getSignalType();
1✔
109
      if (signalCallbacks.containsKey(signalType)) {
1✔
110
        throw new IllegalStateException("Signal \"" + signalType + "\" is already registered");
×
111
      }
112
      signalCallbacks.put(signalType, request);
1✔
113
    }
1✔
114
    for (SignalData signalData : signalBuffer) {
1✔
115
      handleSignal(
1✔
116
          signalData.getSignalName(),
1✔
117
          signalData.getPayload(),
1✔
118
          signalData.getEventId(),
1✔
119
          signalData.getHeader());
1✔
120
    }
1✔
121
  }
1✔
122

123
  public void registerDynamicSignalHandler(
124
      WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput input) {
125
    dynamicSignalHandler = input.getHandler();
1✔
126
    for (SignalData signalData : signalBuffer) {
1✔
127
      dynamicSignalHandler.handle(
1✔
128
          signalData.getSignalName(),
1✔
129
          new EncodedValues(signalData.getPayload(), dataConverterWithWorkflowContext));
1✔
130
    }
1✔
131
  }
1✔
132

133
  private void logSerializationException(
134
      String signalName, Long eventId, DataConverterException exception) {
135
    log.error(
1✔
136
        "Failure deserializing signal input for \""
137
            + signalName
138
            + "\" at eventId "
139
            + eventId
140
            + ". Dropping it.",
141
        exception);
142
    Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
1✔
143
  }
1✔
144

145
  private static class SignalData {
146
    private final String signalName;
147
    private final Optional<Payloads> payload;
148
    private final long eventId;
149
    private final Header header;
150

151
    private SignalData(String signalName, Optional<Payloads> payload, long eventId, Header header) {
1✔
152
      this.signalName = Objects.requireNonNull(signalName);
1✔
153
      this.payload = Objects.requireNonNull(payload);
1✔
154
      this.eventId = eventId;
1✔
155
      this.header = header;
1✔
156
    }
1✔
157

158
    public String getSignalName() {
159
      return signalName;
1✔
160
    }
161

162
    public Optional<Payloads> getPayload() {
163
      return payload;
1✔
164
    }
165

166
    public long getEventId() {
167
      return eventId;
1✔
168
    }
169

170
    public Header getHeader() {
171
      return header;
1✔
172
    }
173
  }
174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc