• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.26
/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import io.temporal.internal.activity.ActivityExecutionContextFactory;
24
import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl;
25
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
26
import io.temporal.serviceclient.WorkflowServiceStubs;
27
import java.time.Duration;
28
import java.util.concurrent.CompletableFuture;
29
import java.util.concurrent.Executors;
30
import java.util.concurrent.ScheduledExecutorService;
31
import java.util.concurrent.TimeUnit;
32
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
34

35
/** Activity worker that supports POJO activity implementations. */
36
public class SyncActivityWorker implements SuspendableWorker {
37
  private static final Logger log = LoggerFactory.getLogger(SyncActivityWorker.class);
1✔
38

39
  private final String identity;
40
  private final String namespace;
41
  private final String taskQueue;
42

43
  private final ScheduledExecutorService heartbeatExecutor;
44
  private final ActivityTaskHandlerImpl taskHandler;
45
  private final ActivityWorker worker;
46

47
  public SyncActivityWorker(
48
      WorkflowServiceStubs service,
49
      String namespace,
50
      String taskQueue,
51
      double taskQueueActivitiesPerSecond,
52
      SingleWorkerOptions options) {
1✔
53
    this.identity = options.getIdentity();
1✔
54
    this.namespace = namespace;
1✔
55
    this.taskQueue = taskQueue;
1✔
56

57
    this.heartbeatExecutor =
1✔
58
        Executors.newScheduledThreadPool(
1✔
59
            4,
60
            new ExecutorThreadFactory(
61
                WorkerThreadsNameHelper.getActivityHeartbeatThreadPrefix(namespace, taskQueue),
1✔
62
                // TODO we currently don't have an uncaught exception handler to pass here on
63
                // options,
64
                // the closest thing is options.getPollerOptions().getUncaughtExceptionHandler(),
65
                // but it's pollerOptions, not heartbeat.
66
                null));
67
    ActivityExecutionContextFactory activityExecutionContextFactory =
1✔
68
        new ActivityExecutionContextFactoryImpl(
69
            service,
70
            identity,
71
            namespace,
72
            options.getMaxHeartbeatThrottleInterval(),
1✔
73
            options.getDefaultHeartbeatThrottleInterval(),
1✔
74
            options.getDataConverter(),
1✔
75
            heartbeatExecutor);
76
    this.taskHandler =
1✔
77
        new ActivityTaskHandlerImpl(
78
            namespace,
79
            taskQueue,
80
            options.getDataConverter(),
1✔
81
            activityExecutionContextFactory,
82
            options.getWorkerInterceptors(),
1✔
83
            options.getContextPropagators());
1✔
84
    this.worker =
1✔
85
        new ActivityWorker(
86
            service, namespace, taskQueue, taskQueueActivitiesPerSecond, options, taskHandler);
87
  }
1✔
88

89
  public void registerActivityImplementations(Object... activitiesImplementation) {
90
    taskHandler.registerActivityImplementations(activitiesImplementation);
1✔
91
  }
1✔
92

93
  @Override
94
  public boolean start() {
95
    return worker.start();
1✔
96
  }
97

98
  @Override
99
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
100
    return shutdownManager
1✔
101
        // we want to shutdown heartbeatExecutor before activity worker, so in-flight activities
102
        // could get an ActivityWorkerShutdownException from their heartbeat
103
        .shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5))
1✔
104
        .thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks))
1✔
105
        .exceptionally(
1✔
106
            e -> {
107
              log.error("[BUG] Unexpected exception during shutdown", e);
×
108
              return null;
×
109
            });
110
  }
111

112
  @Override
113
  public void awaitTermination(long timeout, TimeUnit unit) {
114
    long timeoutMillis = unit.toMillis(timeout);
1✔
115
    timeoutMillis = ShutdownManager.awaitTermination(worker, timeoutMillis);
1✔
116
    ShutdownManager.awaitTermination(heartbeatExecutor, timeoutMillis);
1✔
117
  }
1✔
118

119
  @Override
120
  public void suspendPolling() {
121
    worker.suspendPolling();
1✔
122
  }
1✔
123

124
  @Override
125
  public void resumePolling() {
126
    worker.resumePolling();
1✔
127
  }
1✔
128

129
  @Override
130
  public boolean isSuspended() {
131
    return worker.isSuspended();
1✔
132
  }
133

134
  @Override
135
  public boolean isShutdown() {
136
    return worker.isShutdown();
×
137
  }
138

139
  @Override
140
  public boolean isTerminated() {
141
    return worker.isTerminated() && heartbeatExecutor.isTerminated();
×
142
  }
143

144
  @Override
145
  public WorkerLifecycleState getLifecycleState() {
146
    WorkerLifecycleState lifecycleState = worker.getLifecycleState();
×
147
    if (WorkerLifecycleState.TERMINATED.equals(lifecycleState)) {
×
148
      // return TERMINATED only if both worker and heartbeatExecutor are terminated
149
      return heartbeatExecutor.isTerminated()
×
150
          ? WorkerLifecycleState.TERMINATED
×
151
          : WorkerLifecycleState.SHUTDOWN;
×
152
    } else {
153
      return lifecycleState;
×
154
    }
155
  }
156

157
  public EagerActivityDispatcher getEagerActivityDispatcher() {
158
    return this.worker.getEagerActivityDispatcher();
1✔
159
  }
160

161
  @Override
162
  public String toString() {
163
    return String.format(
1✔
164
        "SyncActivityWorker{namespace=%s, taskQueue=%s, identity=%s}",
165
        namespace, taskQueue, identity);
166
  }
167
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc