• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 3803670722

pending completion
3803670722

push

github

Thijs Schreijer
release 4.6.0

1265 of 1488 relevant lines covered (85.01%)

5054.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.39
/src/copas/queue.lua
1
local copas = require "copas"
1✔
2
local Sema = require "copas.semaphore"
1✔
3
local Lock = require "copas.lock"
1✔
4

5

6
local Queue = {}
1✔
7
Queue.__index = Queue
1✔
8

9

10
local new_name do
1✔
11
  local count = 0
1✔
12

13
  function new_name()
×
14
    count = count + 1
4✔
15
    return "copas_queue_" .. count
4✔
16
  end
17
end
18

19

20
-- Creates a new Queue instance
21
function Queue.new(opts)
1✔
22
  opts = opts or {}
4✔
23
  local self = {}
4✔
24
  setmetatable(self, Queue)
4✔
25
  self.name = opts.name or new_name()
4✔
26
  self.sema = Sema.new(10^9)
4✔
27
  self.head = 1
4✔
28
  self.tail = 1
4✔
29
  self.list = {}
4✔
30
  self.workers = setmetatable({}, { __mode = "k" })
4✔
31
  self.stopping = false
4✔
32
  self.worker_id = 0
4✔
33
  return self
4✔
34
end
35

36

37
-- Pushes an item in the queue (can be 'nil')
38
-- returns true, or nil+err ("stopping", or "destroyed")
39
function Queue:push(item)
1✔
40
  if self.stopping then
15✔
41
    return nil, "stopping"
×
42
  end
43
  self.list[self.head] = item
15✔
44
  self.head = self.head + 1
15✔
45
  self.sema:give()
15✔
46
  return true
15✔
47
end
48

49

50
-- Pops and item from the queue. If there are no items in the queue it will yield
51
-- until there are or a timeout happens (exception is when `timeout == 0`, then it will
52
-- not yield but return immediately). If the timeout is `math.huge` it will wait forever.
53
-- Returns item, or nil+err ("timeout", or "destroyed")
54
function Queue:pop(timeout)
1✔
55
  local ok, err = self.sema:take(1, timeout)
17✔
56
  if not ok then
17✔
57
    return ok, err
3✔
58
  end
59

60
  local item = self.list[self.tail]
14✔
61
  self.list[self.tail] = nil
14✔
62
  self.tail = self.tail + 1
14✔
63

64
  if self.tail == self.head then
14✔
65
    -- reset queue
66
    self.list = {}
6✔
67
    self.tail = 1
6✔
68
    self.head = 1
6✔
69
    if self.stopping then
6✔
70
      -- we're stopping and last item being returned, so we're done
71
      self:destroy()
1✔
72
    end
73
  end
74
  return item
14✔
75
end
76

77

78
-- return the number of items left in the queue
79
function Queue:get_size()
1✔
80
  return self.head - self.tail
8✔
81
end
82

83

84
-- instructs the queue to stop. Will not accept any more 'push' calls.
85
-- will autocall 'destroy' when the queue is empty.
86
-- returns immediately. See `finish`
87
function Queue:stop()
1✔
88
  if not self.stopping then
4✔
89
    self.stopping = true
4✔
90
    self.lock = Lock.new(nil, true)
4✔
91
    self.lock:get() -- close the lock
4✔
92
    if self:get_size() == 0 then
4✔
93
      -- queue is already empty, so "pop" function cannot call destroy on next
94
      -- pop, so destroy now.
95
      self:destroy()
2✔
96
    end
97
  end
98
  return true
4✔
99
end
100

101

102
-- Finishes a queue. Calls stop and then waits for the queue to run empty (and be
103
-- destroyed) before returning. returns true or nil+err ("timeout", or "destroyed")
104
-- Parameter no_destroy_on_timeout indicates if the queue is not to be forcefully
105
-- destroyed on a timeout.
106
function Queue:finish(timeout, no_destroy_on_timeout)
1✔
107
  self:stop()
2✔
108
  local _, err = self.lock:get(timeout)
2✔
109
  -- the lock never gets released, only destroyed, so we have to check the error string
110
  if err == "timeout" then
2✔
111
    if not no_destroy_on_timeout then
1✔
112
      self:destroy()
1✔
113
    end
114
    return nil, err
1✔
115
  end
116
  return true
1✔
117
end
118

119

120
do
121
  local destroyed_func = function()
122
    return nil, "destroyed"
3✔
123
  end
124

125
  local destroyed_queue_mt = {
1✔
126
    __index = function()
127
      return destroyed_func
3✔
128
    end
129
  }
130

131
  -- destroys a queue immediately. Abandons what is left in the queue.
132
  -- Releases all waiting threads with `nil+"destroyed"`
133
  function Queue:destroy()
1✔
134
    if self.lock then
4✔
135
      self.lock:destroy()
4✔
136
    end
137
    self.sema:destroy()
4✔
138
    setmetatable(self, destroyed_queue_mt)
4✔
139

140
    -- clear anything left in the queue
141
    for key in pairs(self.list) do
5✔
142
      self.list[key] = nil
1✔
143
    end
144

145
    return true
4✔
146
  end
147
end
148

149

150
-- adds a worker that will handle whatever is passed into the queue. Can be called
151
-- multiple times to add more workers.
152
-- The threads automatically exit when the queue is destroyed.
153
-- worker function signature: `function(item)` (Note: worker functions run
154
-- unprotected, so wrap code in an (x)pcall if errors are expected, otherwise the
155
-- worker will exit on an error, and queue handling will stop)
156
-- Returns the coroutine added.
157
function Queue:add_worker(worker)
1✔
158
  assert(type(worker) == "function", "expected worker to be a function")
3✔
159
  local coro
160

161
  self.worker_id = self.worker_id + 1
3✔
162
  local worker_name = self.name .. ":worker_" .. self.worker_id
3✔
163

164
  coro = copas.addnamedthread(worker_name, function()
6✔
165
    while true do
166
      local item, err = self:pop(math.huge) -- wait forever
9✔
167
      if err then
9✔
168
        break -- queue destroyed, exit
3✔
169
      end
170
      worker(item) -- TODO: wrap in errorhandling
6✔
171
    end
172
    self.workers[coro] = nil
3✔
173
  end)
174

175
  self.workers[coro] = true
3✔
176
  return coro
3✔
177
end
178

179
-- returns a list/array of current workers (coroutines) handling the queue.
180
-- (only the workers added by `add_worker`, and still active, will be in this list)
181
function Queue:get_workers()
1✔
182
  local lst = {}
×
183
  for coro in pairs(self.workers) do
×
184
    if coroutine.status(coro) ~= "dead" then
×
185
      lst[#lst+1] = coro
×
186
    end
187
  end
188
  return lst
×
189
end
190

191
return Queue
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc