• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.63
/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import io.temporal.api.common.v1.Payloads;
24
import io.temporal.common.converter.DataConverter;
25
import io.temporal.common.converter.DataConverterException;
26
import io.temporal.common.converter.EncodedValues;
27
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
28
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
29
import io.temporal.worker.MetricsType;
30
import io.temporal.workflow.DynamicSignalHandler;
31
import io.temporal.workflow.Workflow;
32
import java.util.ArrayDeque;
33
import java.util.HashMap;
34
import java.util.Map;
35
import java.util.Objects;
36
import java.util.Optional;
37
import java.util.Queue;
38
import org.slf4j.Logger;
39
import org.slf4j.LoggerFactory;
40

41
class SignalDispatcher {
42
  private static final Logger log = LoggerFactory.getLogger(SignalDispatcher.class);
1✔
43

44
  private final DataConverter dataConverterWithWorkflowContext;
45
  private final Map<String, WorkflowOutboundCallsInterceptor.SignalRegistrationRequest>
1✔
46
      signalCallbacks = new HashMap<>();
47

48
  private WorkflowInboundCallsInterceptor inboundCallsInterceptor;
49
  private DynamicSignalHandler dynamicSignalHandler;
50

51
  /** Buffers signals which don't have a registered listener. */
52
  private final Queue<SignalData> signalBuffer = new ArrayDeque<>();
1✔
53

54
  public SignalDispatcher(DataConverter dataConverterWithWorkflowContext) {
1✔
55
    this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
1✔
56
  }
1✔
57

58
  public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCallsInterceptor) {
59
    this.inboundCallsInterceptor = inboundCallsInterceptor;
1✔
60
  }
1✔
61

62
  /** Called from the interceptor tail */
63
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
64
    String signalName = input.getSignalName();
1✔
65
    Object[] args = input.getArguments();
1✔
66
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
67
        signalCallbacks.get(signalName);
1✔
68
    if (handler == null) {
1✔
69
      if (dynamicSignalHandler != null) {
×
70
        dynamicSignalHandler.handle(signalName, (EncodedValues) args[0]);
×
71
        return;
×
72
      }
73
      throw new IllegalStateException("Unknown signal type: " + signalName);
×
74
    } else {
75
      handler.getCallback().apply(args);
1✔
76
    }
77
  }
1✔
78

79
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
80
    WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler =
1✔
81
        signalCallbacks.get(signalName);
1✔
82
    Object[] args;
83
    if (handler == null) {
1✔
84
      if (dynamicSignalHandler == null) {
1✔
85
        signalBuffer.add(new SignalData(signalName, input, eventId));
1✔
86
        return;
1✔
87
      }
88
      args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)};
×
89
    } else {
90
      try {
91
        args =
1✔
92
            DataConverter.arrayFromPayloads(
1✔
93
                dataConverterWithWorkflowContext,
94
                input,
95
                handler.getArgTypes(),
1✔
96
                handler.getGenericArgTypes());
1✔
97
      } catch (DataConverterException e) {
1✔
98
        logSerializationException(signalName, eventId, e);
1✔
99
        return;
1✔
100
      }
1✔
101
    }
102
    inboundCallsInterceptor.handleSignal(
1✔
103
        new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId));
104
  }
1✔
105

106
  public void registerSignalHandlers(
107
      WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput input) {
108
    for (WorkflowOutboundCallsInterceptor.SignalRegistrationRequest request : input.getRequests()) {
1✔
109
      String signalType = request.getSignalType();
1✔
110
      if (signalCallbacks.containsKey(signalType)) {
1✔
111
        throw new IllegalStateException("Signal \"" + signalType + "\" is already registered");
×
112
      }
113
      signalCallbacks.put(signalType, request);
1✔
114
    }
1✔
115
    for (SignalData signalData : signalBuffer) {
1✔
116
      handleSignal(signalData.getSignalName(), signalData.getPayload(), signalData.getEventId());
1✔
117
    }
1✔
118
  }
1✔
119

120
  public void registerDynamicSignalHandler(
121
      WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput input) {
122
    dynamicSignalHandler = input.getHandler();
1✔
123
    for (SignalData signalData : signalBuffer) {
1✔
124
      dynamicSignalHandler.handle(
1✔
125
          signalData.getSignalName(),
1✔
126
          new EncodedValues(signalData.getPayload(), dataConverterWithWorkflowContext));
1✔
127
    }
1✔
128
  }
1✔
129

130
  private void logSerializationException(
131
      String signalName, Long eventId, DataConverterException exception) {
132
    log.error(
1✔
133
        "Failure deserializing signal input for \""
134
            + signalName
135
            + "\" at eventId "
136
            + eventId
137
            + ". Dropping it.",
138
        exception);
139
    Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1);
1✔
140
  }
1✔
141

142
  private static class SignalData {
143
    private final String signalName;
144
    private final Optional<Payloads> payload;
145
    private final long eventId;
146

147
    private SignalData(String signalName, Optional<Payloads> payload, long eventId) {
1✔
148
      this.signalName = Objects.requireNonNull(signalName);
1✔
149
      this.payload = Objects.requireNonNull(payload);
1✔
150
      this.eventId = eventId;
1✔
151
    }
1✔
152

153
    public String getSignalName() {
154
      return signalName;
1✔
155
    }
156

157
    public Optional<Payloads> getPayload() {
158
      return payload;
1✔
159
    }
160

161
    public long getEventId() {
162
      return eventId;
1✔
163
    }
164
  }
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc