• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 23646349072

27 Mar 2026 12:32PM UTC coverage: 85.443%. Remained the same
23646349072

push

github

web-flow
chore(ci): upgrade actions/checkout from v4 to v6 (#182)

1397 of 1635 relevant lines covered (85.44%)

76980.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.91
/src/copas/queue.lua
1
local copas = require "copas"
7✔
2
local gettime = copas.gettime
7✔
3
local Sema = copas.semaphore
7✔
4
local Lock = copas.lock
7✔
5

6

7
-- Queue class table. Instances receive it as their metatable (see Queue.new),
-- and `__index` points back at it so methods are looked up on this table.
local Queue = {}
7✔
8
Queue.__index = Queue
7✔
9

10

11
-- Generates a default queue name of the form "copas_queue_<n>".
-- The counter lives in the enclosing `do` block, so it is private to this
-- function and increases by one on every call.
local new_name
do
  local sequence = 0

  new_name = function()
    sequence = sequence + 1
    return "copas_queue_" .. sequence
  end
end
19

20

21
-- Creates a new Queue instance.
-- `opts` is an optional table; `opts.name` sets the queue name. When no name
-- is given, one is generated with `new_name`.
-- Returns the new queue object.
function Queue.new(opts)
  local options = opts or {}
  local queue = setmetatable({}, Queue)

  queue.name = options.name or new_name()
  -- `push` gives to this semaphore and `pop` takes from it
  queue.sema = Sema.new(10^9)

  -- items are stored in `list`; `push` writes at `head`, `pop` reads at `tail`
  queue.head = 1
  queue.tail = 1
  queue.list = {}

  -- worker coroutines are the keys; weak keys, so this table does not keep
  -- them alive
  queue.workers = setmetatable({}, { __mode = "k" })
  queue.stopping = false
  queue.worker_id = 0
  -- workers give to this semaphore when they exit; `finish` takes from it
  queue.exit_semaphore = Sema.new(10^9)

  return queue
end
37

38

39
-- Appends an item to the queue (the item can be 'nil').
-- Returns true, or nil+err ("stopping", or "destroyed").
function Queue:push(item)
  if not self.stopping then
    local slot = self.head
    self.list[slot] = item
    self.head = slot + 1
    -- signal that one more item is available to `pop`
    self.sema:give()
    return true
  end

  -- `stop` was called; no new items are accepted
  return nil, "stopping"
end
50

51

52
-- Pops an item from the queue. If there are no items in the queue it will yield
-- until there are, or until a timeout happens (the exception is `timeout == 0`,
-- in which case it will not yield but return immediately). If the timeout is
-- `math.huge` it will wait forever.
-- Returns item, or nil+err ("timeout", or "destroyed")
function Queue:pop(timeout)
  local acquired, err = self.sema:take(1, timeout)
  if not acquired then
    return acquired, err
  end

  local index = self.tail
  local item = self.list[index]
  self.list[index] = nil
  self.tail = index + 1

  if self.tail ~= self.head then
    -- more items remain
    return item
  end

  -- that was the last item: reset the queue to its initial state
  self.list = {}
  self.tail = 1
  self.head = 1
  if self.stopping then
    -- we're stopping and the last item is being returned, so we're done
    self:destroy()
  end
  return item
end
78

79

80
-- Returns the number of items currently left in the queue, computed as the
-- distance between the write position (`head`) and the read position (`tail`).
function Queue:get_size()
  local head, tail = self.head, self.tail
  return head - tail
end
84

85

86
-- Instructs the queue to stop; further 'push' calls are rejected.
-- 'destroy' is called automatically once the queue is empty (right away if it
-- is empty already, otherwise by the `pop` that takes the last item).
-- Returns immediately, always with `true`. Calling it again is a no-op.
-- See `finish`.
function Queue:stop()
  if self.stopping then
    return true
  end

  self.stopping = true
  local lock = Lock.new(nil, true)
  self.lock = lock
  lock:get() -- close the lock

  if self:get_size() == 0 then
    -- queue is already empty, so the "pop" function cannot call destroy on
    -- the next pop; destroy now.
    self:destroy()
  end
  return true
end
102

103

104
-- Finishes a queue. Calls `stop` and then waits for the queue to run empty (and
-- be destroyed) and for the workers to exit before returning.
-- Returns true or nil+err ("timeout", or "destroyed").
-- `timeout` defaults to the `timeout` field of the lock created by `stop`.
-- Parameter `no_destroy_on_timeout` indicates that the queue is not to be
-- forcefully destroyed on a timeout.
function Queue:finish(timeout, no_destroy_on_timeout)
  self:stop()
  timeout = timeout or self.lock.timeout
  local endtime = gettime() + timeout

  -- the lock never gets released, only destroyed, so the error string is what
  -- tells us how the wait ended
  local _, lock_err = self.lock:get(timeout)
  if lock_err == "timeout" then
    if not no_destroy_on_timeout then
      self:destroy()
    end
    return nil, lock_err
  end

  -- the lock was destroyed, so the queue is empty; now wait for the workers
  if not next(self.workers) then
    -- all workers already exited, we're done
    return true
  end

  -- multiple threads can call this "finish" method, so exiting workers are
  -- checked one by one.
  while true do
    local remaining = math.max(0, endtime - gettime())
    local _, exit_err = self.exit_semaphore:take(1, remaining)

    if exit_err == "destroyed" then
      -- someone else destroyed/finished it, so we're done
      return true
    elseif exit_err == "timeout" then
      if not no_destroy_on_timeout then
        self:destroy()
      end
      return nil, "timeout"
    elseif not next(self.workers) then
      -- all workers exited, we're done
      self.exit_semaphore:destroy()
      return true
    end
  end
end
146

147

148
do
  -- stand-in for every method of a destroyed queue
  local function report_destroyed()
    return nil, "destroyed"
  end

  -- metatable installed on destroyed queues: any field that is not stored on
  -- the object itself resolves to `report_destroyed`
  local destroyed_mt = {}
  function destroyed_mt.__index()
    return report_destroyed
  end

  -- Destroys a queue immediately. Abandons what is left in the queue.
  -- Releases all waiting threads with `nil+"destroyed"`.
  -- Returns true.
  function Queue:destroy()
    local lock = self.lock
    if lock then
      -- only present once `stop` has been called
      lock:destroy()
    end
    self.sema:destroy()
    setmetatable(self, destroyed_mt)

    -- clear anything left in the queue
    local pending = self.list
    for key in pairs(pending) do
      pending[key] = nil
    end

    return true
  end
end
176

177

178
-- Adds a worker that will handle whatever is passed into the queue. Can be
-- called multiple times to add more workers.
-- The threads automatically exit when the queue is destroyed.
-- Worker function signature: `function(item)` (Note: worker functions run
-- unprotected, so wrap code in an (x)pcall if errors are expected, otherwise
-- the worker will exit on an error, and queue handling will stop)
-- Returns the coroutine added.
function Queue:add_worker(worker)
  assert(type(worker) == "function", "expected worker to be a function")

  local id = self.worker_id + 1
  self.worker_id = id
  local thread_name = self.name .. ":worker_" .. id

  -- declared before the thread body so the body can refer to its own coroutine
  local coro

  local function run()
    repeat
      local item, err = self:pop(math.huge) -- wait forever
      if not err then
        worker(item) -- TODO: wrap in errorhandling
      end
    until err -- queue destroyed, exit

    -- unregister, and signal the exit to any thread waiting in `finish`
    self.workers[coro] = nil
    if self.exit_semaphore then
      self.exit_semaphore:give(1)
    end
  end

  coro = copas.addnamedthread(thread_name, run)
  self.workers[coro] = true
  return coro
end
209

210
-- Returns a list/array of the current workers (coroutines) handling the queue.
-- Only workers added by `add_worker` that are not dead are included.
function Queue:get_workers()
  local active, count = {}, 0
  for thread in pairs(self.workers) do
    if coroutine.status(thread) ~= "dead" then
      count = count + 1
      active[count] = thread
    end
  end
  return active
end
221

222
return Queue
7✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc