• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #272

21 Jun 2024 08:17PM UTC coverage: 77.548% (+0.04%) from 77.506%
#272

push

github

web-flow
Resource based tuner (#2110)

275 of 338 new or added lines in 11 files covered. (81.36%)

12 existing lines in 5 files now uncovered.

19522 of 25174 relevant lines covered (77.55%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.42
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.cache.Cache;
27
import com.google.common.cache.CacheBuilder;
28
import com.uber.m3.tally.Scope;
29
import io.temporal.api.common.v1.WorkflowExecution;
30
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
31
import io.temporal.internal.replay.WorkflowRunTaskHandler;
32
import io.temporal.worker.MetricsType;
33
import java.util.Objects;
34
import java.util.concurrent.Callable;
35
import javax.annotation.Nullable;
36
import javax.annotation.concurrent.ThreadSafe;
37
import org.slf4j.Logger;
38
import org.slf4j.LoggerFactory;
39

40
@ThreadSafe
41
public final class WorkflowExecutorCache {
42
  private final Logger log = LoggerFactory.getLogger(WorkflowExecutorCache.class);
1✔
43
  private final WorkflowRunLockManager runLockManager;
44
  private final Cache<String, WorkflowRunTaskHandler> cache;
45
  private final Scope metricsScope;
46

47
  public WorkflowExecutorCache(
48
      int workflowCacheSize, WorkflowRunLockManager runLockManager, Scope scope) {
1✔
49
    Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
1✔
50
    this.runLockManager = runLockManager;
1✔
51
    this.cache =
1✔
52
        CacheBuilder.newBuilder()
1✔
53
            .maximumSize(workflowCacheSize)
1✔
54
            // TODO this number is taken out of the blue.
55
            //  This number should be calculated based on the number of all workers workflow task
56
            //  processors.
57
            .concurrencyLevel(128)
1✔
58
            .removalListener(
1✔
59
                e -> {
60
                  WorkflowRunTaskHandler entry = (WorkflowRunTaskHandler) e.getValue();
1✔
61
                  if (entry != null) {
1✔
62
                    try {
63
                      log.trace(
1✔
64
                          "Closing workflow execution for runId {}, cause {}",
65
                          e.getKey(),
1✔
66
                          e.getCause());
1✔
67
                      entry.close();
1✔
68
                      log.trace("Workflow execution for runId {} closed", e);
1✔
UNCOV
69
                    } catch (Throwable t) {
×
UNCOV
70
                      log.error("Workflow execution closure failed with an exception", t);
×
UNCOV
71
                      throw t;
×
72
                    }
1✔
73
                  }
74
                })
1✔
75
            .build();
1✔
76
    this.metricsScope = Objects.requireNonNull(scope);
1✔
77
    this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
78
  }
1✔
79

80
  public WorkflowRunTaskHandler getOrCreate(
81
      PollWorkflowTaskQueueResponseOrBuilder workflowTask,
82
      Scope workflowTypeScope,
83
      Callable<WorkflowRunTaskHandler> workflowExecutorFn)
84
      throws Exception {
85
    WorkflowExecution execution = workflowTask.getWorkflowExecution();
1✔
86
    String runId = execution.getRunId();
1✔
87
    if (isFullHistory(workflowTask)) {
1✔
88
      invalidate(execution, metricsScope, "full history", null);
1✔
89
      log.trace(
1✔
90
          "New Workflow Executor {}-{} has been created for a full history run",
91
          execution.getWorkflowId(),
1✔
92
          runId);
93
      return workflowExecutorFn.call();
1✔
94
    }
95

96
    @Nullable WorkflowRunTaskHandler workflowRunTaskHandler = cache.getIfPresent(runId);
1✔
97

98
    if (workflowRunTaskHandler != null) {
1✔
99
      workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
1✔
100
      return workflowRunTaskHandler;
1✔
101
    }
102

103
    log.trace(
1✔
104
        "Workflow Executor {}-{} wasn't found in cache and a new executor has been created",
105
        execution.getWorkflowId(),
1✔
106
        runId);
107
    workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
1✔
108

109
    return workflowExecutorFn.call();
1✔
110
  }
111

112
  public void addToCache(
113
      WorkflowExecution workflowExecution, WorkflowRunTaskHandler workflowRunTaskHandler) {
114
    cache.put(workflowExecution.getRunId(), workflowRunTaskHandler);
1✔
115
    log.trace(
1✔
116
        "Workflow Execution {}-{} has been added to cache",
117
        workflowExecution.getWorkflowId(),
1✔
118
        workflowExecution.getRunId());
1✔
119
    this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
120
  }
1✔
121

122
  /**
123
   * @param workflowTypeScope accepts workflow metric scope (tagged with task queue and workflow
124
   *     type)
125
   */
126
  @SuppressWarnings("deprecation")
127
  public boolean evictAnyNotInProcessing(
128
      WorkflowExecution inFavorOfExecution, Scope workflowTypeScope) {
129
    try {
130
      String inFavorOfRunId = inFavorOfExecution.getRunId();
1✔
131
      for (String key : cache.asMap().keySet()) {
1✔
132
        if (key.equals(inFavorOfRunId)) continue;
1✔
133
        boolean locked = runLockManager.tryLock(key);
1✔
134
        // if we were able to take a lock here, it means that the workflow is not in processing
135
        // currently on workers of this WorkerFactory and can be evicted
136
        if (locked) {
1✔
137
          try {
138
            log.trace(
1✔
139
                "Workflow Execution {}-{} caused eviction of Workflow Execution with runId {}",
140
                inFavorOfExecution.getWorkflowId(),
1✔
141
                inFavorOfRunId,
142
                key);
143
            cache.invalidate(key);
1✔
144
            workflowTypeScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
1✔
145
            workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
1✔
146
            return true;
1✔
147
          } finally {
148
            runLockManager.unlock(key);
1✔
149
          }
150
        }
151
      }
1✔
152

153
      log.trace("Failed to evict from Workflow Execution cache, cache size is {}", cache.size());
1✔
154
      return false;
1✔
155
    } finally {
156
      this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
157
    }
158
  }
159

160
  @SuppressWarnings("deprecation")
161
  public void invalidate(
162
      WorkflowExecution execution, Scope workflowTypeScope, String reason, Throwable cause) {
163
    String runId = execution.getRunId();
1✔
164
    @Nullable WorkflowRunTaskHandler present = cache.getIfPresent(runId);
1✔
165
    if (log.isTraceEnabled()) {
1✔
166
      log.trace(
×
167
          "Invalidating {}-{} because of '{}', value is present in the cache: {}",
168
          execution.getWorkflowId(),
×
169
          runId,
170
          reason,
171
          present,
172
          cause);
173
    }
174
    cache.invalidate(runId);
1✔
175
    if (present != null) {
1✔
176
      workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
1✔
177
      this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
178
    }
179
  }
1✔
180

181
  public long size() {
182
    return cache.size();
1✔
183
  }
184

185
  public void invalidateAll() {
186
    cache.invalidateAll();
1✔
187
    metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
1✔
188
  }
1✔
189
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc