• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ochaton / fun-cpu-limit / 1852710272

04 Jun 2025 09:44AM UTC coverage: 83.158% (-3.7%) from 86.842%
1852710272

push

gitlab-ci

Vladislav Grubov
fixes cpu_limit, adds pv

38 of 51 new or added lines in 1 file covered. (74.51%)

2 existing lines in 1 file now uncovered.

79 of 95 relevant lines covered (83.16%)

8247.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.16
/fun/cpu_limit.lua
1
local fun = require 'fun'
1✔
2
local fun_iter = fun.iter
1✔
3

4
local methods = debug.getmetatable(fun.ones()).__index
2✔
5

6
local rawiter = function(gen, param, state)
7
        return fun_iter(gen, param, state):unwrap()
2✔
8
end
9

10
local method1 = function(func)
11
    return function(self, arg1)
12
        return func(arg1, self.gen, self.param, self.state)
3✔
13
    end
14
end
15

16
local export1 = function(func)
17
    return function(arg1, gen, param, state)
18
        return func(arg1, rawiter(gen, param, state))
2✔
19
    end
20
end
21

22
local fiber = require 'fiber'
1✔
23
local clock = require 'clock'
1✔
24
-- local log = require 'log'
25
-- if log.new then log = log.new('fun.cpu_limit') end
26

27
local function ev_time_mks() return fiber.time64() end
29,205✔
28
local function realtime_mks() return clock.realtime64()/1e3 end
20,190✔
29
local function thread_time_mks() return clock.thread64()/1e3 end
20,194✔
30

31
---@param time number
32
local yield_sleep = function(time)
33
        fiber.sleep(time)
9,011✔
34
end
35

36
local build_yield_commit = function(yield_with)
37
        return function(time)
38
                local is_committed = false
250✔
39
                if type(box.cfg) ~= 'function' and box.is_in_txn and box.is_in_txn() then
250✔
40
                        box.commit()
250✔
41
                        is_committed = true
250✔
42
                end
43
                yield_with(time)
250✔
44
                if is_committed then box.begin() end
250✔
45
        end
46
end
47

48
local build_yield_rollback = function(yield_with)
49
        return function(time)
NEW
50
                if type(box.cfg) ~= 'function' and box.is_in_txn and box.is_in_txn() then
×
NEW
51
                        box.rollback()
×
52
                end
NEW
53
                return yield_with(time)
×
54
        end
55
end
56

57
local build_yield_raise = function(time)
58
        return function(yield_with)
NEW
59
                if type(box.cfg) ~= 'function' and box.is_in_txn and box.is_in_txn() then
×
NEW
60
                        error("Transaction left openned", 3)
×
61
                end
NEW
62
                return yield_with(time)
×
63
        end
64
end
65

66
local yield_commit = build_yield_commit(yield_sleep)
1✔
67
local yield_rollback = build_yield_rollback(yield_sleep)
1✔
68
local yield_raise = build_yield_raise(yield_sleep)
1✔
69

70
local mks = 1
1✔
71
local sec = 1e6
1✔
72
local ms = 1000*mks
1✔
73
local max_ev_run_mks = 100*ms
1✔
74

75
local cpu_limit_gen_x = function(state, state_x, ...)
76
        if state_x == nil then
40,199✔
77
                return nil
2✔
78
        end
79

80
        state[1] = state_x
40,197✔
81
        -- state[2] = quota_mks
82
        -- state[3] = ev_mks
83
        -- state[4] = thread_mks
84

85
        return state, ...
40,197✔
86
end
87

88
local function update_quota(now_mks, max_mks, add_mks)
89
        local new_mks = now_mks + add_mks
29,200✔
90
        if new_mks > max_mks then
29,200✔
NEW
91
                new_mks = max_mks
×
92
        end
93
        -- log.info("quota: %.1fµs -> %.1fµs", now_mks, new_mks)
94
        return new_mks
29,200✔
95
end
96

97
local rand = math.random
1✔
98

99
local cpu_limit_gen = function(param, state)
100
        local max_quota_mks, sleep, gen_x, param_x = param[1], param[2], param[3], param[4]
40,199✔
101
        local state_x, quota_mks, prev_ev_mks, prev_thread_mks = state[1], state[2], state[3], state[4]
40,199✔
102

103
        if rand() > 0.5 then
40,199✔
104
                return cpu_limit_gen_x(state, gen_x(param_x, state_x))
40,020✔
105
        end
106

107
        local ev_mks = ev_time_mks()
20,189✔
108
        local thread_mks = thread_time_mks()
20,189✔
109
        local wall_mks = realtime_mks()
20,189✔
110

111
        if prev_ev_mks == ev_mks then
20,189✔
112
                -- the caller spent all this time on cpu
113
                -- decrease quota
114
                quota_mks = update_quota(quota_mks, max_quota_mks, -tonumber(thread_mks - prev_thread_mks))
40,378✔
115
                -- quota_mks = quota_mks - tonumber(thread_mks - prev_thread_mks)
116
        end
117

118
        local ev_loop_too_long = (ev_mks + max_ev_run_mks) < wall_mks
20,189✔
119
        if ev_loop_too_long or quota_mks <= 1 then
20,189✔
120
                -- it's time to yield
121
                -- Fiber needs to sleep:
122
                local sleep_sec = (max_ev_run_mks - (max_quota_mks - quota_mks))/sec
9,011✔
123
                sleep(sleep_sec)
9,011✔
124

125
                ev_mks = ev_time_mks()
18,022✔
126
                if ev_mks > wall_mks then
9,011✔
127
                        -- yes, any time drift does not add time quota.
128
                        local gained_mks = assert(tonumber(ev_mks-wall_mks))
9,011✔
129
                        quota_mks = update_quota(quota_mks, max_quota_mks, gained_mks*max_quota_mks/max_ev_run_mks)
18,022✔
130
                end
131
        end
132

133
        -- state[1] = state_x
134
        state[2] = quota_mks
20,189✔
135
        state[3] = ev_mks
20,189✔
136
        state[4] = thread_mks
20,189✔
137

138
        return cpu_limit_gen_x(state, gen_x(param_x, state_x))
40,378✔
139
end
140

141
local cpu_limit = function(opts, gen_x, param_x, state_x)
142
        local cpu_limit
143
        local sleep_with = yield_sleep
4✔
144
        if type(opts) == 'table' then
4✔
145
                cpu_limit = tonumber(opts.cpu_limit)
2✔
146
                local yield_with = yield_sleep
2✔
147
                if opts.yield_with then
2✔
NEW
148
                        if type(opts.yield_with) ~= 'function' then
×
NEW
149
                                error("malformed cpu_limit/yield_with: must be a function when given", 2)
×
150
                        end
NEW
151
                        yield_with = opts.yield_with
×
152
                end
153

154
                ---sleep_builder is a builder function that can build specific fiber.sleep()
155
                ---default builders already exist
156
                ---@type fun(yield_with: fun(time: number)): fun(time: number)
157
                local sleep_builder
158
                if opts.txn == 'commit' then
2✔
159
                        sleep_builder = build_yield_commit
1✔
160
                        sleep_with = yield_commit
1✔
161
                elseif opts.txn == 'rollback' then
1✔
NEW
162
                        sleep_builder = build_yield_rollback
×
UNCOV
163
                        sleep_with = yield_rollback
×
164
                elseif opts.txn == 'raise' then
1✔
NEW
165
                        sleep_builder = build_yield_raise
×
UNCOV
166
                        sleep_with = yield_raise
×
167
                elseif opts.txn then
1✔
168
                        error("malformed cpu_limit/txn: commit, rollback or raise are supported", 2)
×
169
                end
170
                if yield_with ~= yield_sleep then
2✔
NEW
171
                        sleep_with = sleep_builder(yield_with)
×
172
                end
173
        else
174
                cpu_limit = tonumber(opts)
2✔
175
                sleep_with = yield_sleep
2✔
176
        end
177
        assert(cpu_limit, "malformed cpu_limit given: should be positive number within (0;100)")
4✔
178
        assert(cpu_limit > 0, "malformed cpu_limit given: should be positive number within (0;100)")
4✔
179
        assert(cpu_limit < 100, "malformed cpu_limit given: should be positive number within (0;100)")
4✔
180

181
        local max_quota_mks = tonumber(max_ev_run_mks * cpu_limit / 100)
4✔
182
        return fun.wrap(cpu_limit_gen,
4✔
183
                {max_quota_mks, sleep_with, gen_x, param_x},
4✔
184
                {state_x, max_quota_mks, ev_time_mks(), thread_time_mks()}
12✔
185
        )
4✔
186
end
187

188
--[[
189
        cpu limit is like dripping bucket.
190

191
        we say that we want to limit this iterator to consume ≤10% cpu.
192
        after yield we caught ev_mks.
193

194
        Then we call gen_x(param_x, state_x)
195
        and measure how much time it took.
196
        It may yield inside!
197

198
        Caller:
199
                - ev(), real()
200
                        - gen_x(param_x, state_x)
201
                - ev(), real()
202

203
        -- We gain time to work when we sleep
204
        -- We lose time of the work between subsequent calls
205

206
        -- Given time quota for fiber is evaluated as % of max_ev_run_time
207
        -- So basically, it is not possible to the fiber be on-cpu more than max_ev_run_time (10*ms for now)
208

209
        -- When fiber or ev_run exhausts time_quota (yes, noizy neighbours make fiber yield too)
210
        -- fiber is sent to sleep for next time slot.
211
        -- sleep time slot is evaluated in the following manner:
212
        -- for each X time of work, fiber needs to be yielded for at least X / % to regain it's time-quota.
213
        -- The same approach, but slightly in a different way is implemented here:
214
        -- fiber should sleep at most `max_ev_run_time` (basically should skip next ev loop).
215
        -- more precisely, fiber need to sleep (100% - X%) * max_ev_run_time where X% is cpu quota given to fiber
216
        -- but, in some cases, it is sended to sleep because of noizy neighbours,
217
        -- so we subtract from sleep-time left time_quota.
218
        -- After each sleep time_quota increases for X% of the sleep but never can be higher than X% * max_ev_run_time.
219
]]
220

221
methods.cpu_limit = method1(cpu_limit)
2✔
222
---@diagnostic disable-next-line: inject-field
223
fun.cpu_limit = export1(cpu_limit)
2✔
224

225
return fun
1✔
226

227
--[[
228
        fun.cpulimit(10, ....)
229

230
        fun.ones():cpu_limit(10)
231

232
        fun.ones():cpu_limit({ cpu_limit = 3, rxn='rollback' })
233
]]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc