• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #120

pending completion
#120

push

github-actions

web-flow
Rename sdk-features to features (#1599)

16189 of 20022 relevant lines covered (80.86%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/temporal-sdk/src/main/java/io/temporal/internal/worker/Throttler.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import org.slf4j.Logger;
24
import org.slf4j.LoggerFactory;
25

26
final class Throttler {
27

28
  private static final Logger log = LoggerFactory.getLogger(Throttler.class);
×
29

30
  /** Human readable name of the resource being throttled. Used for logging only. */
31
  private final String name;
32

33
  /** Used as a circular buffer */
34
  private CircularLongBuffer checkPointTimes;
35

36
  /** Used as an index to a circular buffer */
37
  private long index;
38

39
  /** Interval used to measure the rate. Shorter interval allows less spikey rates. */
40
  private long rateInterval;
41

42
  private final long rateIntervalMilliseconds;
43

44
  private long overslept;
45

46
  /**
47
   * Construct throttler.
48
   *
49
   * @param name Human readable name of the resource being throttled. Used for logging only.
50
   * @param maxRatePerSecond maximum rate allowed
51
   * @param rateIntervalMilliseconds rate measurement interval. Interval should be at least 1000 /
52
   *     maxRatePerSecond.
53
   */
54
  public Throttler(String name, double maxRatePerSecond, long rateIntervalMilliseconds) {
×
55
    if (null == name) {
×
56
      throw new IllegalArgumentException("null name");
×
57
    }
58
    this.name = name;
×
59
    if (maxRatePerSecond <= 0) {
×
60
      throw new IllegalArgumentException("0 or negative maxRatePerSecond");
×
61
    }
62
    if (rateIntervalMilliseconds <= 0) {
×
63
      throw new IllegalArgumentException("0 or negative rateIntervalMilliseconds");
×
64
    }
65
    synchronized (this) {
×
66
      this.rateIntervalMilliseconds = rateIntervalMilliseconds;
×
67
      setMaxRatePerSecond(maxRatePerSecond);
×
68
    }
×
69
  }
×
70

71
  public synchronized void setMaxRatePerSecond(double maxRatePerSecond) {
72
    int maxMessagesPerRateInterval = (int) (maxRatePerSecond * rateIntervalMilliseconds / 1000);
×
73
    if (maxMessagesPerRateInterval == 0) {
×
74
      maxMessagesPerRateInterval = 1;
×
75
      rateInterval = (long) (1.0 / maxRatePerSecond * 1000.0);
×
76
    } else {
77
      rateInterval = rateIntervalMilliseconds;
×
78
    }
79
    if (checkPointTimes != null) {
×
80
      int oldSize = checkPointTimes.size();
×
81
      checkPointTimes =
×
82
          checkPointTimes.copy(index - maxMessagesPerRateInterval, maxMessagesPerRateInterval);
×
83
      index = Math.min(checkPointTimes.size(), oldSize);
×
84
    } else {
×
85
      checkPointTimes = new CircularLongBuffer(maxMessagesPerRateInterval);
×
86
      index = 0;
×
87
    }
88
    log.debug("new rate=" + maxRatePerSecond + " (msg/sec)");
×
89
  }
×
90

91
  public synchronized void throttle(int count) throws InterruptedException {
92
    for (int i = 0; i < count; ++i) {
×
93
      throttle();
×
94
    }
95
  }
×
96

97
  /**
98
   * When called on each request sleeps if called faster then configured average rate.
99
   *
100
   * @throws InterruptedException when destroyRequested
101
   */
102
  public synchronized void throttle() throws InterruptedException {
103
    long now = System.currentTimeMillis();
×
104
    long checkPoint = checkPointTimes.get(index);
×
105
    if (checkPoint > 0) {
×
106
      long elapsed = now - checkPoint;
×
107

108
      // if the time for this window is less than the minimum per window
109
      if (elapsed >= 0 && elapsed < rateInterval) {
×
110
        long sleepInterval = rateInterval - elapsed - overslept;
×
111
        overslept = 0;
×
112
        if (sleepInterval > 0) {
×
113
          if (log.isTraceEnabled()) {
×
114
            log.debug(
×
115
                "Throttling "
116
                    + name
117
                    + ": called "
118
                    + checkPointTimes.size()
×
119
                    + " times in last "
120
                    + elapsed
121
                    + " milliseconds. Going to sleep for "
122
                    + sleepInterval
123
                    + " milliseconds.");
124
          }
125
          long t = System.currentTimeMillis();
×
126
          Thread.sleep(sleepInterval);
×
127
          overslept = System.currentTimeMillis() - t - sleepInterval;
×
128
        }
129
      }
130
    }
131
    checkPointTimes.set(index++, System.currentTimeMillis());
×
132
  }
×
133
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc