• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 3874175729

pending completion
3874175729

push

github

Thijs Schreijer
feat(cli) add a runtime script to run code in a copas environment

1284 of 1497 relevant lines covered (85.77%)

14626.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.63
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2022 - Kepler Project (www.keplerproject.org)
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
if package.loaded["socket.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
28✔
17
  error("you must require copas before require'ing socket.http")
×
18
end
19
if package.loaded["copas.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
28✔
20
  error("you must require copas before require'ing copas.http")
×
21
end
22

23

24
local socket = require "socket"
28✔
25
local binaryheap = require "binaryheap"
28✔
26
local gettime = socket.gettime
28✔
27
local ssl -- only loaded upon demand
28

29
local WATCH_DOG_TIMEOUT = 120
28✔
30
local UDP_DATAGRAM_MAX = socket._DATAGRAMSIZE or 8192
28✔
31
local TIMEOUT_PRECISION = 0.1  -- 100ms
28✔
32
local fnil = function() end
49,817✔
33

34

35
local coroutine_create = coroutine.create
28✔
36
local coroutine_running = coroutine.running
28✔
37
local coroutine_yield = coroutine.yield
28✔
38
local coroutine_resume = coroutine.resume
28✔
39
local coroutine_status = coroutine.status
28✔
40

41

42
-- nil-safe versions for pack/unpack
43
local _unpack = unpack or table.unpack
28✔
44
local unpack = function(t, i, j) return _unpack(t, i or 1, j or t.n or #t) end
292✔
45
local pack = function(...) return { n = select("#", ...), ...} end
423✔
46

47

48
local pcall = pcall
28✔
49
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
28✔
50
  pcall = require("coxpcall").pcall
×
51
  coroutine_running = require("coxpcall").running
×
52
end
53

54

55
do
56
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
57
  -- (this allows the use of socket.http from within copas)
58
  local err_mt = {
28✔
59
    __tostring = function (self)
60
      return "Copas 'try' error intermediate table: '"..tostring(self[1].."'")
×
61
    end,
62
  }
63

64
  local function statusHandler(status, ...)
65
    if status then return ... end
14✔
66
    local err = (...)
5✔
67
    if type(err) == "table" and getmetatable(err) == err_mt then
5✔
68
      return nil, err[1]
5✔
69
    else
70
      error(err)
×
71
    end
72
  end
73

74
  function socket.protect(func)
28✔
75
    return function (...)
76
            return statusHandler(pcall(func, ...))
28✔
77
          end
78
  end
79

80
  function socket.newtry(finalizer)
28✔
81
    return function (...)
82
            local status = (...)
197✔
83
            if not status then
197✔
84
              pcall(finalizer or fnil, select(2, ...))
5✔
85
              error(setmetatable({ (select(2, ...)) }, err_mt), 0)
5✔
86
            end
87
            return ...
192✔
88
          end
89
  end
90

91
  socket.try = socket.newtry()
56✔
92
end
93

94

95
-- Setup the Copas meta table to auto-load submodules and define a default method
96
local copas do
28✔
97
  local submodules = { "ftp", "http", "lock", "queue", "semaphore", "smtp", "timer" }
28✔
98
  for i, key in ipairs(submodules) do
224✔
99
    submodules[key] = true
196✔
100
    submodules[i] = nil
196✔
101
  end
102

103
  copas = setmetatable({},{
56✔
104
    __index = function(self, key)
105
      if submodules[key] then
10✔
106
        self[key] = require("copas."..key)
10✔
107
        submodules[key] = nil
10✔
108
        return rawget(self, key)
10✔
109
      end
110
    end,
111
    __call = function(self, ...)
112
      return self.loop(...)
×
113
    end,
114
  })
28✔
115
end
116

117

118
-- Meta information is public even if beginning with an "_"
119
copas._COPYRIGHT   = "Copyright (C) 2005-2022 Kepler Project"
28✔
120
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
28✔
121
copas._VERSION     = "Copas 4.6.0"
28✔
122

123
-- Close the socket associated with the current connection after the handler finishes
124
copas.autoclose = true
28✔
125

126
-- indicator for the loop running
127
copas.running = false
28✔
128

129

130
-------------------------------------------------------------------------------
131
-- Object names, to track names of thread/coroutines and sockets
132
-------------------------------------------------------------------------------
133
local object_names = setmetatable({}, {
56✔
134
  __mode = "k",
135
  __index = function(self, key)
136
    local name = tostring(key)
27✔
137
    if key ~= nil then
27✔
138
      rawset(self, key, name)
27✔
139
    end
140
    return name
27✔
141
  end
142
})
143

144
-------------------------------------------------------------------------------
145
-- Simple set implementation
146
-- adds a FIFO queue for each socket in the set
147
-------------------------------------------------------------------------------
148

149
local function newsocketset()
150
  local set = {}
84✔
151

152
  do  -- set implementation
153
    local reverse = {}
84✔
154

155
    -- Adds a socket to the set, does nothing if it exists
156
    -- @return skt if added, or nil if it existed
157
    function set:insert(skt)
84✔
158
      if not reverse[skt] then
161✔
159
        self[#self + 1] = skt
161✔
160
        reverse[skt] = #self
161✔
161
        return skt
161✔
162
      end
163
    end
164

165
    -- Removes socket from the set, does nothing if not found
166
    -- @return skt if removed, or nil if it wasn't in the set
167
    function set:remove(skt)
84✔
168
      local index = reverse[skt]
227✔
169
      if index then
227✔
170
        reverse[skt] = nil
158✔
171
        local top = self[#self]
158✔
172
        self[#self] = nil
158✔
173
        if top ~= skt then
158✔
174
          reverse[top] = index
22✔
175
          self[index] = top
22✔
176
        end
177
        return skt
158✔
178
      end
179
    end
180

181
  end
182

183
  do  -- queues implementation
184
    local fifo_queues = setmetatable({},{
168✔
185
      __mode = "k",                 -- auto collect queue if socket is gone
186
      __index = function(self, skt) -- auto create fifo queue if not found
187
        local newfifo = {}
55✔
188
        self[skt] = newfifo
55✔
189
        return newfifo
55✔
190
      end,
191
    })
192

193
    -- pushes an item in the fifo queue for the socket.
194
    function set:push(skt, itm)
84✔
195
      local queue = fifo_queues[skt]
146✔
196
      queue[#queue + 1] = itm
146✔
197
    end
198

199
    -- pops an item from the fifo queue for the socket
200
    function set:pop(skt)
84✔
201
      local queue = fifo_queues[skt]
133✔
202
      return table.remove(queue, 1)
133✔
203
    end
204

205
  end
206

207
  return set
84✔
208
end
209

210

211

212
-- Threads immediately resumable
213
local _resumable = {} do
28✔
214
  local resumelist = {}
28✔
215

216
  function _resumable:push(co)
28✔
217
    resumelist[#resumelist + 1] = co
49,769✔
218
  end
219

220
  function _resumable:clear_resumelist()
28✔
221
    local lst = resumelist
48,208✔
222
    resumelist = {}
48,208✔
223
    return lst
48,208✔
224
  end
225

226
  function _resumable:done()
28✔
227
    return resumelist[1] == nil
48,619✔
228
  end
229

230
  function _resumable:count()
28✔
231
    return #resumelist + #_resumable
×
232
  end
233

234
end
235

236

237

238
-- Similar to the socket set above, but tailored for the use of
239
-- sleeping threads
240
local _sleeping = {} do
28✔
241

242
  local heap = binaryheap.minUnique()
28✔
243
  local lethargy = setmetatable({}, { __mode = "k" }) -- list of coroutines sleeping without a wakeup time
28✔
244

245

246
  -- Required base implementation
247
  -----------------------------------------
248
  _sleeping.insert = fnil
28✔
249
  _sleeping.remove = fnil
28✔
250

251
  -- push a new timer on the heap
252
  function _sleeping:push(sleeptime, co)
28✔
253
    if sleeptime < 0 then
49,788✔
254
      lethargy[co] = true
515✔
255
    elseif sleeptime == 0 then
49,273✔
256
      _resumable:push(co)
96,358✔
257
    else
258
      heap:insert(gettime() + sleeptime, co)
1,094✔
259
    end
260
  end
261

262
  -- find the thread that should wake up to the time, if any
263
  function _sleeping:pop(time)
28✔
264
    if time < (heap:peekValue() or math.huge) then
98,546✔
265
      return
48,208✔
266
    end
267
    return heap:pop()
1,065✔
268
  end
269

270
  -- additional methods for time management
271
  -----------------------------------------
272
  function _sleeping:getnext()  -- returns delay until next sleep expires, or nil if there is none
28✔
273
    local t = heap:peekValue()
829✔
274
    if t then
829✔
275
      -- never report less than 0, because select() might block
276
      return math.max(t - gettime(), 0)
829✔
277
    end
278
  end
279

280
  function _sleeping:wakeup(co)
28✔
281
    if lethargy[co] then
515✔
282
      lethargy[co] = nil
512✔
283
      _resumable:push(co)
512✔
284
      return
512✔
285
    end
286
    if heap:remove(co) then
6✔
287
      _resumable:push(co)
1✔
288
    end
289
  end
290

291
  -- @param tos number of timeouts running
292
  function _sleeping:done(tos)
28✔
293
    -- return true if we have nothing more to do
294
    -- the timeout task doesn't qualify as work (fallbacks only),
295
    -- the lethargy also doesn't qualify as work ('dead' tasks),
296
    -- but the combination of a timeout + a lethargy can be work
297
    return heap:size() == 1       -- 1 means only the timeout-timer task is running
626✔
298
           and not (tos > 0 and next(lethargy))
313✔
299
  end
300

301
  -- gets number of threads in binaryheap and lethargy
302
  function _sleeping:status()
28✔
303
    local c = 0
×
304
    for _ in pairs(lethargy) do c = c + 1 end
×
305

306
    return heap:size(), c
×
307
  end
308

309
end   -- _sleeping
310

311

312

313
-------------------------------------------------------------------------------
314
-- Tracking coroutines and sockets
315
-------------------------------------------------------------------------------
316

317
local _servers = newsocketset() -- servers being handled
28✔
318
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
28✔
319
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
28✔
320
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
28✔
321
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
28✔
322

323

324
-- for each socket we log the last read and last write times to enable the
325
-- watchdog to follow up if it takes too long.
326
-- tables contain the time, indexed by the socket
327
local _reading_log = {}
28✔
328
local _writing_log = {}
28✔
329

330
local _closed = {} -- track sockets that have been closed (list/array)
28✔
331

332
local _reading = newsocketset() -- sockets currently being read
28✔
333
local _writing = newsocketset() -- sockets currently being written
28✔
334
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
28✔
335
  ["timeout"] = true,      -- default LuaSocket timeout
336
  ["wantread"] = true,     -- LuaSec specific timeout
337
  ["wantwrite"] = true,    -- LuaSec specific timeout
338
}
339

340
-------------------------------------------------------------------------------
341
-- Coroutine based socket timeouts.
342
-------------------------------------------------------------------------------
343
local user_timeouts_connect
344
local user_timeouts_send
345
local user_timeouts_receive
346
do
347
  local timeout_mt = {
28✔
348
    __mode = "k",
349
    __index = function(self, skt)
350
      -- if there is no timeout found, we insert one automatically, to block forever
351
      self[skt] = math.huge
63✔
352
      return self[skt]
63✔
353
    end,
354
  }
355

356
  user_timeouts_connect = setmetatable({}, timeout_mt)
28✔
357
  user_timeouts_send = setmetatable({}, timeout_mt)
28✔
358
  user_timeouts_receive = setmetatable({}, timeout_mt)
28✔
359
end
360

361
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
28✔
362

363

364
-- sto = socket-time-out
365
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do
28✔
366

367
  local socket_register = setmetatable({}, { __mode = "k" })    -- socket by coroutine
28✔
368
  local operation_register = setmetatable({}, { __mode = "k" }) -- operation "read"/"write" by coroutine
28✔
369
  local timeout_flags = setmetatable({}, { __mode = "k" })      -- true if timedout, by coroutine
28✔
370

371

372
  local function socket_callback(co)
373
    local skt = socket_register[co]
12✔
374
    local queue = operation_register[co]
12✔
375

376
    -- flag the timeout and resume the coroutine
377
    timeout_flags[co] = true
12✔
378
    _resumable:push(co)
12✔
379

380
    -- clear the socket from the current queue
381
    if queue == "read" then
12✔
382
      _reading:remove(skt)
20✔
383
    elseif queue == "write" then
2✔
384
      _writing:remove(skt)
4✔
385
    else
386
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
×
387
    end
388
  end
389

390

391
  -- Sets a socket timeout.
392
  -- Calling it as `sto_timeout()` will cancel the timeout.
393
  -- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
394
  -- @param skt (socket) the socket on which to operate
395
  -- @param use_connect_to (bool) timeout to use is determined based on queue (read/write) or if this
396
  -- is truthy, it is the connect timeout.
397
  -- @return true
398
  function sto_timeout(skt, queue, use_connect_to)
28✔
399
    local co = coroutine_running()
888,907✔
400
    socket_register[co] = skt
888,907✔
401
    operation_register[co] = queue
888,907✔
402
    timeout_flags[co] = nil
888,907✔
403
    if skt then
888,907✔
404
      local to = (use_connect_to and user_timeouts_connect[skt]) or
444,468✔
405
                 (queue == "read" and user_timeouts_receive[skt]) or
444,418✔
406
                 user_timeouts_send[skt]
2,449✔
407
      copas.timeout(to, socket_callback)
888,920✔
408
    else
409
      copas.timeout(0)
444,447✔
410
    end
411
    return true
888,907✔
412
  end
413

414

415
  -- Changes the timeout to a different queue (read/write).
416
  -- Only usefull with ssl-handshakes and "wantread", "wantwrite" errors, when
417
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
418
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
419
  -- @return true
420
  function sto_change_queue(queue)
28✔
421
    operation_register[coroutine_running()] = queue
127✔
422
    return true
127✔
423
  end
424

425

426
  -- Responds with `true` if the operation timed-out.
427
  function sto_timed_out()
28✔
428
    return timeout_flags[coroutine_running()]
158✔
429
  end
430

431

432
  -- Returns the proper timeout error
433
  function sto_error(err)
28✔
434
    return useSocketTimeoutErrors[coroutine_running()] and err or "timeout"
12✔
435
  end
436
end
437

438

439

440
-------------------------------------------------------------------------------
441
-- Coroutine based socket I/O functions.
442
-------------------------------------------------------------------------------
443

444
-- Returns "tcp"" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
445
-- for tcp based, and falsy for udp based.
446
local isTCP do
28✔
447
  local lookup = {
28✔
448
    tcp = "tcp",
449
    SSL = "ssl",
450
  }
451

452
  function isTCP(socket)
28✔
453
    return lookup[tostring(socket):sub(1,3)]
182✔
454
  end
455
end
456

457
function copas.close(skt, ...)
28✔
458
  _closed[#_closed+1] = skt
30✔
459
  return skt:close(...)
30✔
460
end
461

462

463

464
-- nil or negative is indefinitly
465
function copas.settimeout(skt, timeout)
28✔
466
  timeout = timeout or -1
29✔
467
  if type(timeout) ~= "number" then
29✔
468
    return nil, "timeout must be 'nil' or a number"
×
469
  end
470

471
  return copas.settimeouts(skt, timeout, timeout, timeout)
29✔
472
end
473

474
-- negative is indefinitly, nil means do not change
475
function copas.settimeouts(skt, connect, send, read)
28✔
476

477
  if connect ~= nil and type(connect) ~= "number" then
64✔
478
    return nil, "connect timeout must be 'nil' or a number"
×
479
  end
480
  if connect then
64✔
481
    if connect < 0 then
64✔
482
      connect = nil
×
483
    end
484
    user_timeouts_connect[skt] = connect
64✔
485
  end
486

487

488
  if send ~= nil and type(send) ~= "number" then
64✔
489
    return nil, "send timeout must be 'nil' or a number"
×
490
  end
491
  if send then
64✔
492
    if send < 0 then
64✔
493
      send = nil
×
494
    end
495
    user_timeouts_send[skt] = send
64✔
496
  end
497

498

499
  if read ~= nil and type(read) ~= "number" then
64✔
500
    return nil, "read timeout must be 'nil' or a number"
×
501
  end
502
  if read then
64✔
503
    if read < 0 then
64✔
504
      read = nil
×
505
    end
506
    user_timeouts_receive[skt] = read
64✔
507
  end
508

509

510
  return true
64✔
511
end
512

513
-- reads a pattern from a client and yields to the reading set on timeouts
514
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
515
-- be provided as the 'pattern' below defaults to a string. Will throw a
516
-- 'bad argument' error if omitted.
517
function copas.receive(client, pattern, part)
28✔
518
  local s, err
519
  pattern = pattern or "*l"
441,961✔
520
  local current_log = _reading_log
441,961✔
521
  sto_timeout(client, "read")
441,961✔
522

523
  repeat
524
    s, err, part = client:receive(pattern, part)
442,033✔
525

526
    -- guarantees that high throughput doesn't take other threads to starvation
527
    if (math.random(100) > 90) then
442,033✔
528
      copas.pause()
44,315✔
529
    end
530

531
    if s then
442,033✔
532
      current_log[client] = nil
441,946✔
533
      sto_timeout()
441,946✔
534
      return s, err, part
441,946✔
535

536
    elseif not _isSocketTimeout[err] then
87✔
537
      current_log[client] = nil
5✔
538
      sto_timeout()
5✔
539
      return s, err, part
5✔
540

541
    elseif sto_timed_out() then
164✔
542
      current_log[client] = nil
9✔
543
      return nil, sto_error(err), part
18✔
544
    end
545

546
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
73✔
547
      current_log = _writing_log
×
548
      current_log[client] = gettime()
×
549
      sto_change_queue("write")
×
550
      coroutine_yield(client, _writing)
×
551
    else
552
      current_log = _reading_log
73✔
553
      current_log[client] = gettime()
73✔
554
      sto_change_queue("read")
73✔
555
      coroutine_yield(client, _reading)
73✔
556
    end
557
  until false
72✔
558
end
559

560
-- receives data from a client over UDP. Not available for TCP.
561
-- (this is a copy of receive() method, adapted for receivefrom() use)
562
function copas.receivefrom(client, size)
28✔
563
  local s, err, port
564
  size = size or UDP_DATAGRAM_MAX
4✔
565
  sto_timeout(client, "read")
4✔
566

567
  repeat
568
    s, err, port = client:receivefrom(size) -- upon success err holds ip address
8✔
569

570
    -- garantees that high throughput doesn't take other threads to starvation
571
    if (math.random(100) > 90) then
8✔
572
      copas.pause()
×
573
    end
574

575
    if s then
8✔
576
      _reading_log[client] = nil
3✔
577
      sto_timeout()
3✔
578
      return s, err, port
3✔
579

580
    elseif err ~= "timeout" then
5✔
581
      _reading_log[client] = nil
×
582
      sto_timeout()
×
583
      return s, err, port
×
584

585
    elseif sto_timed_out() then
10✔
586
      _reading_log[client] = nil
1✔
587
      return nil, sto_error(err), port
2✔
588
    end
589

590
    _reading_log[client] = gettime()
4✔
591
    coroutine_yield(client, _reading)
4✔
592
  until false
4✔
593
end
594

595
-- same as above but with special treatment when reading chunks,
596
-- unblocks on any data received.
597
function copas.receivepartial(client, pattern, part)
28✔
598
  local s, err
599
  pattern = pattern or "*l"
2✔
600
  local orig_size = #(part or "")
2✔
601
  local current_log = _reading_log
2✔
602
  sto_timeout(client, "read")
2✔
603

604
  repeat
605
    s, err, part = client:receive(pattern, part)
3✔
606

607
    -- guarantees that high throughput doesn't take other threads to starvation
608
    if (math.random(100) > 90) then
3✔
609
      copas.pause()
1✔
610
    end
611

612
    if s or (type(part) == "string" and #part > orig_size) then
3✔
613
      current_log[client] = nil
2✔
614
      sto_timeout()
2✔
615
      return s, err, part
2✔
616

617
    elseif not _isSocketTimeout[err] then
1✔
618
      current_log[client] = nil
×
619
      sto_timeout()
×
620
      return s, err, part
×
621

622
    elseif sto_timed_out() then
2✔
623
      current_log[client] = nil
×
624
      return nil, sto_error(err), part
×
625
    end
626

627
    if err == "wantwrite" then
1✔
628
      current_log = _writing_log
×
629
      current_log[client] = gettime()
×
630
      sto_change_queue("write")
×
631
      coroutine_yield(client, _writing)
×
632
    else
633
      current_log = _reading_log
1✔
634
      current_log[client] = gettime()
1✔
635
      sto_change_queue("read")
1✔
636
      coroutine_yield(client, _reading)
1✔
637
    end
638
  until false
1✔
639
end
640
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
28✔
641

642
-- sends data to a client. The operation is buffered and
643
-- yields to the writing set on timeouts
644
-- Note: from and to parameters will be ignored by/for UDP sockets
645
function copas.send(client, data, from, to)
28✔
646
  local s, err
647
  from = from or 1
2,449✔
648
  local lastIndex = from - 1
2,449✔
649
  local current_log = _writing_log
2,449✔
650
  sto_timeout(client, "write")
2,449✔
651

652
  repeat
653
    s, err, lastIndex = client:send(data, lastIndex + 1, to)
2,478✔
654

655
    -- guarantees that high throughput doesn't take other threads to starvation
656
    if (math.random(100) > 90) then
2,478✔
657
      copas.pause()
239✔
658
    end
659

660
    if s then
2,478✔
661
      current_log[client] = nil
2,445✔
662
      sto_timeout()
2,445✔
663
      return s, err, lastIndex
2,445✔
664

665
    elseif not _isSocketTimeout[err] then
33✔
666
      current_log[client] = nil
4✔
667
      sto_timeout()
4✔
668
      return s, err, lastIndex
4✔
669

670
    elseif sto_timed_out() then
58✔
671
      current_log[client] = nil
×
672
      return nil, sto_error(err), lastIndex
×
673
    end
674

675
    if err == "wantread" then
29✔
676
      current_log = _reading_log
×
677
      current_log[client] = gettime()
×
678
      sto_change_queue("read")
×
679
      coroutine_yield(client, _reading)
×
680
    else
681
      current_log = _writing_log
29✔
682
      current_log[client] = gettime()
29✔
683
      sto_change_queue("write")
29✔
684
      coroutine_yield(client, _writing)
29✔
685
    end
686
  until false
29✔
687
end
688

689
function copas.sendto(client, data, ip, port)
28✔
690
  -- deprecated; for backward compatibility only, since UDP doesn't block on sending
691
  return client:sendto(data, ip, port)
×
692
end
693

694
-- waits until connection is completed
695
function copas.connect(skt, host, port)
28✔
696
  skt:settimeout(0)
30✔
697
  local ret, err, tried_more_than_once
698
  sto_timeout(skt, "write", true)
29✔
699

700
  repeat
701
    ret, err = skt:connect(host, port)
48✔
702

703
    -- non-blocking connect on Windows results in error "Operation already
704
    -- in progress" to indicate that it is completing the request async. So essentially
705
    -- it is the same as "timeout"
706
    if ret or (err ~= "timeout" and err ~= "Operation already in progress") then
44✔
707
      _writing_log[skt] = nil
27✔
708
      sto_timeout()
27✔
709
      -- Once the async connect completes, Windows returns the error "already connected"
710
      -- to indicate it is done, so that error should be ignored. Except when it is the
711
      -- first call to connect, then it was already connected to something else and the
712
      -- error should be returned
713
      if (not ret) and (err == "already connected" and tried_more_than_once) then
27✔
714
        return 1
×
715
      end
716
      return ret, err
27✔
717

718
    elseif sto_timed_out() then
34✔
719
      _writing_log[skt] = nil
2✔
720
      return nil, sto_error(err)
4✔
721
    end
722

723
    tried_more_than_once = tried_more_than_once or true
15✔
724
    _writing_log[skt] = gettime()
15✔
725
    coroutine_yield(skt, _writing)
15✔
726
  until false
15✔
727
end
728

729

730
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
731
-- already wrapped, it does nothing and returns the socket.
732
-- @param wrap_params the parameters for the ssl-context
733
-- @return wrapped socket, or throws an error
734
local function ssl_wrap(skt, wrap_params)
735
  if isTCP(skt) == "ssl" then return skt end -- was already wrapped
52✔
736
  if not wrap_params then
15✔
737
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
×
738
  end
739

740
  ssl = ssl or require("ssl")
15✔
741
  local nskt = assert(ssl.wrap(skt, wrap_params)) -- assert, because we do not want to silently ignore this one!!
30✔
742

743
  nskt:settimeout(0)  -- non-blocking on the ssl-socket
15✔
744
  copas.settimeouts(nskt, user_timeouts_connect[skt],
30✔
745
    user_timeouts_send[skt], user_timeouts_receive[skt]) -- copy copas user-timeout to newly wrapped one
19✔
746

747
  local co = _autoclose_r[skt]
15✔
748
  if co then
15✔
749
    -- socket registered for autoclose, move registration to wrapped one
750
    _autoclose[co] = nskt
4✔
751
    _autoclose_r[skt] = nil
4✔
752
    _autoclose_r[nskt] = co
4✔
753
  end
754

755
  local sock_name = object_names[skt]
15✔
756
  if sock_name ~= tostring(skt) then
15✔
757
    -- socket had a custom name, so copy it over
758
    object_names[nskt] = sock_name
6✔
759
  end
760
  return nskt
15✔
761
end
762

763

764
-- For each luasec method we have a subtable, allows for future extension.
765
-- Required structure:
766
-- {
767
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
768
--   sni = {                  -- parameters to 'sni()'
769
--     names = string | table -- 1st parameter
770
--     strict = bool          -- 2nd parameter
771
--   }
772
-- }
773
local function normalize_sslt(sslt)
774
  local t = type(sslt)
46✔
775
  local r = setmetatable({}, {
92✔
776
    __index = function(self, key)
777
      -- a bug if this happens, here as a sanity check, just being careful since
778
      -- this is security stuff
779
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
×
780
    end,
781
  })
782
  if t == "nil" then
46✔
783
    r.wrap = false
31✔
784
    r.sni = false
31✔
785

786
  elseif t == "table" then
15✔
787
    if sslt.mode or sslt.protocol then
15✔
788
      -- has the mandatory fields for the ssl-params table for handshake
789
      -- backward compatibility
790
      r.wrap = sslt
4✔
791
      r.sni = false
4✔
792
    else
793
      -- has the target definition, copy our known keys
794
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
11✔
795
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
11✔
796
    end
797

798
  elseif t == "userdata" then
×
799
    -- it's an ssl-context object for the handshake
800
    -- backward compatibility
801
    r.wrap = sslt
×
802
    r.sni = false
×
803

804
  else
805
    error("ssl parameters; did not expect type "..tostring(sslt))
×
806
  end
807

808
  return r
46✔
809
end
810

811

812
---
813
-- Peforms an (async) ssl handshake on a connected TCP client socket.
814
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
815
-- Throws error and does not return nil+error, as that might silently fail
816
-- in code like this;
817
--   copas.addserver(s1, function(skt)
818
--       skt = copas.wrap(skt, sparams)
819
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
820
--       skt:send(body)      --> continues unencrypted
821
-- @param skt Regular LuaSocket CLIENT socket object
822
-- @param wrap_params Table with ssl parameters
823
-- @return wrapped ssl socket, or throws an error
824
function copas.dohandshake(skt, wrap_params)
28✔
825
  ssl = ssl or require("ssl")
15✔
826

827
  local nskt = ssl_wrap(skt, wrap_params)
15✔
828

829
  sto_timeout(nskt, "write", true)
15✔
830
  local queue
831

832
  repeat
833
    local success, err = nskt:dohandshake()
39✔
834

835
    if success then
39✔
836
      sto_timeout()
13✔
837
      return nskt
13✔
838

839
    elseif not _isSocketTimeout[err] then
26✔
840
      sto_timeout()
2✔
841
      error("TLS/SSL handshake failed: " .. tostring(err))
2✔
842

843
    elseif sto_timed_out() then
48✔
844
      return nil, sto_error(err)
×
845

846
    elseif err == "wantwrite" then
24✔
847
      sto_change_queue("write")
×
848
      queue = _writing
×
849

850
    elseif err == "wantread" then
24✔
851
      sto_change_queue("read")
24✔
852
      queue = _reading
24✔
853

854
    else
855
      error("TLS/SSL handshake failed: " .. tostring(err))
×
856
    end
857

858
    coroutine_yield(nskt, queue)
24✔
859
  until false
24✔
860
end
861

862
-- flushes a client write buffer (deprecated)
863
function copas.flush()
28✔
864
end
865

866
-- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout)
867
local _skt_mt_tcp = {
28✔
868
      __tostring = function(self)
869
        return tostring(self.socket).." (copas wrapped)"
×
870
      end,
871

872
      __index = {
28✔
873
        send = function (self, data, from, to)
874
          return copas.send (self.socket, data, from, to)
2,448✔
875
        end,
876

877
        receive = function (self, pattern, prefix)
878
          if user_timeouts_receive[self.socket] == 0 then
441,961✔
879
            return copas.receivepartial(self.socket, pattern, prefix)
2✔
880
          end
881
          return copas.receive(self.socket, pattern, prefix)
441,958✔
882
        end,
883

884
        receivepartial = function (self, pattern, prefix)
885
          return copas.receivepartial(self.socket, pattern, prefix)
×
886
        end,
887

888
        flush = function (self)
889
          return copas.flush(self.socket)
×
890
        end,
891

892
        settimeout = function (self, time)
893
          return copas.settimeout(self.socket, time)
29✔
894
        end,
895

896
        settimeouts = function (self, connect, send, receive)
897
          return copas.settimeouts(self.socket, connect, send, receive)
×
898
        end,
899

900
        -- TODO: socket.connect is a shortcut, and must be provided with an alternative
901
        -- if ssl parameters are available, it will also include a handshake
902
        connect = function(self, ...)
903
          local res, err = copas.connect(self.socket, ...)
29✔
904
          if res then
29✔
905
            if self.ssl_params.sni then self:sni() end
26✔
906
            if self.ssl_params.wrap then res, err = self:dohandshake() end
36✔
907
          end
908
          return res, err
28✔
909
        end,
910

911
        close = function(self, ...)
912
          return copas.close(self.socket, ...)
30✔
913
        end,
914

915
        -- TODO: socket.bind is a shortcut, and must be provided with an alternative
916
        bind = function(self, ...) return self.socket:bind(...) end,
28✔
917

918
        -- TODO: is this DNS related? hence blocking?
919
        getsockname = function(self, ...) return self.socket:getsockname(...) end,
28✔
920

921
        getstats = function(self, ...) return self.socket:getstats(...) end,
28✔
922

923
        setstats = function(self, ...) return self.socket:setstats(...) end,
28✔
924

925
        listen = function(self, ...) return self.socket:listen(...) end,
28✔
926

927
        accept = function(self, ...) return self.socket:accept(...) end,
28✔
928

929
        setoption = function(self, ...) return self.socket:setoption(...) end,
28✔
930

931
        -- TODO: is this DNS related? hence blocking?
932
        getpeername = function(self, ...) return self.socket:getpeername(...) end,
28✔
933

934
        shutdown = function(self, ...) return self.socket:shutdown(...) end,
28✔
935

936
        sni = function(self, names, strict)
937
          local sslp = self.ssl_params
11✔
938
          self.socket = ssl_wrap(self.socket, sslp.wrap)
22✔
939
          if names == nil then
11✔
940
            names = sslp.sni.names
9✔
941
            strict = sslp.sni.strict
9✔
942
          end
943
          return self.socket:sni(names, strict)
11✔
944
        end,
945

946
        dohandshake = function(self, wrap_params)
947
          local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
15✔
948
          if not nskt then return nskt, err end
13✔
949
          self.socket = nskt  -- replace internal socket with the newly wrapped ssl one
13✔
950
          return self
13✔
951
        end,
952

953
      }
28✔
954
}
955

956
-- wraps a UDP socket, copy of TCP one adapted for UDP.
957
local _skt_mt_udp = {__index = { }}
28✔
958
for k,v in pairs(_skt_mt_tcp) do _skt_mt_udp[k] = _skt_mt_udp[k] or v end
84✔
959
for k,v in pairs(_skt_mt_tcp.__index) do _skt_mt_udp.__index[k] = v end
560✔
960

961
_skt_mt_udp.__index.send        = function(self, ...) return self.socket:send(...) end
29✔
962

963
_skt_mt_udp.__index.sendto      = function(self, ...) return self.socket:sendto(...) end
31✔
964

965

966
_skt_mt_udp.__index.receive =     function (self, size)
28✔
967
                                    return copas.receive (self.socket, (size or UDP_DATAGRAM_MAX))
2✔
968
                                  end
969

970
_skt_mt_udp.__index.receivefrom = function (self, size)
28✔
971
                                    return copas.receivefrom (self.socket, (size or UDP_DATAGRAM_MAX))
4✔
972
                                  end
973

974
                                  -- TODO: is this DNS related? hence blocking?
975
_skt_mt_udp.__index.setpeername = function(self, ...) return self.socket:setpeername(...) end
29✔
976

977
_skt_mt_udp.__index.setsockname = function(self, ...) return self.socket:setsockname(...) end
28✔
978

979
                                    -- do not close client, as it is also the server for udp.
980
_skt_mt_udp.__index.close       = function(self, ...) return true end
30✔
981

982
_skt_mt_udp.__index.settimeouts = function (self, connect, send, receive)
28✔
983
                                    return copas.settimeouts(self.socket, connect, send, receive)
×
984
                                  end
985

986

987

988
---
989
-- Wraps a LuaSocket socket object in an async Copas based socket object.
990
-- @param skt The socket to wrap
991
-- @sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
992
-- @return wrapped socket object
993
function copas.wrap (skt, sslt)
28✔
994
  if (getmetatable(skt) == _skt_mt_tcp) or (getmetatable(skt) == _skt_mt_udp) then
50✔
995
    return skt -- already wrapped
×
996
  end
997

998
  skt:settimeout(0)
51✔
999

1000
  if isTCP(skt) then
100✔
1001
    return setmetatable ({socket = skt, ssl_params = normalize_sslt(sslt)}, _skt_mt_tcp)
92✔
1002
  else
1003
    return setmetatable ({socket = skt}, _skt_mt_udp)
4✔
1004
  end
1005
end
1006

1007
--- Wraps a handler in a function that deals with wrapping the socket and doing the
1008
-- optional ssl handshake.
1009
function copas.handler(handler, sslparams)
28✔
1010
  -- TODO: pass a timeout value to set, and use during handshake
1011
  return function (skt, ...)
1012
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
26✔
1013
    local sslp = skt.ssl_params
13✔
1014
    if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
13✔
1015
    if sslp.wrap then skt:dohandshake(sslp.wrap) end
13✔
1016
    return handler(skt, ...)
12✔
1017
  end
1018
end
1019

1020

1021
--------------------------------------------------
1022
-- Error handling
1023
--------------------------------------------------
1024

1025
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine
28✔
1026

1027

1028
function copas.gettraceback(msg, co, skt)
28✔
1029
  local co_str = co == nil and "nil" or copas.getthreadname(co)
5✔
1030
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
5✔
1031
  local msg_str = msg == nil and "" or tostring(msg)
5✔
1032
  if msg_str == "" then
5✔
1033
    msg_str = ("(coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
×
1034
  else
1035
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
5✔
1036
  end
1037

1038
  if type(co) == "thread" then
5✔
1039
    -- regular Copas coroutine
1040
    return debug.traceback(co, msg_str)
5✔
1041
  end
1042
  -- not a coroutine, but the main thread, this happens if a timeout callback
1043
  -- (see `copas.timeout` causes an error (those callbacks run on the main thread).
1044
  return debug.traceback(msg_str, 2)
×
1045
end
1046

1047

1048
local function _deferror(msg, co, skt)
1049
  print(copas.gettraceback(msg, co, skt))
6✔
1050
end
1051

1052

1053
function copas.seterrorhandler(err, default)
28✔
1054
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")
10✔
1055
  if default then
10✔
1056
    assert(err ~= nil, "Expected the handler to be a function when setting the default")
7✔
1057
    _deferror = err
7✔
1058
  else
1059
    _errhandlers[coroutine_running()] = err
3✔
1060
  end
1061
end
1062
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
28✔
1063

1064

1065
function copas.geterrorhandler(co)
28✔
1066
  co = co or coroutine_running()
2✔
1067
  return _errhandlers[co] or _deferror
2✔
1068
end
1069

1070

1071
-- if `bool` is truthy, then the original socket errors will be returned in case of timeouts;
1072
-- `timeout, wantread, wantwrite, Operation already in progress`. If falsy, it will always
1073
-- return `timeout`.
1074
function copas.useSocketTimeoutErrors(bool)
28✔
1075
  useSocketTimeoutErrors[coroutine_running()] = not not bool -- force to a boolean
1✔
1076
end
1077

1078
-------------------------------------------------------------------------------
1079
-- Thread handling
1080
-------------------------------------------------------------------------------
1081

1082
local function _doTick (co, skt, ...)
1083
  if not co then return end
50,772✔
1084

1085
  -- if a coroutine was canceled/removed, don't resume it
1086
  if _canceled[co] then
50,772✔
1087
    _canceled[co] = nil -- also clean up the registry
2✔
1088
    _threads[co] = nil
2✔
1089
    return
2✔
1090
  end
1091

1092
  -- res: the socket (being read/write on) or the time to sleep
1093
  -- new_q: either _writing, _reading, or _sleeping
1094
  -- local time_before = gettime()
1095
  local ok, res, new_q = coroutine_resume(co, skt, ...)
50,770✔
1096
  -- local duration = gettime() - time_before
1097
  -- if duration > 1 then
1098
  --   duration = math.floor(duration * 1000)
1099
  --   pcall(_errhandlers[co] or _deferror, "task ran for "..tostring(duration).." milliseconds.", co, skt)
1100
  -- end
1101

1102
  if new_q == _reading or new_q == _writing or new_q == _sleeping then
50,768✔
1103
    -- we're yielding to a new queue
1104
    new_q:insert (res)
49,934✔
1105
    new_q:push (res, co)
49,934✔
1106
    return
49,934✔
1107
  end
1108

1109
  -- coroutine is terminating
1110

1111
  if ok and coroutine_status(co) ~= "dead" then
834✔
1112
    -- it called coroutine.yield from a non-Copas function which is unexpected
1113
    ok = false
1✔
1114
    res = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
1✔
1115
  end
1116

1117
  if not ok then
834✔
1118
    local k, e = pcall(_errhandlers[co] or _deferror, res, co, skt)
7✔
1119
    if not k then
7✔
1120
      print("Failed executing error handler: " .. tostring(e))
×
1121
    end
1122
  end
1123

1124
  local skt_to_close = _autoclose[co]
834✔
1125
  if skt_to_close then
834✔
1126
    skt_to_close:close()
18✔
1127
    _autoclose[co] = nil
18✔
1128
    _autoclose_r[skt_to_close] = nil
18✔
1129
  end
1130

1131
  _errhandlers[co] = nil
834✔
1132
end
1133

1134

1135
local _accept do
28✔
1136
  local client_counters = setmetatable({}, { __mode = "k" })
28✔
1137

1138
  -- accepts a connection on socket input
1139
  function _accept(server_skt, handler)
28✔
1140
    local client_skt = server_skt:accept()
20✔
1141
    if client_skt then
20✔
1142
      local count = (client_counters[server_skt] or 0) + 1
20✔
1143
      client_counters[server_skt] = count
20✔
1144
      object_names[client_skt] = object_names[server_skt] .. ":client_" .. count
33✔
1145

1146
      client_skt:settimeout(0)
20✔
1147
      copas.settimeouts(client_skt, user_timeouts_connect[server_skt],  -- copy server socket timeout settings
40✔
1148
        user_timeouts_send[server_skt], user_timeouts_receive[server_skt])
33✔
1149

1150
      local co = coroutine_create(handler)
20✔
1151
      object_names[co] = object_names[server_skt] .. ":handler_" .. count
20✔
1152

1153
      if copas.autoclose then
20✔
1154
        _autoclose[co] = client_skt
20✔
1155
        _autoclose_r[client_skt] = co
20✔
1156
      end
1157

1158
      _doTick(co, client_skt)
20✔
1159
    end
1160
  end
1161
end
1162

1163
-------------------------------------------------------------------------------
1164
-- Adds a server/handler pair to Copas dispatcher
1165
-------------------------------------------------------------------------------
1166

1167
do
1168
  local function addTCPserver(server, handler, timeout, name)
1169
    server:settimeout(0)
15✔
1170
    if name then
15✔
1171
      object_names[server] = name
×
1172
    end
1173
    _servers[server] = handler
15✔
1174
    _reading:insert(server)
15✔
1175
    if timeout then
15✔
1176
      copas.settimeout(server, timeout)
×
1177
    end
1178
  end
1179

1180
  local function addUDPserver(server, handler, timeout, name)
1181
    server:settimeout(0)
×
1182
    local co = coroutine_create(handler)
×
1183
    if name then
×
1184
      object_names[server] = name
×
1185
    end
1186
    object_names[co] = object_names[server]..":handler"
×
1187
    _reading:insert(server)
×
1188
    if timeout then
×
1189
      copas.settimeout(server, timeout)
×
1190
    end
1191
    _doTick(co, server)
×
1192
  end
1193

1194

1195
  function copas.addserver(server, handler, timeout, name)
28✔
1196
    if isTCP(server) then
30✔
1197
      addTCPserver(server, handler, timeout, name)
30✔
1198
    else
1199
      addUDPserver(server, handler, timeout, name)
×
1200
    end
1201
  end
1202
end
1203

1204

1205
function copas.removeserver(server, keep_open)
28✔
1206
  local skt = server
13✔
1207
  local mt = getmetatable(server)
13✔
1208
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
13✔
1209
    skt = server.socket
×
1210
  end
1211

1212
  _servers:remove(skt)
13✔
1213
  _reading:remove(skt)
13✔
1214

1215
  if keep_open then
13✔
1216
    return true
3✔
1217
  end
1218
  return server:close()
10✔
1219
end
1220

1221

1222

1223
-------------------------------------------------------------------------------
1224
-- Adds an new coroutine thread to Copas dispatcher
1225
-------------------------------------------------------------------------------
1226
function copas.addnamedthread(name, handler, ...)
28✔
1227
  if type(name) == "function" and type(handler) == "string" then
851✔
1228
    -- old call, flip args for compatibility
1229
    name, handler = handler, name
×
1230
  end
1231

1232
  -- create a coroutine that skips the first argument, which is always the socket
1233
  -- passed by the scheduler, but `nil` in case of a task/thread
1234
  local thread = coroutine_create(function(_, ...)
1,702✔
1235
    copas.pause()
851✔
1236
    return handler(...)
850✔
1237
  end)
1238
  if name then
851✔
1239
    object_names[thread] = name
61✔
1240
  end
1241

1242
  _threads[thread] = true -- register this thread so it can be removed
851✔
1243
  _doTick (thread, nil, ...)
851✔
1244
  return thread
851✔
1245
end
1246

1247

1248
function copas.addthread(handler, ...)
28✔
1249
  return copas.addnamedthread(nil, handler, ...)
790✔
1250
end
1251

1252

1253
function copas.removethread(thread)
28✔
1254
  -- if the specified coroutine is registered, add it to the canceled table so
1255
  -- that next time it tries to resume it exits.
1256
  _canceled[thread] = _threads[thread or 0]
4✔
1257
end
1258

1259

1260

1261
-------------------------------------------------------------------------------
1262
-- Sleep/pause management functions
1263
-------------------------------------------------------------------------------
1264

1265
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1266
-- If sleeptime < 0 then it sleeps until explicitly woken up using 'wakeup'
1267
-- TODO: deprecated, remove in next major
1268
function copas.sleep(sleeptime)
28✔
1269
  coroutine_yield((sleeptime or 0), _sleeping)
×
1270
end
1271

1272

1273
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1274
-- if sleeptime < 0 then it sleeps 0 seconds.
1275
function copas.pause(sleeptime)
28✔
1276
  if sleeptime and sleeptime > 0 then
49,273✔
1277
    coroutine_yield(sleeptime, _sleeping)
2,158✔
1278
  else
1279
    coroutine_yield(0, _sleeping)
48,179✔
1280
  end
1281
end
1282

1283

1284
-- yields the current coroutine until explicitly woken up using 'wakeup'
1285
function copas.pauseforever()
28✔
1286
  coroutine_yield(-1, _sleeping)
515✔
1287
end
1288

1289

1290
-- Wakes up a sleeping coroutine 'co'.
1291
function copas.wakeup(co)
28✔
1292
  _sleeping:wakeup(co)
515✔
1293
end
1294

1295

1296

1297
-------------------------------------------------------------------------------
1298
-- Timeout management
1299
-------------------------------------------------------------------------------
1300

1301
do
1302
  local timeout_register = setmetatable({}, { __mode = "k" })
28✔
1303
  local time_out_thread
1304
  local timerwheel = require("timerwheel").new({
56✔
1305
      precision = TIMEOUT_PRECISION,
28✔
1306
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
28✔
1307
      err_handler = function(err)
1308
        return _deferror(err, time_out_thread)
2✔
1309
      end,
1310
    })
1311

1312
  time_out_thread = copas.addnamedthread("copas_core_timer", function()
56✔
1313
    while true do
1314
      copas.pause(TIMEOUT_PRECISION)
963✔
1315
      timerwheel:step()
1,872✔
1316
    end
1317
  end)
1318

1319
  -- get the number of timeouts running
1320
  function copas.gettimeouts()
28✔
1321
    return timerwheel:count()
313✔
1322
  end
1323

1324
  --- Sets the timeout for the current coroutine.
1325
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timerout
1326
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
1327
  -- @return true
1328
  function copas.timeout(delay, callback)
28✔
1329
    local co = coroutine_running()
889,679✔
1330
    local existing_timer = timeout_register[co]
889,679✔
1331

1332
    if existing_timer then
889,679✔
1333
      timerwheel:cancel(existing_timer)
633✔
1334
    end
1335

1336
    if delay > 0 and delay ~= math.huge then
889,679✔
1337
      timeout_register[co] = timerwheel:set(delay, callback, co)
1,790✔
1338
    elseif delay == 0 or delay == math.huge then
888,784✔
1339
      timeout_register[co] = nil
888,784✔
1340
    else
1341
      error("timout value must be greater than or equal to 0, got: "..tostring(delay))
×
1342
    end
1343

1344
    return true
889,679✔
1345
  end
1346

1347
end
1348

1349

1350
-------------------------------------------------------------------------------
1351
-- main tasks: manage readable and writable socket sets
1352
-------------------------------------------------------------------------------
1353
-- a task is an object with a required method `step()` that deals with a
1354
-- single step for that task.
1355

1356
local _tasks = {} do
28✔
1357
  function _tasks:add(tsk)
28✔
1358
    _tasks[#_tasks + 1] = tsk
112✔
1359
  end
1360
end
1361

1362

1363
-- a task to check ready to read events
1364
local _readable_task = {} do
28✔
1365

1366
  local function tick(skt)
1367
    local handler = _servers[skt]
111✔
1368
    if handler then
111✔
1369
      _accept(skt, handler)
40✔
1370
    else
1371
      _reading:remove(skt)
91✔
1372
      _doTick(_reading:pop(skt), skt)
182✔
1373
    end
1374
  end
1375

1376
  function _readable_task:step()
28✔
1377
    for _, skt in ipairs(self._events) do
48,320✔
1378
      tick(skt)
111✔
1379
    end
1380
  end
1381

1382
  _tasks:add(_readable_task)
56✔
1383
end
1384

1385

1386
-- a task to check ready to write events
1387
local _writable_task = {} do
28✔
1388

1389
  local function tick(skt)
1390
    _writing:remove(skt)
42✔
1391
    _doTick(_writing:pop(skt), skt)
84✔
1392
  end
1393

1394
  function _writable_task:step()
28✔
1395
    for _, skt in ipairs(self._events) do
48,250✔
1396
      tick(skt)
42✔
1397
    end
1398
  end
1399

1400
  _tasks:add(_writable_task)
56✔
1401
end
1402

1403

1404

1405
-- sleeping threads task
1406
local _sleeping_task = {} do
28✔
1407

1408
  function _sleeping_task:step()
28✔
1409
    local now = gettime()
48,208✔
1410

1411
    local co = _sleeping:pop(now)
48,208✔
1412
    while co do
49,273✔
1413
      -- we're pushing them to _resumable, since that list will be replaced before
1414
      -- executing. This prevents tasks running twice in a row with pause(0) for example.
1415
      -- So here we won't execute, but at _resumable step which is next
1416
      _resumable:push(co)
1,065✔
1417
      co = _sleeping:pop(now)
2,130✔
1418
    end
1419
  end
1420

1421
  _tasks:add(_sleeping_task)
28✔
1422
end
1423

1424

1425

1426
-- resumable threads task
1427
local _resumable_task = {} do
28✔
1428

1429
  function _resumable_task:step()
28✔
1430
    -- replace the resume list before iterating, so items placed in there
1431
    -- will indeed end up in the next copas step, not in this one, and not
1432
    -- create a loop
1433
    local resumelist = _resumable:clear_resumelist()
48,208✔
1434

1435
    for _, co in ipairs(resumelist) do
97,976✔
1436
      _doTick(co)
49,768✔
1437
    end
1438
  end
1439

1440
  _tasks:add(_resumable_task)
28✔
1441
end
1442

1443

1444
-------------------------------------------------------------------------------
1445
-- Checks for reads and writes on sockets
1446
-------------------------------------------------------------------------------
1447
local _select_plain do
28✔
1448

1449
  local last_cleansing = 0
28✔
1450
  local duration = function(t2, t1) return t2-t1 end
48,238✔
1451

1452
  _select_plain = function(timeout)
1453
    local err
1454
    local now = gettime()
48,210✔
1455

1456
    -- remove any closed sockets to prevent select from hanging on them
1457
    if _closed[1] then
48,210✔
1458
      for i, skt in ipairs(_closed) do
55✔
1459
        _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
84✔
1460
      end
1461
    end
1462

1463
    _readable_task._events, _writable_task._events, err = socket.select(_reading, _writing, timeout)
48,210✔
1464
    local r_events, w_events = _readable_task._events, _writable_task._events
48,210✔
1465

1466
    -- inject closed sockets in readable/writeable task so they can error out properly
1467
    if _closed[1] then
48,210✔
1468
      for i, skts in ipairs(_closed) do
55✔
1469
        _closed[i] = nil
28✔
1470
        r_events[#r_events+1] = skts[1]
28✔
1471
        w_events[#w_events+1] = skts[2]
28✔
1472
      end
1473
    end
1474

1475
    if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
96,420✔
1476
      last_cleansing = now
27✔
1477

1478
      -- Check all sockets selected for reading, and check how long they have been waiting
1479
      -- for data already, without select returning them as readable
1480
      for skt,time in pairs(_reading_log) do
27✔
1481
        if not r_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1482
          -- This one timedout while waiting to become readable, so move
1483
          -- it in the readable list and try and read anyway, despite not
1484
          -- having been returned by select
1485
          _reading_log[skt] = nil
×
1486
          r_events[#r_events + 1] = skt
×
1487
          r_events[skt] = #r_events
×
1488
        end
1489
      end
1490

1491
      -- Do the same for writing
1492
      for skt,time in pairs(_writing_log) do
27✔
1493
        if not w_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1494
          _writing_log[skt] = nil
×
1495
          w_events[#w_events + 1] = skt
×
1496
          w_events[skt] = #w_events
×
1497
        end
1498
      end
1499
    end
1500

1501
    if err == "timeout" and #r_events + #w_events > 0 then
48,210✔
1502
      return nil
1✔
1503
    else
1504
      return err
48,209✔
1505
    end
1506
  end
1507
end
1508

1509

1510

1511
-------------------------------------------------------------------------------
1512
-- Dispatcher loop step.
1513
-- Listen to client requests and handles them
1514
-- Returns false if no socket-data was handled, or true if there was data
1515
-- handled (or nil + error message)
1516
-------------------------------------------------------------------------------
1517

1518
local copas_stats
1519
local min_ever, max_ever
1520

1521
local _select = _select_plain
28✔
1522

1523
-- instrumented version of _select() to collect stats
1524
local _select_instrumented = function(timeout)
1525
  if copas_stats then
×
1526
    local step_duration = gettime() - copas_stats.step_start
×
1527
    copas_stats.duration_max = math.max(copas_stats.duration_max, step_duration)
×
1528
    copas_stats.duration_min = math.min(copas_stats.duration_min, step_duration)
×
1529
    copas_stats.duration_tot = copas_stats.duration_tot + step_duration
×
1530
    copas_stats.steps = copas_stats.steps + 1
×
1531
  else
1532
    copas_stats = {
×
1533
      duration_max = -1,
1534
      duration_min = 999999,
1535
      duration_tot = 0,
1536
      steps = 0,
1537
    }
1538
  end
1539

1540
  local err = _select_plain(timeout)
×
1541

1542
  local now = gettime()
×
1543
  copas_stats.time_start = copas_stats.time_start or now
×
1544
  copas_stats.step_start = now
×
1545

1546
  return err
×
1547
end
1548

1549

1550
function copas.step(timeout)
28✔
1551
  -- Need to wake up the select call in time for the next sleeping event
1552
  if not _resumable:done() then
96,420✔
1553
    timeout = 0
47,381✔
1554
  else
1555
    timeout = math.min(_sleeping:getnext(), timeout or math.huge)
1,658✔
1556
  end
1557

1558
  local err = _select(timeout)
48,210✔
1559

1560
  for _, tsk in ipairs(_tasks) do
241,043✔
1561
    tsk:step()
192,835✔
1562
  end
1563

1564
  if err then
48,208✔
1565
    if err == "timeout" then
48,067✔
1566
      if timeout + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
48,067✔
1567
        -- we were idle, so occasionally do a GC sweep to ensure lingering
1568
        -- sockets are closed, and we don't accidentally block the loop from
1569
        -- exiting
1570
        collectgarbage()
57✔
1571
      end
1572
      return false
48,067✔
1573
    end
1574
    return nil, err
×
1575
  end
1576

1577
  return true
141✔
1578
end
1579

1580

1581
-------------------------------------------------------------------------------
1582
-- Check whether there is something to do.
1583
-- returns false if there are no sockets for read/write nor tasks scheduled
1584
-- (which means Copas is in an empty spin)
1585
-------------------------------------------------------------------------------
1586
function copas.finished()
28✔
1587
  return #_reading == 0 and #_writing == 0 and _resumable:done() and _sleeping:done(copas.gettimeouts())
49,285✔
1588
end
1589

1590
local _getstats do
28✔
1591
  local _getstats_instrumented, _getstats_plain
1592

1593

1594
  function _getstats_plain(enable)
28✔
1595
    -- this function gets hit if turned off, so turn on if true
1596
    if enable == true then
×
1597
      _select = _select_instrumented
×
1598
      _getstats = _getstats_instrumented
×
1599
      -- reset stats
1600
      min_ever = nil
×
1601
      max_ever = nil
×
1602
      copas_stats = nil
×
1603
    end
1604
    return {}
×
1605
  end
1606

1607

1608
  -- convert from seconds to millisecs, with microsec precision
1609
  local function useconds(t)
1610
    return math.floor((t * 1000000) + 0.5) / 1000
×
1611
  end
1612
  -- convert from seconds to seconds, with millisec precision
1613
  local function mseconds(t)
1614
    return math.floor((t * 1000) + 0.5) / 1000
×
1615
  end
1616

1617

1618
  function _getstats_instrumented(enable)
28✔
1619
    if enable == false then
×
1620
      _select = _select_plain
×
1621
      _getstats = _getstats_plain
×
1622
      -- instrumentation disabled, so switch to the plain implementation
1623
      return _getstats(enable)
×
1624
    end
1625
    if (not copas_stats) or (copas_stats.step == 0) then
×
1626
      return {}
×
1627
    end
1628
    local stats = copas_stats
×
1629
    copas_stats = nil
×
1630
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
×
1631
    max_ever = math.max(max_ever or 0, stats.duration_max)
×
1632
    stats.duration_min_ever = min_ever
×
1633
    stats.duration_max_ever = max_ever
×
1634
    stats.duration_avg = stats.duration_tot / stats.steps
×
1635
    stats.step_start = nil
×
1636
    stats.time_end = gettime()
×
1637
    stats.time_tot = stats.time_end - stats.time_start
×
1638
    stats.time_avg = stats.time_tot / stats.steps
×
1639

1640
    stats.duration_avg = useconds(stats.duration_avg)
×
1641
    stats.duration_max = useconds(stats.duration_max)
×
1642
    stats.duration_max_ever = useconds(stats.duration_max_ever)
×
1643
    stats.duration_min = useconds(stats.duration_min)
×
1644
    stats.duration_min_ever = useconds(stats.duration_min_ever)
×
1645
    stats.duration_tot = useconds(stats.duration_tot)
×
1646
    stats.time_avg = useconds(stats.time_avg)
×
1647
    stats.time_start = mseconds(stats.time_start)
×
1648
    stats.time_end = mseconds(stats.time_end)
×
1649
    stats.time_tot = mseconds(stats.time_tot)
×
1650
    return stats
×
1651
  end
1652

1653
  _getstats = _getstats_plain
28✔
1654
end
1655

1656

1657
function copas.status(enable_stats)
28✔
1658
  local res = _getstats(enable_stats)
×
1659
  res.running = not not copas.running
×
1660
  res.timeout = copas.gettimeouts()
×
1661
  res.timer, res.inactive = _sleeping:status()
×
1662
  res.read = #_reading
×
1663
  res.write = #_writing
×
1664
  res.active = _resumable:count()
×
1665
  return res
×
1666
end
1667

1668

1669
-------------------------------------------------------------------------------
1670
-- Dispatcher endless loop.
1671
-- Listen to client requests and handles them forever
1672
-------------------------------------------------------------------------------
1673
function copas.loop(initializer, timeout)
28✔
1674
  if type(initializer) == "function" then
42✔
1675
    copas.addnamedthread("copas_initializer", initializer)
28✔
1676
  else
1677
    timeout = initializer or timeout
28✔
1678
  end
1679

1680
  copas.running = true
42✔
1681
  while not copas.finished() do copas.step(timeout) end
144,708✔
1682
  copas.running = false
40✔
1683
end
1684

1685

1686
-------------------------------------------------------------------------------
1687
-- Naming sockets and coroutines.
1688
-------------------------------------------------------------------------------
1689
do
1690
  local function realsocket(skt)
1691
    local mt = getmetatable(skt)
15✔
1692
    if mt == _skt_mt_tcp or mt == _skt_mt_udp then
15✔
1693
      return skt.socket
15✔
1694
    else
1695
      return skt
×
1696
    end
1697
  end
1698

1699

1700
  function copas.setsocketname(name, skt)
28✔
1701
    assert(type(name) == "string", "expected arg #1 to be a string")
15✔
1702
    skt = assert(realsocket(skt), "expected arg #2 to be a socket")
30✔
1703
    object_names[skt] = name
15✔
1704
  end
1705

1706

1707
  function copas.getsocketname(skt)
28✔
1708
    skt = assert(realsocket(skt), "expected arg #1 to be a socket")
×
1709
    return object_names[skt]
×
1710
  end
1711
end
1712

1713

1714
function copas.setthreadname(name, coro)
28✔
1715
  assert(type(name) == "string", "expected arg #1 to be a string")
10✔
1716
  coro = coro or coroutine_running()
10✔
1717
  assert(type(coro) == "thread", "expected arg #2 to be a coroutine or nil")
10✔
1718
  object_names[coro] = name
10✔
1719
end
1720

1721

1722
function copas.getthreadname(coro)
28✔
1723
  coro = coro or coroutine_running()
5✔
1724
  assert(type(coro) == "thread", "expected arg #1 to be a coroutine or nil")
5✔
1725
  return object_names[coro]
7✔
1726
end
1727

1728
-------------------------------------------------------------------------------
1729
-- Debug functionality.
1730
-------------------------------------------------------------------------------
1731
do
1732
  copas.debug = {}
28✔
1733

1734
  local log_core    -- if truthy, the core-timer will also be logged
1735
  local debug_log   -- function used as logger
1736

1737

1738
  local debug_yield = function(skt, queue)
1739
    local name = object_names[coroutine_running()]
1,613✔
1740

1741
    if log_core or name ~= "copas_core_timer" then
1,613✔
1742
      if queue == _sleeping then
1,610✔
1743
        debug_log("yielding '", name, "' to SLEEP for ", skt," seconds")
1,605✔
1744

1745
      elseif queue == _writing then
5✔
1746
        debug_log("yielding '", name, "' to WRITE on '", object_names[skt], "'")
2✔
1747

1748
      elseif queue == _reading then
4✔
1749
        debug_log("yielding '", name, "' to READ on '", object_names[skt], "'")
5✔
1750

1751
      else
1752
        debug_log("thread '", name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
×
1753
      end
1754
    end
1755

1756
    return coroutine.yield(skt, queue)
1,613✔
1757
  end
1758

1759

1760
  local debug_resume = function(coro, skt, ...)
1761
    local name = object_names[coro]
1,615✔
1762

1763
    if skt then
1,615✔
1764
      debug_log("resuming '", name, "' for socket '", object_names[skt], "'")
5✔
1765
    else
1766
      if log_core or name ~= "copas_core_timer" then
1,610✔
1767
        debug_log("resuming '", name, "'")
1,607✔
1768
      end
1769
    end
1770
    return coroutine.resume(coro, skt, ...)
1,615✔
1771
  end
1772

1773

1774
  local debug_create = function(f)
1775
    local f_wrapped = function(...)
1776
      local results = pack(f(...))
4✔
1777
      debug_log("exiting '", object_names[coroutine_running()], "'")
2✔
1778
      return unpack(results)
2✔
1779
    end
1780

1781
    return coroutine.create(f_wrapped)
2✔
1782
  end
1783

1784

1785
  debug_log = fnil
28✔
1786

1787

1788
  -- enables debug output for all coroutine operations.
1789
  function copas.debug.start(logger, core)
56✔
1790
    log_core = core
1✔
1791
    debug_log = logger or print
1✔
1792
    coroutine_yield = debug_yield
1✔
1793
    coroutine_resume = debug_resume
1✔
1794
    coroutine_create = debug_create
1✔
1795
  end
1796

1797

1798
  -- disables debug output for coroutine operations.
1799
  function copas.debug.stop()
56✔
1800
    debug_log = fnil
×
1801
    coroutine_yield = coroutine.yield
×
1802
    coroutine_resume = coroutine.resume
×
1803
    coroutine_create = coroutine.create
×
1804
  end
1805

1806
  do
1807
    local call_id = 0
28✔
1808

1809
    -- Description table of socket functions for debug output.
1810
    -- each socket function name has TWO entries;
1811
    -- 'name_in' and 'name_out', each being an array of names/descriptions of respectively
1812
    -- input parameters and return values.
1813
    -- If either table has a 'callback' key, then that is a function that will be called
1814
    -- with the parameters/return-values for further inspection.
1815
    local args = {
28✔
1816
      settimeout_in = {
28✔
1817
        "socket ",
1818
        "seconds",
1819
        "mode   ",
1820
      },
28✔
1821
      settimeout_out = {
28✔
1822
        "success",
1823
        "error  ",
1824
      },
28✔
1825
      connect_in = {
28✔
1826
        "socket ",
1827
        "address",
1828
        "port   ",
1829
      },
28✔
1830
      connect_out = {
28✔
1831
        "success",
1832
        "error  ",
1833
      },
28✔
1834
      getfd_in = {
28✔
1835
        "socket ",
1836
        -- callback = function(...)
1837
        --   print(debug.traceback("called from:", 4))
1838
        -- end,
1839
      },
28✔
1840
      getfd_out = {
28✔
1841
        "fd",
1842
      },
28✔
1843
      send_in = {
28✔
1844
        "socket   ",
1845
        "data     ",
1846
        "idx-start",
1847
        "idx-end  ",
1848
      },
28✔
1849
      send_out = {
28✔
1850
        "last-idx-send    ",
1851
        "error            ",
1852
        "err-last-idx-send",
1853
      },
28✔
1854
      receive_in = {
28✔
1855
        "socket ",
1856
        "pattern",
1857
        "prefix ",
1858
      },
28✔
1859
      receive_out = {
28✔
1860
        "received    ",
1861
        "error       ",
1862
        "partial data",
1863
      },
28✔
1864
      dirty_in = {
28✔
1865
        "socket",
1866
        -- callback = function(...)
1867
        --   print(debug.traceback("called from:", 4))
1868
        -- end,
1869
      },
28✔
1870
      dirty_out = {
28✔
1871
        "data in read-buffer",
1872
      },
28✔
1873
      close_in = {
28✔
1874
        "socket",
1875
        -- callback = function(...)
1876
        --   print(debug.traceback("called from:", 4))
1877
        -- end,
1878
      },
28✔
1879
      close_out = {
28✔
1880
        "success",
1881
        "error",
1882
      },
28✔
1883
    }
1884
    local function print_call(func, msg, ...)
1885
      print(msg)
262✔
1886
      local arg = pack(...)
262✔
1887
      local desc = args[func] or {}
262✔
1888
      for i = 1, math.max(arg.n, #desc) do
536✔
1889
        local value = arg[i]
274✔
1890
        if type(value) == "string" then
274✔
1891
          local xvalue = value:sub(1,30)
3✔
1892
          if xvalue ~= value then
3✔
1893
            xvalue = xvalue .."(...truncated)"
×
1894
          end
1895
          print("\t"..(desc[i] or i)..": '"..tostring(xvalue).."' ("..type(value).." #"..#value..")")
3✔
1896
        else
1897
          print("\t"..(desc[i] or i)..": '"..tostring(value).."' ("..type(value)..")")
271✔
1898
        end
1899
      end
1900
      if desc.callback then
262✔
1901
        desc.callback(...)
×
1902
      end
1903
    end
1904

1905
    local debug_mt = {
28✔
1906
      __index = function(self, key)
1907
        local value = self.__original_socket[key]
131✔
1908
        if type(value) ~= "function" then
131✔
1909
          return value
×
1910
        end
1911
        return function(self2, ...)
1912
            local my_id = call_id + 1
131✔
1913
            call_id = my_id
131✔
1914
            local results
1915

1916
            if self2 ~= self then
131✔
1917
              -- there is no self
1918
              print_call(tostring(key).."_in", my_id .. "-calling '"..tostring(key) .. "' with; ", self, ...)
×
1919
              results = pack(value(self, ...))
×
1920
            else
1921
              print_call(tostring(key).."_in", my_id .. "-calling '" .. tostring(key) .. "' with; ", self.__original_socket, ...)
131✔
1922
              results = pack(value(self.__original_socket, ...))
262✔
1923
            end
1924
            print_call(tostring(key).."_out", my_id .. "-results '"..tostring(key) .. "' returned; ", unpack(results))
262✔
1925
            return unpack(results)
131✔
1926
          end
1927
      end,
1928
      __tostring = function(self)
1929
        return tostring(self.__original_socket)
4✔
1930
      end
1931
    }
1932

1933

1934
    -- wraps a socket (copas or luasocket) in a debug version printing all calls
1935
    -- and their parameters/return values. Extremely noisy!
1936
    -- returns the wrapped socket.
1937
    -- NOTE: only for plain sockets, will not support TLS
1938
    function copas.debug.socket(original_skt)
56✔
1939
      if (getmetatable(original_skt) == _skt_mt_tcp) or (getmetatable(original_skt) == _skt_mt_udp) then
1✔
1940
        -- already wrapped as Copas socket, so recurse with the original luasocket one
1941
        original_skt.socket = copas.debug.socket(original_skt.socket)
×
1942
        return original_skt
×
1943
      end
1944

1945
      local proxy = setmetatable({
2✔
1946
        __original_socket = original_skt
1✔
1947
      }, debug_mt)
1✔
1948

1949
      return proxy
1✔
1950
    end
1951
  end
1952
end
1953

1954

1955
return copas
28✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc