• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #291

30 Jul 2024 10:28PM UTC coverage: 77.576% (-0.08%) from 77.656%
#291

push

github

web-flow
Ensure shutdown of LA slot queue isn't swallowed (#2161)

4 of 4 new or added lines in 2 files covered. (100.0%)

22 existing lines in 7 files now uncovered.

19716 of 25415 relevant lines covered (77.58%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.21
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import io.temporal.worker.tuning.LocalActivitySlotInfo;
24
import io.temporal.worker.tuning.SlotPermit;
25
import io.temporal.workflow.Functions;
26
import java.util.concurrent.PriorityBlockingQueue;
27
import java.util.concurrent.Semaphore;
28
import java.util.concurrent.TimeUnit;
29
import javax.annotation.Nullable;
30
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
32

33
class LocalActivitySlotSupplierQueue {
34
  static final class QueuedLARequest {
35
    final boolean isRetry;
36
    final SlotReservationData data;
37
    final LocalActivityAttemptTask task;
38

39
    QueuedLARequest(boolean isRetry, SlotReservationData data, LocalActivityAttemptTask task) {
1✔
40
      this.isRetry = isRetry;
1✔
41
      this.data = data;
1✔
42
      this.task = task;
1✔
43
    }
1✔
44
  }
45

46
  private final PriorityBlockingQueue<QueuedLARequest> requestQueue;
47
  private final Semaphore newExecutionsBackpressureSemaphore;
48
  private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
49
  private final Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback;
50
  private final Thread queueThread;
51
  private static final Logger log =
1✔
52
      LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName());
1✔
53
  private volatile boolean running = true;
1✔
54

55
  LocalActivitySlotSupplierQueue(
56
      TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier,
57
      Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback) {
1✔
58
    this.afterReservedCallback = afterReservedCallback;
1✔
59
    // TODO: See if there's a better option than fixed number for no-max suppliers
60
    //   https://github.com/temporalio/sdk-java/issues/2149
61
    int maximumSlots = slotSupplier.maximumSlots().orElse(50) * 2;
1✔
62
    this.newExecutionsBackpressureSemaphore = new Semaphore(maximumSlots);
1✔
63
    this.requestQueue =
1✔
64
        new PriorityBlockingQueue<>(
65
            maximumSlots,
66
            (r1, r2) -> {
67
              // Prioritize retries
68
              if (r1.isRetry && !r2.isRetry) {
1✔
69
                return -1;
×
70
              } else if (!r1.isRetry && r2.isRetry) {
1✔
71
                return 1;
×
72
              }
73
              return 0;
1✔
74
            });
75
    this.slotSupplier = slotSupplier;
1✔
76
    this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue");
1✔
77
    this.queueThread.start();
1✔
78
  }
1✔
79

80
  private void processQueue() {
81
    try {
82
      while (running) {
1✔
83
        QueuedLARequest request = requestQueue.take();
1✔
84
        SlotPermit slotPermit;
85
        try {
86
          slotPermit = slotSupplier.reserveSlot(request.data);
1✔
87
        } catch (InterruptedException e) {
1✔
88
          Thread.currentThread().interrupt();
1✔
89
          return;
1✔
UNCOV
90
        } catch (Exception e) {
×
UNCOV
91
          log.error(
×
92
              "Error reserving local activity slot, dropped activity id {}",
UNCOV
93
              request.task.getActivityId(),
×
94
              e);
UNCOV
95
          continue;
×
96
        }
1✔
97
        request.task.getExecutionContext().setPermit(slotPermit);
1✔
98
        afterReservedCallback.apply(request.task);
1✔
99
      }
1✔
100
    } catch (InterruptedException e) {
1✔
101
      Thread.currentThread().interrupt();
1✔
UNCOV
102
    }
×
103
  }
1✔
104

105
  void shutdown() {
106
    running = false;
1✔
107
    queueThread.interrupt();
1✔
108
  }
1✔
109

110
  boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException {
111
    boolean accepted;
112
    if (acceptanceTimeoutMs == null) {
1✔
113
      newExecutionsBackpressureSemaphore.acquire();
×
114
      accepted = true;
×
115
    } else {
116
      if (acceptanceTimeoutMs > 0) {
1✔
117
        accepted =
1✔
118
            newExecutionsBackpressureSemaphore.tryAcquire(
1✔
119
                acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
1✔
120
      } else {
121
        accepted = newExecutionsBackpressureSemaphore.tryAcquire();
1✔
122
      }
123
    }
124
    return accepted;
1✔
125
  }
126

127
  void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttemptTask task) {
128
    QueuedLARequest request = new QueuedLARequest(isRetry, data, task);
1✔
129
    requestQueue.add(request);
1✔
130

131
    if (!isRetry) {
1✔
132
      // If this attempt isn't a retry, that means it had to get a permit from the backpressure
133
      // semaphore, and therefore we should release that permit now.
134
      newExecutionsBackpressureSemaphore.release();
1✔
135
    }
136
  }
1✔
137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc