• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #173

pending completion
#173

push

github-actions

web-flow
Issue 1761 fix supplier reuse (#1779)

For WorkflowLocals and WorkflowThreadLocals, disambiguate between variables that are null and those that have not been set. Invoke the supplier for local values at most once.

21 of 21 new or added lines in 4 files covered. (100.0%)

18051 of 22100 relevant lines covered (81.68%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.42
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.cache.Cache;
27
import com.google.common.cache.CacheBuilder;
28
import com.uber.m3.tally.Scope;
29
import io.temporal.api.common.v1.WorkflowExecution;
30
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
31
import io.temporal.internal.replay.WorkflowRunTaskHandler;
32
import io.temporal.worker.MetricsType;
33
import java.util.Objects;
34
import java.util.concurrent.Callable;
35
import javax.annotation.Nullable;
36
import javax.annotation.concurrent.ThreadSafe;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
@ThreadSafe
41
public final class WorkflowExecutorCache {
42
  private final Logger log = LoggerFactory.getLogger(WorkflowExecutorCache.class);
1✔
43
  private final WorkflowRunLockManager runLockManager;
44
  private final Cache<String, WorkflowRunTaskHandler> cache;
45
  private final Scope metricsScope;
46

47
  public WorkflowExecutorCache(
48
      int workflowCacheSize, WorkflowRunLockManager runLockManager, Scope scope) {
1✔
49
    Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
1✔
50
    this.runLockManager = runLockManager;
1✔
51
    this.cache =
1✔
52
        CacheBuilder.newBuilder()
1✔
53
            .maximumSize(workflowCacheSize)
1✔
54
            // TODO this number is taken out of the blue.
55
            //  This number should be calculated based on the number of all workers workflow task
56
            //  processors.
57
            .concurrencyLevel(128)
1✔
58
            .removalListener(
1✔
59
                e -> {
60
                  WorkflowRunTaskHandler entry = (WorkflowRunTaskHandler) e.getValue();
1✔
61
                  if (entry != null) {
1✔
62
                    try {
63
                      log.trace(
1✔
64
                          "Closing workflow execution for runId {}, cause {}",
65
                          e.getKey(),
1✔
66
                          e.getCause());
1✔
67
                      entry.close();
1✔
68
                      log.trace("Workflow execution for runId {} closed", e);
1✔
69
                    } catch (Throwable t) {
×
70
                      log.error("Workflow execution closure failed with an exception", t);
×
71
                      throw t;
×
72
                    }
1✔
73
                  }
74
                })
1✔
75
            .build();
1✔
76
    this.metricsScope = Objects.requireNonNull(scope);
1✔
77
    this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
78
  }
1✔
79

80
  public WorkflowRunTaskHandler getOrCreate(
81
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
82
      Scope workflowTypeScope,
83
      Callable<WorkflowRunTaskHandler> workflowExecutorFn)
84
      throws Exception {
85
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
86
    String runId = execution.getRunId();
1✔
87
    if (isFullHistory(workflowTask)) {
1✔
88
      invalidate(execution, metricsScope, "full history", null);
1✔
89
      log.trace(
1✔
90
          "New Workflow Executor {}-{} has been created for a full history run",
91
          execution.getWorkflowId(),
1✔
92
          runId);
93
      return workflowExecutorFn.call();
1✔
94
    }
95

96
    @Nullable WorkflowRunTaskHandler workflowRunTaskHandler = cache.getIfPresent(runId);
1✔
97

98
    if (workflowRunTaskHandler != null) {
1✔
99
      workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
1✔
100
      return workflowRunTaskHandler;
1✔
101
    }
102

103
    log.trace(
1✔
104
        "Workflow Executor {}-{} wasn't found in cache and a new executor has been created",
105
        execution.getWorkflowId(),
1✔
106
        runId);
107
    workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
1✔
108

109
    return workflowExecutorFn.call();
1✔
110
  }
111

112
  public void addToCache(
113
      WorkflowExecution workflowExecution, WorkflowRunTaskHandler workflowRunTaskHandler) {
114
    cache.put(workflowExecution.getRunId(), workflowRunTaskHandler);
1✔
115
    log.trace(
1✔
116
        "Workflow Execution {}-{} has been added to cache",
117
        workflowExecution.getWorkflowId(),
1✔
118
        workflowExecution.getRunId());
1✔
119
    this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
120
  }
1✔
121

122
  /**
123
   * @param workflowTypeScope accepts workflow metric scope (tagged with task queue and workflow
124
   *     type)
125
   */
126
  @SuppressWarnings("deprecation")
127
  public boolean evictAnyNotInProcessing(
128
      WorkflowExecution inFavorOfExecution, Scope workflowTypeScope) {
129
    try {
130
      String inFavorOfRunId = inFavorOfExecution.getRunId();
1✔
131
      for (String key : cache.asMap().keySet()) {
1✔
132
        if (key.equals(inFavorOfRunId)) continue;
1✔
133
        boolean locked = runLockManager.tryLock(key);
1✔
134
        // if we were able to take a lock here, it means that the workflow is not in processing
135
        // currently on workers of this WorkerFactory and can be evicted
136
        if (locked) {
1✔
137
          try {
138
            log.trace(
1✔
139
                "Workflow Execution {}-{} caused eviction of Workflow Execution with runId {}",
140
                inFavorOfExecution.getWorkflowId(),
1✔
141
                inFavorOfRunId,
142
                key);
143
            cache.invalidate(key);
1✔
144
            workflowTypeScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
1✔
145
            workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
1✔
146
            return true;
1✔
147
          } finally {
148
            runLockManager.unlock(key);
1✔
149
          }
150
        }
151
      }
1✔
152

153
      log.trace("Failed to evict from Workflow Execution cache, cache size is {}", cache.size());
1✔
154
      return false;
1✔
155
    } finally {
156
      this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
157
    }
158
  }
159

160
  @SuppressWarnings("deprecation")
161
  public void invalidate(
162
      WorkflowExecution execution, Scope workflowTypeScope, String reason, Throwable cause) {
163
    String runId = execution.getRunId();
1✔
164
    @Nullable WorkflowRunTaskHandler present = cache.getIfPresent(runId);
1✔
165
    if (log.isTraceEnabled()) {
1✔
166
      log.trace(
×
167
          "Invalidating {}-{} because of '{}', value is present in the cache: {}",
168
          execution.getWorkflowId(),
×
169
          runId,
170
          reason,
171
          present,
172
          cause);
173
    }
174
    cache.invalidate(runId);
1✔
175
    if (present != null) {
1✔
176
      workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
1✔
177
      this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
178
    }
179
  }
1✔
180

181
  public long size() {
182
    return cache.size();
1✔
183
  }
184

185
  public void invalidateAll() {
186
    cache.invalidateAll();
1✔
187
    metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
188
  }
1✔
189
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc