• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import io.temporal.worker.tuning.LocalActivitySlotInfo;
24
import io.temporal.worker.tuning.SlotPermit;
25
import io.temporal.workflow.Functions;
26
import java.util.concurrent.PriorityBlockingQueue;
27
import java.util.concurrent.Semaphore;
28
import java.util.concurrent.TimeUnit;
29
import javax.annotation.Nullable;
30
import org.slf4j.Logger;
31
import org.slf4j.LoggerFactory;
32

33
class LocalActivitySlotSupplierQueue {
34
  static final class QueuedLARequest {
35
    final boolean isRetry;
36
    final SlotReservationData data;
37
    final LocalActivityAttemptTask task;
38

39
    QueuedLARequest(boolean isRetry, SlotReservationData data, LocalActivityAttemptTask task) {
1✔
40
      this.isRetry = isRetry;
1✔
41
      this.data = data;
1✔
42
      this.task = task;
1✔
43
    }
1✔
44
  }
45

46
  private final PriorityBlockingQueue<QueuedLARequest> requestQueue;
47
  private final Semaphore newExecutionsBackpressureSemaphore;
48
  private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
49
  private final Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback;
50
  private final Thread queueThread;
51
  private static final Logger log =
1✔
52
      LoggerFactory.getLogger(LocalActivitySlotSupplierQueue.class.getName());
1✔
53
  private volatile boolean running = true;
1✔
54

55
  LocalActivitySlotSupplierQueue(
56
      TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier,
57
      Functions.Proc1<LocalActivityAttemptTask> afterReservedCallback) {
1✔
58
    this.afterReservedCallback = afterReservedCallback;
1✔
59
    // TODO: See if there's a better option than fixed number for no-max suppliers
60
    //   https://github.com/temporalio/sdk-java/issues/2149
61
    int maximumSlots = slotSupplier.maximumSlots().orElse(50) * 2;
1✔
62
    this.newExecutionsBackpressureSemaphore = new Semaphore(maximumSlots);
1✔
63
    this.requestQueue =
1✔
64
        new PriorityBlockingQueue<>(
65
            maximumSlots,
66
            (r1, r2) -> {
67
              // Prioritize retries
68
              if (r1.isRetry && !r2.isRetry) {
1✔
NEW
69
                return -1;
×
70
              } else if (!r1.isRetry && r2.isRetry) {
1✔
NEW
71
                return 1;
×
72
              }
73
              return 0;
1✔
74
            });
75
    this.slotSupplier = slotSupplier;
1✔
76
    this.queueThread = new Thread(this::processQueue, "LocalActivitySlotSupplierQueue");
1✔
77
    this.queueThread.start();
1✔
78
  }
1✔
79

80
  private void processQueue() {
81
    try {
82
      while (running) {
1✔
83
        QueuedLARequest request = requestQueue.take();
1✔
84
        SlotPermit slotPermit;
85
        try {
86
          slotPermit = slotSupplier.reserveSlot(request.data);
1✔
87
        } catch (Exception e) {
1✔
88
          log.error(
1✔
89
              "Error reserving local activity slot, dropped activity id {}",
90
              request.task.getActivityId(),
1✔
91
              e);
92
          continue;
1✔
93
        }
1✔
94
        request.task.getExecutionContext().setPermit(slotPermit);
1✔
95
        afterReservedCallback.apply(request.task);
1✔
96
      }
1✔
97
    } catch (InterruptedException e) {
1✔
98
      Thread.currentThread().interrupt();
1✔
99
    }
1✔
100
  }
1✔
101

102
  void shutdown() {
103
    running = false;
1✔
104
    queueThread.interrupt();
1✔
105
  }
1✔
106

107
  boolean waitOnBackpressure(@Nullable Long acceptanceTimeoutMs) throws InterruptedException {
108
    boolean accepted;
109
    if (acceptanceTimeoutMs == null) {
1✔
NEW
110
      newExecutionsBackpressureSemaphore.acquire();
×
NEW
111
      accepted = true;
×
112
    } else {
113
      if (acceptanceTimeoutMs > 0) {
1✔
114
        accepted =
1✔
115
            newExecutionsBackpressureSemaphore.tryAcquire(
1✔
116
                acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
1✔
117
      } else {
118
        accepted = newExecutionsBackpressureSemaphore.tryAcquire();
1✔
119
      }
120
    }
121
    return accepted;
1✔
122
  }
123

124
  void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttemptTask task) {
125
    QueuedLARequest request = new QueuedLARequest(isRetry, data, task);
1✔
126
    requestQueue.add(request);
1✔
127

128
    if (!isRetry) {
1✔
129
      // If this attempt isn't a retry, that means it had to get a permit from the backpressure
130
      // semaphore, and therefore we should release that permit now.
131
      newExecutionsBackpressureSemaphore.release();
1✔
132
    }
133
  }
1✔
134

135
  TrackingSlotSupplier<LocalActivitySlotInfo> getSlotSupplier() {
NEW
136
    return slotSupplier;
×
137
  }
138
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc