• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.activity;
22

23
import com.uber.m3.tally.Scope;
24
import io.grpc.Status;
25
import io.grpc.StatusRuntimeException;
26
import io.temporal.activity.ActivityExecutionContext;
27
import io.temporal.activity.ActivityInfo;
28
import io.temporal.api.common.v1.Payloads;
29
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatResponse;
30
import io.temporal.client.*;
31
import io.temporal.common.converter.DataConverter;
32
import io.temporal.internal.client.ActivityClientHelper;
33
import io.temporal.payload.context.ActivitySerializationContext;
34
import io.temporal.serviceclient.WorkflowServiceStubs;
35
import java.lang.reflect.Type;
36
import java.time.Duration;
37
import java.util.Optional;
38
import java.util.concurrent.ScheduledExecutorService;
39
import java.util.concurrent.ScheduledFuture;
40
import java.util.concurrent.TimeUnit;
41
import java.util.concurrent.locks.Lock;
42
import java.util.concurrent.locks.ReentrantLock;
43
import javax.annotation.concurrent.ThreadSafe;
44
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
46

47
@ThreadSafe
48
class HeartbeatContextImpl implements HeartbeatContext {
49
  private static final Logger log = LoggerFactory.getLogger(HeartbeatContextImpl.class);
1✔
50
  private static final long HEARTBEAT_RETRY_WAIT_MILLIS = 1000;
51

52
  private final Lock lock = new ReentrantLock();
1✔
53

54
  private final WorkflowServiceStubs service;
55
  private final String namespace;
56
  private final ActivityInfo info;
57
  private final String identity;
58
  private final ScheduledExecutorService heartbeatExecutor;
59
  private final long heartbeatIntervalMillis;
60
  private final DataConverter dataConverter;
61
  private final DataConverter dataConverterWithActivityContext;
62

63
  private final Scope metricsScope;
64
  private final Optional<Payloads> prevAttemptHeartbeatDetails;
65

66
  // turned into true on a reception of the first heartbeat
67
  private boolean receivedAHeartbeat = false;
1✔
68
  private Object lastDetails;
69
  private boolean hasOutstandingHeartbeat;
70
  private ScheduledFuture<?> scheduledHeartbeat;
71

72
  private ActivityCompletionException lastException;
73

74
  public HeartbeatContextImpl(
75
      WorkflowServiceStubs service,
76
      String namespace,
77
      ActivityInfo info,
78
      DataConverter dataConverter,
79
      ScheduledExecutorService heartbeatExecutor,
80
      Scope metricsScope,
81
      String identity,
82
      Duration maxHeartbeatThrottleInterval,
83
      Duration defaultHeartbeatThrottleInterval) {
1✔
84
    this.service = service;
1✔
85
    this.metricsScope = metricsScope;
1✔
86
    this.dataConverter = dataConverter;
1✔
87
    this.dataConverterWithActivityContext =
1✔
88
        dataConverter.withContext(
1✔
89
            new ActivitySerializationContext(
90
                namespace,
91
                info.getWorkflowId(),
1✔
92
                info.getWorkflowType(),
1✔
93
                info.getActivityType(),
1✔
94
                info.getActivityTaskQueue(),
1✔
95
                info.isLocal()));
1✔
96
    this.namespace = namespace;
1✔
97
    this.info = info;
1✔
98
    this.identity = identity;
1✔
99
    this.prevAttemptHeartbeatDetails = info.getHeartbeatDetails();
1✔
100
    this.heartbeatExecutor = heartbeatExecutor;
1✔
101
    this.heartbeatIntervalMillis =
1✔
102
        getHeartbeatIntervalMs(
1✔
103
            info.getHeartbeatTimeout(),
1✔
104
            maxHeartbeatThrottleInterval,
105
            defaultHeartbeatThrottleInterval);
106
  }
1✔
107

108
  /**
109
   * @see ActivityExecutionContext#heartbeat(Object)
110
   */
111
  @Override
112
  public <V> void heartbeat(V details) throws ActivityCompletionException {
113
    if (heartbeatExecutor.isShutdown()) {
1✔
114
      throw new ActivityWorkerShutdownException(info);
1✔
115
    }
116
    lock.lock();
1✔
117
    try {
118
      receivedAHeartbeat = true;
1✔
119
      lastDetails = details;
1✔
120
      hasOutstandingHeartbeat = true;
1✔
121
      // Only do sync heartbeat if there is no such call scheduled.
122
      if (scheduledHeartbeat == null) {
1✔
123
        doHeartBeatLocked(details);
1✔
124
      }
125
      if (lastException != null) {
1✔
126
        throw lastException;
1✔
127
      }
128
    } finally {
129
      lock.unlock();
1✔
130
    }
131
  }
1✔
132

133
  /**
134
   * @see ActivityExecutionContext#getHeartbeatDetails(Class, Type)
135
   */
136
  @Override
137
  @SuppressWarnings("unchecked")
138
  public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType) {
139
    lock.lock();
1✔
140
    try {
141
      if (receivedAHeartbeat) {
1✔
142
        return Optional.ofNullable((V) this.lastDetails);
1✔
143
      } else {
144
        return Optional.ofNullable(
1✔
145
            dataConverterWithActivityContext.fromPayloads(
1✔
146
                0, prevAttemptHeartbeatDetails, detailsClass, detailsGenericType));
147
      }
148
    } finally {
149
      lock.unlock();
1✔
150
    }
151
  }
152

153
  private void doHeartBeatLocked(Object details) {
154
    long nextHeartbeatDelay;
155
    try {
156
      sendHeartbeatRequest(details);
1✔
157
      hasOutstandingHeartbeat = false;
1✔
158
      nextHeartbeatDelay = heartbeatIntervalMillis;
1✔
159
    } catch (StatusRuntimeException e) {
1✔
160
      // Not rethrowing to not fail activity implementation on intermittent connection or Temporal
161
      // errors.
162
      log.warn("Heartbeat failed", e);
1✔
163
      nextHeartbeatDelay = HEARTBEAT_RETRY_WAIT_MILLIS;
1✔
164
    } catch (Exception e) {
×
165
      log.error("Unexpected exception", e);
×
166
      nextHeartbeatDelay = HEARTBEAT_RETRY_WAIT_MILLIS;
×
167
    }
1✔
168

169
    scheduleNextHeartbeatLocked(nextHeartbeatDelay);
1✔
170
  }
1✔
171

172
  private void scheduleNextHeartbeatLocked(long delay) {
173
    scheduledHeartbeat =
1✔
174
        heartbeatExecutor.schedule(
1✔
175
            () -> {
176
              lock.lock();
1✔
177
              try {
178
                if (hasOutstandingHeartbeat) {
1✔
179
                  doHeartBeatLocked(lastDetails);
1✔
180
                } else {
181
                  // if no new heartbeats have been submitted in the previous time interval, we
182
                  // don't need to throttle
183
                  // and the next heartbeat should go immediately without following a schedule.
184
                  scheduledHeartbeat = null;
1✔
185
                }
186
              } finally {
187
                lock.unlock();
1✔
188
              }
189
            },
1✔
190
            delay,
191
            TimeUnit.MILLISECONDS);
192
  }
1✔
193

194
  private void sendHeartbeatRequest(Object details) {
195
    try {
196
      RecordActivityTaskHeartbeatResponse status =
1✔
197
          ActivityClientHelper.sendHeartbeatRequest(
1✔
198
              service,
199
              namespace,
200
              identity,
201
              info.getTaskToken(),
1✔
202
              dataConverterWithActivityContext.toPayloads(details),
1✔
203
              metricsScope);
204
      if (status.getCancelRequested()) {
1✔
205
        lastException = new ActivityCanceledException(info);
1✔
206
      } else {
207
        lastException = null;
1✔
208
      }
209
    } catch (StatusRuntimeException e) {
1✔
210
      if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
1✔
211
        lastException = new ActivityNotExistsException(info, e);
1✔
212
      } else if (e.getStatus().getCode() == Status.Code.INVALID_ARGUMENT
1✔
213
          || e.getStatus().getCode() == Status.Code.FAILED_PRECONDITION) {
1✔
214
        lastException = new ActivityCompletionFailureException(info, e);
×
215
      } else {
216
        throw e;
1✔
217
      }
218
    }
1✔
219
  }
1✔
220

221
  private static long getHeartbeatIntervalMs(
222
      Duration activityHeartbeatTimeout,
223
      Duration maxHeartbeatThrottleInterval,
224
      Duration defaultHeartbeatThrottleInterval) {
225
    long interval =
226
        activityHeartbeatTimeout.isZero()
1✔
227
            ? defaultHeartbeatThrottleInterval.toMillis()
1✔
228
            : (long) (0.8 * activityHeartbeatTimeout.toMillis());
1✔
229
    return Math.min(interval, maxHeartbeatThrottleInterval.toMillis());
1✔
230
  }
231
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc