• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.11
/temporal-sdk/src/main/java/io/temporal/internal/worker/EagerActivitySlotsReservation.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.api.command.v1.Command;
25
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
26
import io.temporal.api.enums.v1.CommandType;
27
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
28
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
29
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedResponse;
30
import io.temporal.internal.Config;
31
import io.temporal.worker.tuning.SlotPermit;
32
import java.io.Closeable;
33
import java.util.ArrayList;
34
import java.util.List;
35
import java.util.Optional;
36
import javax.annotation.concurrent.NotThreadSafe;
37

38
/** This class is not thread safe and shouldn't leave the boundaries of one activity executor */
39
@NotThreadSafe
40
class EagerActivitySlotsReservation implements Closeable {
41
  private final EagerActivityDispatcher eagerActivityDispatcher;
42
  private final List<SlotPermit> reservedSlots = new ArrayList<>();
1✔
43

44
  EagerActivitySlotsReservation(EagerActivityDispatcher eagerActivityDispatcher) {
1✔
45
    this.eagerActivityDispatcher = eagerActivityDispatcher;
1✔
46
  }
1✔
47

48
  public void applyToRequest(RespondWorkflowTaskCompletedRequest.Builder mutableRequest) {
49
    for (int i = 0; i < mutableRequest.getCommandsCount(); i++) {
1✔
50
      Command command = mutableRequest.getCommands(i);
1✔
51
      if (command.getCommandType() != CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK) continue;
1✔
52

53
      ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
54
          command.getScheduleActivityTaskCommandAttributes();
1✔
55
      if (!commandAttributes.getRequestEagerExecution()) continue;
1✔
56
      boolean atLimit = this.reservedSlots.size() >= Config.EAGER_ACTIVITIES_LIMIT;
1✔
57
      Optional<SlotPermit> permit = Optional.empty();
1✔
58
      if (!atLimit) {
1✔
59
        permit = this.eagerActivityDispatcher.tryReserveActivitySlot(commandAttributes);
1✔
60
      }
61

62
      if (permit.isPresent()) {
1✔
63
        this.reservedSlots.add(permit.get());
1✔
64
      } else {
65
        mutableRequest.setCommands(
1✔
66
            i,
67
            command.toBuilder()
1✔
68
                .setScheduleActivityTaskCommandAttributes(
1✔
69
                    commandAttributes.toBuilder().setRequestEagerExecution(false)));
1✔
70
      }
71
    }
72
  }
1✔
73

74
  public void handleResponse(RespondWorkflowTaskCompletedResponse serverResponse) {
75
    int activityTasksCount = serverResponse.getActivityTasksCount();
1✔
76
    Preconditions.checkArgument(
1✔
77
        activityTasksCount <= this.reservedSlots.size(),
1✔
78
        "Unexpectedly received %s eager activities though we only requested %s",
79
        activityTasksCount,
80
        this.reservedSlots.size());
1✔
81

82
    for (PollActivityTaskQueueResponse act : serverResponse.getActivityTasksList()) {
1✔
83
      // don't release slots here, instead the release function is called in the activity worker to
84
      // release when the activity is done
NEW
85
      SlotPermit permit = this.reservedSlots.remove(0);
×
NEW
86
      this.eagerActivityDispatcher.dispatchActivity(act, permit);
×
UNCOV
87
    }
×
88

89
    // Release any remaining that we won't be using
90
    try {
91
      this.eagerActivityDispatcher.releaseActivitySlotReservations(this.reservedSlots);
1✔
92
    } finally {
93
      this.reservedSlots.clear();
1✔
94
    }
95
  }
1✔
96

97
  @Override
98
  public void close() {
99
    if (!this.reservedSlots.isEmpty()) {
1✔
100
      // Release all slots
NEW
101
      this.eagerActivityDispatcher.releaseActivitySlotReservations(this.reservedSlots);
×
NEW
102
      this.reservedSlots.clear();
×
103
    }
104
  }
1✔
105
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc