• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #103

pending completion
#103

push

github-actions

web-flow
Implement retry of local activities for over local retry threshold duration (#1542)

Issue #1261

244 of 244 new or added lines in 16 files covered. (100.0%)

16122 of 19841 relevant lines covered (81.26%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/temporal-sdk/src/main/java/io/temporal/internal/worker/EagerActivitySlotsReservation.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import com.google.common.base.Preconditions;
24
import io.temporal.api.command.v1.Command;
25
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
26
import io.temporal.api.enums.v1.CommandType;
27
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
28
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
29
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedResponse;
30
import io.temporal.internal.Config;
31
import java.io.Closeable;
32
import javax.annotation.concurrent.NotThreadSafe;
33

34
/** This class is not thread safe and shouldn't leave the boundaries of one activity executor */
35
@NotThreadSafe
36
class EagerActivitySlotsReservation implements Closeable {
37
  private final EagerActivityDispatcher eagerActivityDispatcher;
38
  private int outstandingReservationSlotsCount = 0;
1✔
39

40
  EagerActivitySlotsReservation(EagerActivityDispatcher eagerActivityDispatcher) {
1✔
41
    this.eagerActivityDispatcher = eagerActivityDispatcher;
1✔
42
  }
1✔
43

44
  public void applyToRequest(RespondWorkflowTaskCompletedRequest.Builder mutableRequest) {
45
    for (int i = 0; i < mutableRequest.getCommandsCount(); i++) {
1✔
46
      Command command = mutableRequest.getCommands(i);
1✔
47
      if (command.getCommandType() != CommandType.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK) continue;
1✔
48

49
      ScheduleActivityTaskCommandAttributes commandAttributes =
1✔
50
          command.getScheduleActivityTaskCommandAttributes();
1✔
51
      if (!commandAttributes.getRequestEagerExecution()) continue;
1✔
52

53
      if (this.outstandingReservationSlotsCount < Config.EAGER_ACTIVITIES_LIMIT
1✔
54
          && this.eagerActivityDispatcher.tryReserveActivitySlot(commandAttributes)) {
1✔
55
        this.outstandingReservationSlotsCount++;
1✔
56
      } else {
57
        mutableRequest.setCommands(
1✔
58
            i,
59
            command.toBuilder()
1✔
60
                .setScheduleActivityTaskCommandAttributes(
1✔
61
                    commandAttributes.toBuilder().setRequestEagerExecution(false)));
1✔
62
      }
63
    }
64
  }
1✔
65

66
  public void handleResponse(RespondWorkflowTaskCompletedResponse serverResponse) {
67
    int activityTasksCount = serverResponse.getActivityTasksCount();
1✔
68
    Preconditions.checkArgument(
1✔
69
        activityTasksCount <= this.outstandingReservationSlotsCount,
70
        "Unexpectedly received %s eager activities though we only requested %s",
71
        activityTasksCount,
72
        this.outstandingReservationSlotsCount);
73

74
    releaseSlots(this.outstandingReservationSlotsCount - activityTasksCount);
1✔
75

76
    for (PollActivityTaskQueueResponse act : serverResponse.getActivityTasksList()) {
1✔
77
      // don't release slots here, instead the semaphore release reference is passed to the activity
78
      // worker to release when the activity is done
79
      this.eagerActivityDispatcher.dispatchActivity(act);
×
80
    }
×
81

82
    this.outstandingReservationSlotsCount = 0;
1✔
83
  }
1✔
84

85
  @Override
86
  public void close() {
87
    if (this.outstandingReservationSlotsCount > 0)
1✔
88
      releaseSlots(this.outstandingReservationSlotsCount);
×
89
  }
1✔
90

91
  private void releaseSlots(int slotsToRelease) {
92
    if (slotsToRelease > this.outstandingReservationSlotsCount)
1✔
93
      throw new IllegalStateException(
×
94
          "Trying to release more activity slots than outstanding reservations");
95

96
    this.eagerActivityDispatcher.releaseActivitySlotReservations(slotsToRelease);
1✔
97
    this.outstandingReservationSlotsCount -= slotsToRelease;
1✔
98
  }
1✔
99
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc