• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 24182952570

09 Apr 2026 09:28AM UTC coverage: 85.118%. Remained the same
24182952570

push

github

web-flow
Merge adc890a32 into 0dd9640ae

4 of 20 new or added lines in 1 file covered. (20.0%)

140 existing lines in 1 file now uncovered.

1407 of 1653 relevant lines covered (85.12%)

75881.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.16
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2013 - Kepler Project (www.keplerproject.org), 2015-2025 Thijs Schreijer
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
if package.loaded["socket.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
218✔
17
  error("you must require copas before require'ing socket.http")
×
18
end
19
if package.loaded["copas.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
218✔
20
  error("you must require copas before require'ing copas.http")
×
21
end
22

23
-- load either LuaSocket, or LuaSystem
24
-- note: with luasocket we don't use 'sleep' but 'select' with no sockets
25
local socket, system do
  -- Requires `name` under pcall; yields the module on success and nil when
  -- the module cannot be loaded.
  local function optional_require(name)
    if pcall(require, name) then
      return require(name)
    end
  end

  -- LuaSocket is looked up first, LuaSystem second; both are kept when both
  -- are available.
  socket = optional_require("socket")
  system = optional_require("system")

  if not socket and not system then
    error("Neither LuaSocket nor LuaSystem found, Copas requires at least one of them")
  end
end
40

41
local binaryheap = require "binaryheap"
218✔
42
local gettime = (socket or system).gettime
218✔
43
local block_sleep = (socket or system).sleep
218✔
44
local ssl -- only loaded upon demand
45

46
local core_timer_thread
47
local WATCH_DOG_TIMEOUT = 120
218✔
48
local UDP_DATAGRAM_MAX = (socket or {})._DATAGRAMSIZE or 8192
218✔
49
local TIMEOUT_PRECISION = 0.1  -- 100ms
218✔
50
local fnil = function() end
286,788✔
51

52

53
local coroutine_create = coroutine.create
218✔
54
local coroutine_running = coroutine.running
218✔
55
local coroutine_yield = coroutine.yield
218✔
56
local coroutine_resume = coroutine.resume
218✔
57
local coroutine_status = coroutine.status
218✔
58

59

60
-- nil-safe versions for pack/unpack
61
local _unpack = unpack or table.unpack

-- Unpacks `t` from index `i` (default 1) up to `j`. When `j` is omitted the
-- table's `n` field is used if present, otherwise its length, so tables made
-- by `pack` keep their trailing nils.
local function unpack(t, i, j)
  local first = i or 1
  local last = j or t.n or #t
  return _unpack(t, first, last)
end

-- Collects all arguments in a table and records the true argument count
-- (including nils) in the `n` field.
local function pack(...)
  local packed = { ... }
  packed.n = select("#", ...)
  return packed
end
1,279✔
64

65

66
local pcall = pcall
218✔
67
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
218✔
UNCOV
68
  pcall = require("coxpcall").pcall
30✔
69
  coroutine_running = require("coxpcall").running
30✔
70
end
71

72

73
if socket then
218✔
74
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
75
  -- (this allows the use of socket.http from within copas)
76
  -- Metatable that tags the error tables raised by `socket.newtry`, so the
  -- handler used by `socket.protect` can tell them apart from other errors.
  local err_mt = {
    -- Renders the wrapped error value (stored at index 1) for display.
    -- The value is converted with tostring() *before* it is concatenated, so
    -- non-string error values (nil, booleans, tables) no longer raise an
    -- "attempt to concatenate" error from inside __tostring.
    __tostring = function (self)
      return "Copas 'try' error intermediate table: '"..tostring(self[1]).."'"
    end,
  }
81

82
  -- Post-processes the results of the pcall() made by a protected function.
  -- @param status first pcall result (true when the call succeeded)
  -- @param ... remaining pcall results (return values, or the error value)
  -- @return on success the return values unchanged; for an error table tagged
  -- with `err_mt`: nil + the wrapped error. Any other error is raised again.
  local function statusHandler(status, ...)
    if not status then
      local failure = ...
      local is_try_error = type(failure) == "table" and getmetatable(failure) == err_mt
      if not is_try_error then
        error(failure)
      end
      return nil, failure[1]
    end
    return ...
  end
91

92
  -- Pure-Lua replacement for `socket.protect`.
  -- @param func the function to protect
  -- @return a function that runs `func` under pcall and hands the outcome to
  -- `statusHandler`
  function socket.protect(func)
    local function protected(...)
      return statusHandler(pcall(func, ...))
    end
    return protected
  end
97

98
  -- Pure-Lua replacement for `socket.newtry`.
  -- @param finalizer (optional) function called, under pcall, with the error
  -- value before the error is raised
  -- @return a "try" function: when its first argument is truthy it returns all
  -- its arguments, otherwise it raises a table tagged with `err_mt` that holds
  -- the second argument
  function socket.newtry(finalizer)
    local cleanup = finalizer or fnil
    return function (...)
      if (...) then
        return ...
      end
      -- failure: run the finalizer (its own errors are ignored), then raise
      pcall(cleanup, select(2, ...))
      local message = select(2, ...)
      error(setmetatable({ message }, err_mt), 0)
    end
  end
108

109
  socket.try = socket.newtry()
269✔
110
end
111

112

113
-- Setup the Copas meta table to auto-load submodules and define a default method
114
local copas do
  -- Set of submodule names that can still be auto-loaded through
  -- `copas.<name>`; a name is dropped once its module has been required.
  local lazy = {}
  for _, name in ipairs({ "ftp", "future", "http", "lock", "queue", "semaphore", "smtp", "timer" }) do
    lazy[name] = true
  end

  local copas_mt = {}

  -- Loads a known submodule on first access and stores it on the module
  -- table; unknown keys produce no value.
  function copas_mt.__index(self, key)
    if not lazy[key] then
      return
    end
    self[key] = require("copas."..key)
    lazy[key] = nil
    return rawget(self, key)
  end

  -- Calling the module table is forwarded to `copas.loop`.
  function copas_mt.__call(self, ...)
    return self.loop(...)
  end

  copas = setmetatable({}, copas_mt)
end
134

135

136
-- Meta information is public even if beginning with an "_"
137
copas._COPYRIGHT   = "Copyright (C) 2005-2013 Kepler Project, 2015-2026 Thijs Schreijer"
218✔
138
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
218✔
139
copas._VERSION     = "Copas 4.10.0"
218✔
140

141
-- Close the socket associated with the current connection after the handler finishes
142
copas.autoclose = true
218✔
143

144
-- indicator for the loop running
145
copas.running = false
218✔
146

147
-- gettime method from either LuaSocket or LuaSystem: time in (fractional) seconds, since epoch.
148
copas.gettime = gettime
218✔
149

150
-------------------------------------------------------------------------------
151
-- Object names, to track names of thread/coroutines and sockets
152
-------------------------------------------------------------------------------
153
-- Weak-keyed map from object to display name. Reading a missing entry yields
-- tostring(object) and, for non-nil objects, stores that default.
local object_names = {}
setmetatable(object_names, {
  __mode = "k",
  __index = function(names, obj)
    local default_name = tostring(obj)
    if obj == nil then
      -- nil cannot be used as a table key, so nothing is stored
      return default_name
    end
    rawset(names, obj, default_name)
    return default_name
  end
})
163

164
-------------------------------------------------------------------------------
165
-- Simple set implementation
166
-- adds a FIFO queue for each socket in the set
167
-------------------------------------------------------------------------------
168

169
-- Creates a socket set: the sockets live in the array part of the returned
-- table, and every socket additionally has its own FIFO queue.
local function newsocketset()
  local set = {}

  -- socket -> index of that socket in the array part of `set`
  local position = {}

  -- socket -> FIFO queue (array). Weak keys, so a queue goes away with its
  -- socket; a missing queue is created on first access.
  local queues = setmetatable({}, {
    __mode = "k",
    __index = function(all, skt)
      local fresh = {}
      all[skt] = fresh
      return fresh
    end,
  })

  -- Adds a socket to the set, does nothing if it is already present.
  -- @return skt if added, or nothing if it existed
  function set:insert(skt)
    if position[skt] then
      return
    end
    self[#self + 1] = skt
    position[skt] = #self
    return skt
  end

  -- Removes a socket from the set, does nothing if it is not found.
  -- The last array element is moved into the freed slot to keep the array
  -- free of holes.
  -- @return skt if removed, or nothing if it wasn't in the set
  function set:remove(skt)
    local index = position[skt]
    if not index then
      return
    end
    position[skt] = nil
    local last = #self
    local moved = self[last]
    self[last] = nil
    if moved ~= skt then
      position[moved] = index
      self[index] = moved
    end
    return skt
  end

  -- Appends an item to the FIFO queue of the socket.
  function set:push(skt, itm)
    local fifo = queues[skt]
    fifo[#fifo + 1] = itm
  end

  -- Removes and returns the oldest item in the FIFO queue of the socket.
  function set:pop(skt)
    return table.remove(queues[skt], 1)
  end

  return set
end
229

230

231

232
-- Threads immediately resumable
233
local _resumable = {} do
  -- coroutines waiting to be resumed, in the order they were pushed
  local pending = {}

  -- Appends a coroutine to the list.
  function _resumable:push(co)
    pending[#pending + 1] = co
  end

  -- Swaps in a fresh, empty list and returns the previous one.
  function _resumable:clear_resumelist()
    local drained
    drained, pending = pending, {}
    return drained
  end

  -- True when no coroutine is waiting.
  function _resumable:done()
    return pending[1] == nil
  end

  -- Number of waiting coroutines.
  -- NOTE(review): in the code visible here `_resumable` only holds methods, so
  -- the `#_resumable` term presumably always adds 0 -- confirm before removing.
  function _resumable:count()
    return #pending + #_resumable
  end

end
255

256

257

258
-- Similar to the socket set above, but tailored for the use of
259
-- sleeping threads
260
local _sleeping = {} do

  -- wake-up time -> coroutine, ordered by time
  local heap = binaryheap.minUnique()
  -- set of coroutines sleeping without a wake-up time (weak keys)
  local lethargy = setmetatable({}, { __mode = "k" })


  -- Required base implementation
  -----------------------------------------
  _sleeping.insert = fnil
  _sleeping.remove = fnil

  -- Puts a coroutine to sleep.
  -- @param sleeptime negative: sleep until woken up explicitly; 0: resumable
  -- right away; positive: number of seconds from now
  -- @param co the coroutine
  function _sleeping:push(sleeptime, co)
    if sleeptime < 0 then
      lethargy[co] = true
      return
    end
    if sleeptime == 0 then
      _resumable:push(co)
      return
    end
    heap:insert(gettime() + sleeptime, co)
  end

  -- Pops the first timer from the heap if its wake-up time is not later than
  -- `time`; returns nothing when no timer is due.
  function _sleeping:pop(time)
    local due = heap:peekValue() or math.huge
    if not (time < due) then
      return heap:pop()
    end
  end

  -- additional methods for time management
  -----------------------------------------

  -- Returns the delay until the next sleep expires, or nothing if there is none.
  function _sleeping:getnext()
    local wake_at = heap:peekValue()
    if not wake_at then
      return
    end
    -- never report less than 0, because select() might block
    return math.max(wake_at - gettime(), 0)
  end

  -- Makes a sleeping coroutine resumable, whether it was sleeping with or
  -- without a wake-up time. Does nothing if the coroutine is not sleeping.
  function _sleeping:wakeup(co)
    if lethargy[co] then
      lethargy[co] = nil
    elseif not heap:remove(co) then
      return
    end
    _resumable:push(co)
  end

  -- Forgets a sleeping coroutine without making it resumable.
  function _sleeping:cancel(co)
    lethargy[co] = nil
    heap:remove(co)
  end

  -- Empties the timer heap, then re-inserts `core_timer_thread` so that it
  -- wakes up after TIMEOUT_PRECISION seconds.
  function _sleeping:cancelall()
    while heap:size() > 0 do
      heap:pop()
    end
    local wake_at = gettime() + TIMEOUT_PRECISION
    heap:insert(wake_at, core_timer_thread)
    -- lethargy is weak; copas's idle GC sweeps will clean it within a few steps
  end

  -- Returns true if we have nothing more to do.
  -- The timeout task doesn't qualify as work (fallbacks only),
  -- the lethargy also doesn't qualify as work ('dead' tasks),
  -- but the combination of a timeout + a lethargy can be work.
  -- @param tos number of timeouts running
  function _sleeping:done(tos)
    if heap:size() ~= 1 then     -- 1 means only the timeout-timer task is running
      return false
    end
    local lethargic_with_timeout = tos > 0 and next(lethargy)
    return not lethargic_with_timeout
  end

  -- Returns the number of threads in the binaryheap and in the lethargy.
  function _sleeping:status()
    local lethargic = 0
    for _ in pairs(lethargy) do
      lethargic = lethargic + 1
    end

    return heap:size(), lethargic
  end

end   -- _sleeping
341

342

343

344
-------------------------------------------------------------------------------
345
-- Tracking coroutines and sockets
346
-------------------------------------------------------------------------------
347

348
local _servers = newsocketset() -- servers being handled
218✔
349
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
218✔
350
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
218✔
351
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
218✔
352
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
218✔
353

354

355
-- for each socket we log the last read and last write times to enable the
356
-- watchdog to follow up if it takes too long.
357
-- tables contain the time, indexed by the socket
358
local _reading_log = {}
218✔
359
local _writing_log = {}
218✔
360

361
local _closed = {} -- track sockets that have been closed (list/array)
218✔
362

363
local _reading = newsocketset() -- sockets currently being read
218✔
364
local _writing = newsocketset() -- sockets currently being written
218✔
365
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
218✔
366
  ["timeout"] = true,      -- default LuaSocket timeout
158✔
367
  ["wantread"] = true,     -- LuaSec specific timeout
158✔
368
  ["wantwrite"] = true,    -- LuaSec specific timeout
158✔
369
}
370

371
-------------------------------------------------------------------------------
372
-- Coroutine based socket timeouts.
373
-------------------------------------------------------------------------------
374
-- User-configured timeouts per socket, one table per operation. The tables
-- are weak-keyed on the socket; reading a socket that has no entry stores and
-- returns math.huge (block forever).
local user_timeouts_connect, user_timeouts_send, user_timeouts_receive
do
  local timeout_mt = { __mode = "k" }

  function timeout_mt.__index(timeouts, skt)
    -- if there is no timeout found, we insert one automatically, to block forever
    rawset(timeouts, skt, math.huge)
    return rawget(timeouts, skt)
  end

  user_timeouts_connect = setmetatable({}, timeout_mt)
  user_timeouts_send = setmetatable({}, timeout_mt)
  user_timeouts_receive = setmetatable({}, timeout_mt)
end
391

392
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
218✔
393

394

395
-- sto = socket-time-out
396
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do

  local function weak_keyed()
    return setmetatable({}, { __mode = "k" })
  end

  local socket_register = weak_keyed()     -- socket by coroutine
  local operation_register = weak_keyed()  -- operation "read"/"write" by coroutine
  local timeout_flags = weak_keyed()       -- true if timedout, by coroutine


  -- Timeout callback: flags the coroutine as timed out, makes it resumable
  -- and takes its socket out of the queue it is currently waiting in.
  local function socket_callback(co)
    local skt, queue = socket_register[co], operation_register[co]

    -- flag the timeout and resume the coroutine
    timeout_flags[co] = true
    _resumable:push(co)

    -- clear the socket from the current queue
    if queue ~= "read" and queue ~= "write" then
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
    end
    if queue == "read" then
      _reading:remove(skt)
    else
      _writing:remove(skt)
    end
  end


  -- Sets a socket timeout for the running coroutine.
  -- Calling it as `sto_timeout()` will cancel the timeout.
  -- @param skt (socket) the socket on which to operate
  -- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
  -- @param use_connect_to (bool) if truthy the connect timeout is used, otherwise the
  -- timeout is picked based on the queue (read -> receive timeout, write -> send timeout)
  -- @return true
  function sto_timeout(skt, queue, use_connect_to)
    local co = coroutine_running()
    socket_register[co] = skt
    operation_register[co] = queue
    timeout_flags[co] = nil

    if not skt then
      copas.timeout(0)
      return true
    end

    local to = (use_connect_to and user_timeouts_connect[skt]) or
               (queue == "read" and user_timeouts_receive[skt]) or
               user_timeouts_send[skt]
    copas.timeout(to, socket_callback)
    return true
  end


  -- Records that the running coroutine's socket moved to a different queue (read/write).
  -- Only useful with ssl-handshakes and "wantread", "wantwrite" errors, when
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
  -- @return true
  function sto_change_queue(queue)
    local co = coroutine_running()
    operation_register[co] = queue
    return true
  end


  -- Responds with `true` if the operation of the running coroutine timed-out.
  function sto_timed_out()
    local co = coroutine_running()
    return timeout_flags[co]
  end


  -- Returns the proper timeout error: the socket's own error when the running
  -- coroutine is listed in `useSocketTimeoutErrors`, plain "timeout" otherwise.
  function sto_error(err)
    local co = coroutine_running()
    return useSocketTimeoutErrors[co] and err or "timeout"
  end
end
468

469

470

471
-------------------------------------------------------------------------------
472
-- Coroutine based socket I/O functions.
473
-------------------------------------------------------------------------------
474

475
-- Returns "tcp" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
476
-- for tcp based, and falsy for udp based.
477
local isTCP do
  -- first three characters of tostring(socket) -> socket kind
  local kind_by_prefix = {
    tcp = "tcp",
    SSL = "ssl",
  }

  -- @param socket the socket object to classify
  -- @return "tcp", "ssl", or nil when the prefix matches neither
  function isTCP(socket)
    local prefix = tostring(socket):sub(1, 3)
    return kind_by_prefix[prefix]
  end
end
487

488
-- Records the socket in the `_closed` list, then closes it.
-- @param skt the socket to close
-- @param ... passed on to `skt:close()`
-- @return whatever `skt:close(...)` returns
function copas.close(skt, ...)
  local slot = #_closed + 1
  _closed[slot] = skt
  return skt:close(...)
end
492

493

494

495
-- nil or negative means indefinitely
496
-- Sets the connect, send and receive timeouts of a socket to the same value.
-- @param skt the socket
-- @param timeout number of seconds; a falsy value is treated as -1
-- @return the result of `copas.settimeouts`, or nil + error message when
-- `timeout` is not a number
function copas.settimeout(skt, timeout)
  local value = timeout or -1
  if type(value) == "number" then
    return copas.settimeouts(skt, value, value, value)
  end
  return nil, "timeout must be 'nil' or a number"
end
504

505
-- negative means indefinitely, nil means do not change
506
-- Sets the individual timeouts of a socket.
-- All three values are validated before anything is stored, so an invalid
-- argument never leaves the socket with only part of its timeouts updated.
-- @param skt the socket
-- @param connect connect timeout in seconds; nil leaves it unchanged, a
-- negative value clears the stored timeout
-- @param send send timeout, same rules as `connect`
-- @param read receive timeout, same rules as `connect`
-- @return true on success, or nil + error message when a value is neither nil
-- nor a number (in that case no timeout is modified)
function copas.settimeouts(skt, connect, send, read)

  if connect ~= nil and type(connect) ~= "number" then
    return nil, "connect timeout must be 'nil' or a number"
  end
  if send ~= nil and type(send) ~= "number" then
    return nil, "send timeout must be 'nil' or a number"
  end
  if read ~= nil and type(read) ~= "number" then
    return nil, "read timeout must be 'nil' or a number"
  end

  -- from here on every value is either nil (skip) or a number
  if connect then
    if connect < 0 then
      connect = nil
    end
    user_timeouts_connect[skt] = connect
  end

  if send then
    if send < 0 then
      send = nil
    end
    user_timeouts_send[skt] = send
  end

  if read then
    if read < 0 then
      read = nil
    end
    user_timeouts_receive[skt] = read
  end

  return true
end
543

544
-- reads a pattern from a client and yields to the reading set on timeouts
545
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
546
-- be provided as the 'pattern' below defaults to a string. Will throw a
547
-- 'bad argument' error if omitted.
548
-- @param client the socket to read from
-- @param pattern receive pattern, defaults to "*l"
-- @param part (optional) prefix, passed to and updated by `client:receive`
-- @return the three results of `client:receive`; when the copas timeout
-- expired: nil, the timeout error, and the partial result
function copas.receive(client, pattern, part)
  pattern = pattern or "*l"
  local active_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local data, err
    data, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if data or not _isSocketTimeout[err] then
      -- finished: either data arrived or a non-timeout error occurred
      active_log[client] = nil
      sto_timeout()
      return data, err, part
    end

    if sto_timed_out() then
      active_log[client] = nil
      return nil, sto_error(err), part
    end

    -- not ready yet: log the time and wait in the matching set
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
      active_log = _writing_log
      active_log[client] = gettime()
      sto_change_queue("write")
      coroutine_yield(client, _writing)
    else
      active_log = _reading_log
      active_log[client] = gettime()
      sto_change_queue("read")
      coroutine_yield(client, _reading)
    end
  end
end
590

591
-- receives data from a client over UDP. Not available for TCP.
592
-- (this is a copy of receive() method, adapted for receivefrom() use)
593
-- @param client the UDP socket to read from
-- @param size maximum datagram size, defaults to UDP_DATAGRAM_MAX
-- @return the three results of `client:receivefrom`; when the copas timeout
-- expired: nil, the timeout error, and the third result
function copas.receivefrom(client, size)
  size = size or UDP_DATAGRAM_MAX
  sto_timeout(client, "read")

  while true do
    local data, err, port = client:receivefrom(size) -- upon success err holds ip address

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if data or err ~= "timeout" then
      -- finished: either a datagram arrived or a non-timeout error occurred
      _reading_log[client] = nil
      sto_timeout()
      return data, err, port
    end

    if sto_timed_out() then
      _reading_log[client] = nil
      return nil, sto_error(err), port
    end

    -- nothing available yet: log the time and wait in the reading set
    _reading_log[client] = gettime()
    coroutine_yield(client, _reading)
  end
end
625

626
-- same as above but with special treatment when reading chunks,
627
-- unblocks on any data received.
628
-- @param client the socket to read from
-- @param pattern receive pattern, defaults to "*l"
-- @param part (optional) prefix, passed to and updated by `client:receive`
-- @return the three results of `client:receive`, as soon as the read
-- completes, fails, or the partial result has grown beyond the given prefix;
-- when the copas timeout expired: nil, the timeout error, and the partial result
function copas.receivepartial(client, pattern, part)
  pattern = pattern or "*l"
  local prefix_size = #(part or "")
  local active_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local data, err
    data, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if data or (type(part) == "string" and #part > prefix_size)
       or not _isSocketTimeout[err] then
      -- finished: complete read, some new data, or a non-timeout error
      active_log[client] = nil
      sto_timeout()
      return data, err, part
    end

    if sto_timed_out() then
      active_log[client] = nil
      return nil, sto_error(err), part
    end

    -- not ready yet: log the time and wait in the matching set
    if err == "wantwrite" then
      active_log = _writing_log
      active_log[client] = gettime()
      sto_change_queue("write")
      coroutine_yield(client, _writing)
    else
      active_log = _reading_log
      active_log[client] = gettime()
      sto_change_queue("read")
      coroutine_yield(client, _reading)
    end
  end
end
671
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
218✔
672

673
-- sends data to a client. The operation is buffered and
674
-- yields to the writing set on timeouts
675
-- Note: from and to parameters will be ignored by/for UDP sockets
676
-- @param client the socket to write to
-- @param data the data to send
-- @param from (optional) first index to send, defaults to 1
-- @param to (optional) last index, passed on to `client:send`
-- @return the three results of `client:send`; when the copas timeout
-- expired: nil, the timeout error, and the last index reported
function copas.send(client, data, from, to)
  from = from or 1
  local lastIndex = from - 1
  local active_log = _writing_log
  sto_timeout(client, "write")

  while true do
    local sent, err
    -- continue right after what has been reported as sent so far
    sent, err, lastIndex = client:send(data, lastIndex + 1, to)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if sent or not _isSocketTimeout[err] then
      -- finished: everything was sent or a non-timeout error occurred
      active_log[client] = nil
      sto_timeout()
      return sent, err, lastIndex
    end

    if sto_timed_out() then
      active_log[client] = nil
      return nil, sto_error(err), lastIndex
    end

    -- not ready yet: log the time and wait in the matching set
    if err == "wantread" then
      active_log = _reading_log
      active_log[client] = gettime()
      sto_change_queue("read")
      coroutine_yield(client, _reading)
    else
      active_log = _writing_log
      active_log[client] = gettime()
      sto_change_queue("write")
      coroutine_yield(client, _writing)
    end
  end
end
719

720
-- Plain pass-through to the socket's own `sendto`.
-- @return whatever `client:sendto(data, ip, port)` returns
function copas.sendto(client, data, ip, port)
  -- deprecated; for backward compatibility only, since UDP doesn't block on sending
  local results = pack(client:sendto(data, ip, port))
  return unpack(results)
end
724

725
-- waits until connection is completed
726
-- @param skt the socket to connect; it is switched to non-blocking mode
-- @param host passed to `skt:connect`
-- @param port passed to `skt:connect`
-- @return the results of `skt:connect` once it succeeds or fails with a
-- non-pending error (1 for the Windows "already connected" completion case);
-- nil + timeout error when the copas connect timeout expired
function copas.connect(skt, host, port)
  skt:settimeout(0)
  local retried
  sto_timeout(skt, "write", true)

  while true do
    local ret, err = skt:connect(host, port)

    -- non-blocking connect on Windows results in error "Operation already
    -- in progress" to indicate that it is completing the request async. So essentially
    -- it is the same as "timeout"
    local pending = (err == "timeout" or err == "Operation already in progress")

    if ret or not pending then
      _writing_log[skt] = nil
      sto_timeout()
      -- Once the async connect completes, Windows returns the error "already connected"
      -- to indicate it is done, so that error should be ignored. Except when it is the
      -- first call to connect, then it was already connected to something else and the
      -- error should be returned
      if not ret and err == "already connected" and retried then
        return 1
      end
      return ret, err
    end

    if sto_timed_out() then
      _writing_log[skt] = nil
      return nil, sto_error(err)
    end

    -- still connecting: log the time and wait until the socket is writable
    retried = true
    _writing_log[skt] = gettime()
    coroutine_yield(skt, _writing)
  end
end
759

760

761
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
762
-- already wrapped, it does nothing and returns the socket.
763
-- @param wrap_params the parameters for the ssl-context
764
-- @return wrapped socket, or throws an error
765
local function ssl_wrap(skt, wrap_params)
  if isTCP(skt) == "ssl" then
    return skt -- was already wrapped
  end
  if not wrap_params then
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
  end

  ssl = ssl or require("ssl")
  local wrapped = assert(ssl.wrap(skt, wrap_params)) -- assert, because we do not want to silently ignore this one!!

  wrapped:settimeout(0)  -- non-blocking on the ssl-socket

  -- copy copas user-timeouts to the newly wrapped socket
  local connect_to = user_timeouts_connect[skt]
  local send_to = user_timeouts_send[skt]
  local receive_to = user_timeouts_receive[skt]
  copas.settimeouts(wrapped, connect_to, send_to, receive_to)

  -- socket registered for autoclose? then move the registration to the wrapped one
  local owner = _autoclose_r[skt]
  if owner then
    _autoclose[owner] = wrapped
    _autoclose_r[skt] = nil
    _autoclose_r[wrapped] = owner
  end

  -- a name that differs from tostring(skt) is a custom name, so copy it over
  local name = object_names[skt]
  if name ~= tostring(skt) then
    object_names[wrapped] = name
  end
  return wrapped
end
793

794

795
-- For each luasec method we have a subtable, allows for future extension.
796
-- Required structure:
797
-- {
798
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
799
--   sni = {                  -- parameters to 'sni()'
800
--     names = string | table -- 1st parameter
801
--     strict = bool          -- 2nd parameter
802
--   }
803
-- }
804
-- Normalizes the user supplied ssl parameters into a table with exactly the
-- keys `wrap` and `sni` (each `false` when absent).
-- @param sslt nil, an ssl-params table (has `mode` or `protocol`), a table of
-- the shape `{ wrap = ..., sni = ... }`, or an ssl-context (userdata)
-- @return the normalized table; reading any other key from it raises an error
-- Raises an error when `sslt` is of any other type.
local function normalize_sslt(sslt)
  local t = type(sslt)
  local r = setmetatable({}, {
    __index = function(self, key)
      -- a bug if this happens, here as a sanity check, just being careful since
      -- this is security stuff
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
    end,
  })
  if t == "nil" then
    r.wrap = false
    r.sni = false

  elseif t == "table" then
    if sslt.mode or sslt.protocol then
      -- has the mandatory fields for the ssl-params table for handshake
      -- backward compatibility
      r.wrap = sslt
      r.sni = false
    else
      -- has the target definition, copy our known keys
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
    end

  elseif t == "userdata" then
    -- it's an ssl-context object for the handshake
    -- backward compatibility
    r.wrap = sslt
    r.sni = false

  else
    -- report the offending *type* (the message used to append the value)
    error("ssl parameters; did not expect type "..t)
  end

  return r
end
841

842

843
---
-- Performs an (async) ssl handshake on a connected TCP client socket.
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
-- A failed handshake throws an error and does not return nil+error, as that
-- might silently fail in code like this;
--   copas.addserver(s1, function(skt)
--       skt = copas.wrap(skt, sparams)
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
--       skt:send(body)      --> continues unencrypted
-- Only the expiry of the copas connect-timeout is reported as nil+error.
-- @param skt Regular LuaSocket CLIENT socket object
-- @param wrap_params Table with ssl parameters
-- @return wrapped ssl socket, or nil + timeout error when the timeout expired;
-- throws an error when the handshake fails
function copas.dohandshake(skt, wrap_params)
  ssl = ssl or require("ssl")

  local nskt = ssl_wrap(skt, wrap_params)

  sto_timeout(nskt, "write", true)
  local wait_set

  while true do
    local success, err = nskt:dohandshake()

    if success then
      sto_timeout()
      return nskt
    end

    if not _isSocketTimeout[err] then
      -- a real failure: cancel the timeout before throwing
      sto_timeout()
      error("TLS/SSL handshake failed: " .. tostring(err))
    end

    if sto_timed_out() then
      return nil, sto_error(err)
    end

    -- handshake still in progress; wait in the set the ssl layer asks for
    if err == "wantwrite" then
      sto_change_queue("write")
      wait_set = _writing
    elseif err == "wantread" then
      sto_change_queue("read")
      wait_set = _reading
    else
      error("TLS/SSL handshake failed: " .. tostring(err))
    end

    coroutine_yield(nskt, wait_set)
  end
end
892

893
-- Deprecated no-op; formerly flushed a client write buffer. Kept so existing
-- callers keep working. Any arguments are ignored and nothing is returned.
function copas.flush()
  return
end
896

897
-- Metatable for Copas-wrapped TCP sockets. The wrapper object keeps the
-- underlying socket in `self.socket` and the normalized ssl settings in
-- `self.ssl_params`; the methods below forward to the `copas.*` functions or
-- directly to the underlying socket.
local _skt_mt_tcp do
  local tcp = {}

  function tcp:send(data, from, to)
    return copas.send(self.socket, data, from, to)
  end

  -- a receive timeout of exactly 0 selects the partial-receive implementation
  function tcp:receive(pattern, prefix)
    if user_timeouts_receive[self.socket] ~= 0 then
      return copas.receive(self.socket, pattern, prefix)
    end
    return copas.receivepartial(self.socket, pattern, prefix)
  end

  function tcp:receivepartial(pattern, prefix)
    return copas.receivepartial(self.socket, pattern, prefix)
  end

  function tcp:flush()
    return copas.flush(self.socket)
  end

  function tcp:settimeout(time)
    return copas.settimeout(self.socket, time)
  end

  function tcp:settimeouts(connect, send, receive)
    return copas.settimeouts(self.socket, connect, send, receive)
  end

  -- TODO: socket.connect is a shortcut, and must be provided with an alternative
  -- if ssl parameters are available, it will also include a handshake
  function tcp:connect(...)
    local res, err = copas.connect(self.socket, ...)
    if not res then
      return res, err
    end

    if self.ssl_params.sni then
      self:sni()
    end
    if self.ssl_params.wrap then
      res, err = self:dohandshake()
    end
    return res, err
  end

  function tcp:close(...)
    return copas.close(self.socket, ...)
  end

  -- TODO: socket.bind is a shortcut, and must be provided with an alternative
  function tcp:bind(...)
    return self.socket:bind(...)
  end

  -- TODO: is this DNS related? hence blocking?
  function tcp:getsockname(...)
    local ok, ip, port, family = pcall(self.socket.getsockname, self.socket, ...)
    if not ok then
      return nil, "not implemented by LuaSec"
    end
    return ip, port, family
  end

  function tcp:getstats(...)
    return self.socket:getstats(...)
  end

  function tcp:setstats(...)
    return self.socket:setstats(...)
  end

  function tcp:listen(...)
    return self.socket:listen(...)
  end

  function tcp:accept(...)
    return self.socket:accept(...)
  end

  function tcp:setoption(...)
    local ok, res, err = pcall(self.socket.setoption, self.socket, ...)
    if not ok then
      return nil, "not implemented by LuaSec"
    end
    return res, err
  end

  function tcp:getoption(...)
    local ok, val, err = pcall(self.socket.getoption, self.socket, ...)
    if not ok then
      return nil, "not implemented by LuaSec"
    end
    return val, err
  end

  -- TODO: is this DNS related? hence blocking?
  function tcp:getpeername(...)
    local ok, ip, port, family = pcall(self.socket.getpeername, self.socket, ...)
    if not ok then
      return nil, "not implemented by LuaSec"
    end
    return ip, port, family
  end

  function tcp:shutdown(...)
    return self.socket:shutdown(...)
  end

  -- ssl-wraps the internal socket and sets the SNI names; when `names` is
  -- nil, both `names` and `strict` are taken from the stored ssl parameters
  function tcp:sni(names, strict)
    local sslp = self.ssl_params
    self.socket = ssl_wrap(self.socket, sslp.wrap)
    if names == nil then
      local defaults = sslp.sni
      names = defaults.names
      strict = defaults.strict
    end
    return self.socket:sni(names, strict)
  end

  function tcp:dohandshake(wrap_params)
    local params = wrap_params or self.ssl_params.wrap
    local nskt, err = copas.dohandshake(self.socket, params)
    if nskt then
      self.socket = nskt  -- replace internal socket with the newly wrapped ssl one
      return self
    end
    return nskt, err
  end

  function tcp:getalpn(...)
    local ok, proto, err = pcall(self.socket.getalpn, self.socket, ...)
    if not ok then
      return nil, "not a tls socket"
    end
    return proto, err
  end

  function tcp:getsniname(...)
    local ok, name, err = pcall(self.socket.getsniname, self.socket, ...)
    if not ok then
      return nil, "not a tls socket"
    end
    return name, err
  end

  _skt_mt_tcp = {
    __tostring = function(self)
      return tostring(self.socket).." (copas wrapped)"
    end,

    __index = tcp,
  }
end
1033

1034
-- Metatable for Copas-wrapped UDP sockets. Starts as a copy of the TCP one
-- (with its own `__index` table) and then overrides the UDP-specific methods.
local _skt_mt_udp = { __index = {} }
do
  local udp = _skt_mt_udp.__index

  -- copy the metatable-level entries, keeping anything already set here
  for key, value in pairs(_skt_mt_tcp) do
    if not _skt_mt_udp[key] then
      _skt_mt_udp[key] = value
    end
  end

  -- copy all TCP methods as the starting point
  for key, value in pairs(_skt_mt_tcp.__index) do
    udp[key] = value
  end

  function udp:send(...)
    return self.socket:send(...)
  end

  function udp:sendto(...)
    return self.socket:sendto(...)
  end

  function udp:receive(size)
    return copas.receive(self.socket, size or UDP_DATAGRAM_MAX)
  end

  function udp:receivefrom(size)
    return copas.receivefrom(self.socket, size or UDP_DATAGRAM_MAX)
  end

  -- TODO: is this DNS related? hence blocking?
  function udp:setpeername(...)
    return self.socket:setpeername(...)
  end

  function udp:setsockname(...)
    return self.socket:setsockname(...)
  end

  -- do not close client, as it is also the server for udp.
  function udp:close(...)
    return true
  end

  function udp:settimeouts(connect, send, receive)
    return copas.settimeouts(self.socket, connect, send, receive)
  end
end
1063

1064

1065

1066
---
-- Wraps a LuaSocket socket object in an async Copas based socket object.
-- A socket that already carries one of the Copas wrapper metatables is
-- returned unchanged.
-- @param skt The socket to wrap
-- @param sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
-- @return wrapped socket object
function copas.wrap (skt, sslt)
  local current_mt = getmetatable(skt)
  if current_mt == _skt_mt_tcp or current_mt == _skt_mt_udp then
    return skt -- already wrapped
  end

  skt:settimeout(0)

  if not isTCP(skt) then
    -- UDP wrappers carry no ssl parameters
    return setmetatable({ socket = skt }, _skt_mt_udp)
  end

  local wrapped = { socket = skt, ssl_params = normalize_sslt(sslt) }
  return setmetatable(wrapped, _skt_mt_tcp)
end
1084

1085
--- Wraps a handler in a function that deals with wrapping the socket and doing the
-- optional ssl handshake.
-- The wrapped handler is only invoked once the handshake (if any) succeeded; a
-- handshake that did not complete raises an error instead of letting the
-- handler continue on a socket that is not secured.
-- @param handler function called as `handler(wrapped_skt, ...)`
-- @param sslparams (optional) ssl parameters, passed on to `copas.wrap`
-- @return function taking `(skt, ...)` that wraps `skt` and calls `handler`
function copas.handler(handler, sslparams)
  -- TODO: pass a timeout value to set, and use during handshake
  return function (skt, ...)
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
    local sslp = skt.ssl_params
    if sslp then -- `copas.wrap` only sets ssl_params on TCP wrappers
      if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
      if sslp.wrap then
        -- dohandshake returns nil+error when it times out; do not ignore that
        local ok, err = skt:dohandshake(sslp.wrap)
        if not ok then
          error("TLS/SSL handshake failed: " .. tostring(err))
        end
      end
    end
    return handler(skt, ...)
  end
end
1097

1098

1099
--------------------------------------------------
1100
-- Error handling
1101
--------------------------------------------------
1102

1103
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine


--- Builds a traceback string for an error, annotated with the names of the
-- coroutine and socket involved.
-- @param msg the error value; converted with `tostring`, `nil` is treated as an empty message
-- @param co the coroutine the error occurred in, or nil
-- @param skt the socket involved, or nil
-- @return traceback string
function copas.gettraceback(msg, co, skt)
  local co_str = co == nil and "nil" or copas.getthreadname(co)
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
  local msg_str = msg == nil and "" or tostring(msg)
  if msg_str == "" then
    -- no message: only report the context (two placeholders, two arguments)
    msg_str = ("(coroutine: %s, socket: %s)"):format(co_str, skt_str)
  else
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
  end

  if type(co) == "thread" then
    -- regular Copas coroutine
    return debug.traceback(co, msg_str)
  end
  -- not a coroutine, but the main thread, this happens if a timeout callback
  -- (see `copas.timeout`) causes an error (those callbacks run on the main thread).
  return debug.traceback(msg_str, 2)
end
1124

1125

1126
-- Default error handler: prints the traceback produced by `copas.gettraceback`.
local function _deferror(msg, co, skt)
  local build_traceback = copas.gettraceback
  print(build_traceback(msg, co, skt))
end
1129

1130

1131
-- Installs an error handler.
-- With a truthy `default` the handler replaces the default handler (and must
-- not be nil); otherwise it is stored for the currently running coroutine,
-- where nil removes the per-coroutine handler.
function copas.seterrorhandler(err, default)
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")

  if not default then
    _errhandlers[coroutine_running()] = err
    return
  end

  assert(err ~= nil, "Expected the handler to be a function when setting the default")
  _deferror = err
end
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
1141

1142

1143
-- Returns the error handler for coroutine `co` (defaults to the running
-- coroutine); falls back to the default handler when none was set for it.
function copas.geterrorhandler(co)
  if not co then
    co = coroutine_running()
  end

  local specific = _errhandlers[co]
  if specific then
    return specific
  end
  return _deferror
end
1147

1148

1149
-- Selects, for the current coroutine, which error is reported on timeouts.
-- Truthy `bool`: the original socket errors are returned (`timeout, wantread,
-- wantwrite, Operation already in progress`). Falsy: always `timeout`.
function copas.useSocketTimeoutErrors(bool)
  local enabled = false -- the stored value is always a real boolean
  if bool then
    enabled = true
  end
  useSocketTimeoutErrors[coroutine_running()] = enabled
end
1155

1156
-------------------------------------------------------------------------------
1157
-- Thread handling
1158
-------------------------------------------------------------------------------
1159

1160
-- Resumes coroutine `co` once, passing `skt` and any extra arguments, and
-- handles the outcome: either the coroutine yielded to one of the Copas
-- queues (it is then registered there), or it is finished/failed and gets
-- cleaned up.
local function _doTick (co, skt, ...)
  if not co then
    return
  end

  -- if a coroutine was canceled/removed, don't resume it
  if _canceled[co] then
    -- also clean up the registry
    _canceled[co] = nil
    _threads[co] = nil
    return
  end

  -- result: the socket (being read/write on) or the time to sleep
  -- target_queue: either _writing, _reading, or _sleeping
  local success, result, target_queue = coroutine_resume(co, skt, ...)

  local yielded_to_copas = target_queue == _reading
                        or target_queue == _writing
                        or target_queue == _sleeping
  if yielded_to_copas then
    -- we're yielding to a new queue
    target_queue:insert (result)
    target_queue:push (result, co)
    return
  end

  -- from here on the coroutine is terminating

  if success and coroutine_status(co) ~= "dead" then
    -- it called coroutine.yield from a non-Copas function which is unexpected
    success = false
    result = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
  end

  if not success then
    local handler = _errhandlers[co] or _deferror
    local handled, handler_err = pcall(handler, result, co, skt)
    if not handled then
      print("Failed executing error handler: " .. tostring(handler_err))
    end
  end

  local skt_to_close = _autoclose[co]
  if skt_to_close then
    skt_to_close:close()
    _autoclose[co] = nil
    _autoclose_r[skt_to_close] = nil
  end

  _errhandlers[co] = nil
end
1211

1212

1213
local _accept do
  -- number of clients accepted so far, per server socket
  local client_counters = setmetatable({}, { __mode = "k" })

  -- Accepts a connection on `server_skt` and starts `handler` for it in a new
  -- coroutine. Does nothing when `accept` yields no client socket.
  function _accept(server_skt, handler)
    local client_skt = server_skt:accept()
    if not client_skt then
      return
    end

    local seq = (client_counters[server_skt] or 0) + 1
    client_counters[server_skt] = seq
    object_names[client_skt] = object_names[server_skt] .. ":client_" .. seq

    client_skt:settimeout(0)
    -- copy server socket timeout settings
    local connect_to = user_timeouts_connect[server_skt]
    local send_to = user_timeouts_send[server_skt]
    local receive_to = user_timeouts_receive[server_skt]
    copas.settimeouts(client_skt, connect_to, send_to, receive_to)

    local co = coroutine_create(handler)
    object_names[co] = object_names[server_skt] .. ":handler_" .. seq

    if copas.autoclose then
      -- two-way registration between handler coroutine and client socket
      _autoclose[co] = client_skt
      _autoclose_r[client_skt] = co
    end

    _doTick(co, client_skt)
  end
end
1240

1241
-------------------------------------------------------------------------------
1242
-- Adds a server/handler pair to Copas dispatcher
1243
-------------------------------------------------------------------------------
1244

1245
do
  -- Registers a TCP server socket: the handler is stored per server and the
  -- socket is added to the read set; connections are accepted later.
  local function addTCPserver(server, handler, timeout, name)
    server:settimeout(0)
    if name then object_names[server] = name end

    _servers[server] = handler
    _reading:insert(server)

    if timeout then copas.settimeout(server, timeout) end
  end

  -- Registers a UDP server socket: a single handler coroutine is created and
  -- started right away with the server socket itself.
  local function addUDPserver(server, handler, timeout, name)
    server:settimeout(0)
    local co = coroutine_create(handler)
    if name then object_names[server] = name end
    object_names[co] = object_names[server]..":handler"

    _reading:insert(server)

    if timeout then copas.settimeout(server, timeout) end
    _doTick(co, server)
  end


  -- Adds a server/handler pair, dispatching on the socket type.
  function copas.addserver(server, handler, timeout, name)
    local register = addUDPserver
    if isTCP(server) then
      register = addTCPserver
    end
    register(server, handler, timeout, name)
  end
end
1281

1282

1283
-- Removes a server socket from the dispatcher. Accepts either a raw socket or
-- a Copas-wrapped one. With a truthy `keep_open` the socket is left open and
-- `true` is returned; otherwise the result of `server:close()` is returned.
function copas.removeserver(server, keep_open)
  local mt = getmetatable(server)
  local raw = server
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
    -- the registries hold the underlying socket, not the wrapper
    raw = server.socket
  end

  _servers:remove(raw)
  _reading:remove(raw)

  if not keep_open then
    return server:close()
  end
  return true
end
1298

1299

1300

1301
-------------------------------------------------------------------------------
-- Adds a new coroutine thread to Copas dispatcher
-------------------------------------------------------------------------------
function copas.addnamedthread(name, handler, ...)
  local legacy_order = type(name) == "function" and type(handler) == "string"
  if legacy_order then
    -- old call, flip args for compatibility
    name, handler = handler, name
  end

  -- the first argument is always the socket passed by the scheduler (`nil`
  -- for a task/thread), so the coroutine body drops it
  local function body(_, ...)
    copas.pause()
    return handler(...)
  end

  local thread = coroutine_create(body)
  if name then
    object_names[thread] = name
  end

  -- register this thread so it can be removed
  _threads[thread] = true
  _doTick (thread, nil, ...)
  return thread
end
1324

1325

1326
-- Adds an unnamed thread; same as `copas.addnamedthread` with a nil name.
function copas.addthread(handler, ...)
  local spawn = copas.addnamedthread
  return spawn(nil, handler, ...)
end
1329

1330

1331
-- Cancels a thread: if the specified coroutine is registered it is flagged as
-- canceled, so it is not resumed again; it is also canceled in the sleeping
-- queue.
function copas.removethread(thread)
  local registered = _threads[thread or 0]
  _canceled[thread] = registered
  _sleeping:cancel(thread)
end
1337

1338

1339

1340
-------------------------------------------------------------------------------
1341
-- Sleep/pause management functions
1342
-------------------------------------------------------------------------------
1343

1344
-- Yields the current coroutine to the sleeping queue with `sleeptime`
-- seconds (0 when omitted).
-- If sleeptime < 0 then it sleeps until explicitly woken up using 'wakeup'
-- TODO: deprecated, remove in next major
function copas.sleep(sleeptime)
  local delay = sleeptime or 0
  coroutine_yield(delay, _sleeping)
end
1350

1351

1352
-- Yields the current coroutine and wakes it after 'sleeptime' seconds.
-- A missing, zero or negative sleeptime sleeps 0 seconds.
-- Returns the time that passed between the yield and the resume.
function copas.pause(sleeptime)
  local started = gettime()

  local delay = 0
  if sleeptime and sleeptime > 0 then
    delay = sleeptime
  end
  coroutine_yield(delay, _sleeping)

  return gettime() - started
end
1363

1364

1365
-- Yields the current coroutine until explicitly woken up using 'wakeup'.
-- Returns the time that passed between the yield and the resume.
function copas.pauseforever()
  local started = gettime()
  coroutine_yield(-1, _sleeping)
  local resumed = gettime()
  return resumed - started
end
1371

1372

1373
-- Wakes up a sleeping coroutine 'co' by forwarding to the sleeping queue.
function copas.wakeup(co)
  _sleeping.wakeup(_sleeping, co)
end
1377

1378

1379

1380
-------------------------------------------------------------------------------
1381
-- Timeout management
1382
-------------------------------------------------------------------------------
1383

1384
do
  -- the active timer (if any) per coroutine
  local timeout_register = setmetatable({}, { __mode = "k" })
  local timerwheel = require("timerwheel").new({
      now = gettime,
      precision = TIMEOUT_PRECISION,
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
      err_handler = function(err)
        return _deferror(err, core_timer_thread)
      end,
    })

  -- background thread that advances the timer wheel once per precision interval
  core_timer_thread = copas.addnamedthread("copas_core_timer", function()
    while true do
      copas.pause(TIMEOUT_PRECISION)
      timerwheel:step()
    end
  end)

  -- get the number of timeouts running
  function copas.gettimeouts()
    return timerwheel:count()
  end

  --- Sets the timeout for the current coroutine.
  -- Any timer previously set for this coroutine is canceled first.
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timeout
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
  -- @return true; throws an error if `delay` is negative
  function copas.timeout(delay, callback)
    local co = coroutine_running()
    local existing_timer = timeout_register[co]

    if existing_timer then
      timerwheel:cancel(existing_timer)
    end

    if delay > 0 and delay ~= math.huge then
      timeout_register[co] = timerwheel:set(delay, callback, co)
    elseif delay == 0 or delay == math.huge then
      timeout_register[co] = nil
    else
      error("timeout value must be greater than or equal to 0, got: "..tostring(delay))
    end

    return true
  end

end
1431

1432

1433
-------------------------------------------------------------------------------
1434
-- main tasks: manage readable and writable socket sets
1435
-------------------------------------------------------------------------------
1436
-- a task is an object with a required method `step()` that deals with a
1437
-- single step for that task.
1438

1439
-- ordered list of tasks; `copas.step` runs them in insertion order
local _tasks = {}

-- appends `tsk` to the end of the list
function _tasks.add(_, tsk)
  local position = #_tasks + 1
  _tasks[position] = tsk
end
1444

1445

1446
-- a task to check ready to read events
local _readable_task = { _events = {} }
do
  -- For every socket reported readable: a registered server socket accepts a
  -- new connection; any other socket is taken out of the read set and the
  -- coroutine waiting on it is resumed.
  function _readable_task:step()
    for _, skt in ipairs(self._events) do
      local server_handler = _servers[skt]
      if server_handler then
        _accept(skt, server_handler)
      else
        _reading:remove(skt)
        _doTick(_reading:pop(skt), skt)
      end
    end
  end

  _tasks:add(_readable_task)
end
1469

1470

1471
-- a task to check ready to write events
local _writable_task = { _events = {} }
do
  -- Every socket reported writable is taken out of the write set and the
  -- coroutine waiting on it is resumed.
  function _writable_task:step()
    for _, skt in ipairs(self._events) do
      _writing:remove(skt)
      _doTick(_writing:pop(skt), skt)
    end
  end

  _tasks:add(_writable_task)
end
1489

1490

1491

1492
-- sleeping threads task
local _sleeping_task = {}
do
  -- Moves every coroutine that `_sleeping:pop(now)` hands out over to the
  -- resumable list. They are not executed here: the resumable list is replaced
  -- before it is executed, which prevents tasks from running twice in a row
  -- (with pause(0) for example). Execution happens in the resumable step,
  -- which is next.
  function _sleeping_task:step()
    local now = gettime()

    while true do
      local co = _sleeping:pop(now)
      if not co then
        break
      end
      _resumable:push(co)
    end
  end

  _tasks:add(_sleeping_task)
end
1510

1511

1512

1513
-- resumable threads task
local _resumable_task = {}
do
  -- Resumes every coroutine on the resume list. The list is swapped out
  -- before iterating, so anything queued while running ends up in the next
  -- copas step rather than this one, and cannot create a loop.
  function _resumable_task:step()
    local pending = _resumable:clear_resumelist()

    for _, co in ipairs(pending) do
      _doTick(co)
    end
  end

  _tasks:add(_resumable_task)
end
1529

1530

1531
-------------------------------------------------------------------------------
1532
-- Checks for reads and writes on sockets
1533
-------------------------------------------------------------------------------
1534
local _select_plain do

  local last_cleansing = 0

  -- Watchdog sweep: every socket in `log` that is not in `events` and has been
  -- waiting longer than WATCH_DOG_TIMEOUT is removed from the log and appended
  -- to `events` (with its position also stored under the socket key), so the
  -- operation is attempted despite select not having returned the socket.
  local function promote_stale(log, events, now)
    for skt, since in pairs(log) do
      if not events[skt] and now - since > WATCH_DOG_TIMEOUT then
        log[skt] = nil
        events[#events + 1] = skt
        events[skt] = #events
      end
    end
  end

  if socket then
    -- use socket.select to handle socket-io
    _select_plain = function(timeout)
      local now = gettime()

      -- remove any closed sockets to prevent select from hanging on them;
      -- each entry is replaced by what the two sets returned on removal
      if _closed[1] then
        for i, skt in ipairs(_closed) do
          _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
        end
      end

      local r_events, w_events, err = socket.select(_reading, _writing, timeout)
      _readable_task._events = r_events
      _writable_task._events = w_events

      -- inject closed sockets in readable/writeable task so they can error out properly
      if _closed[1] then
        for i, removed in ipairs(_closed) do
          _closed[i] = nil
          r_events[#r_events+1] = removed[1]
          w_events[#w_events+1] = removed[2]
        end
      end

      if now - last_cleansing > WATCH_DOG_TIMEOUT then
        last_cleansing = now
        promote_stale(_reading_log, r_events, now)
        promote_stale(_writing_log, w_events, now)
      end

      -- a timeout is not reported when there are events to handle anyway
      if err == "timeout" and #r_events + #w_events > 0 then
        return nil
      end
      return err
    end
  else
    -- socket module unavailable, switch to luasystem sleep
    _select_plain = block_sleep
  end
end
1601

1602

1603

1604
-------------------------------------------------------------------------------
1605
-- Dispatcher loop step.
1606
-- Listen to client requests and handles them
1607
-- Returns false if no socket-data was handled, or true if there was data
1608
-- handled (or nil + error message)
1609
-------------------------------------------------------------------------------
1610

1611
local copas_stats
local min_ever, max_ever

-- the select implementation currently in use (plain by default)
local _select = _select_plain

-- instrumented version of _select() to collect stats: the time between the
-- end of the previous select call and the start of this one is recorded as
-- the duration of one step
local function _select_instrumented(timeout)
  local stats = copas_stats
  if stats then
    local elapsed = gettime() - stats.step_start
    stats.duration_max = math.max(stats.duration_max, elapsed)
    stats.duration_min = math.min(stats.duration_min, elapsed)
    stats.duration_tot = stats.duration_tot + elapsed
    stats.steps = stats.steps + 1
  else
    -- first call: nothing to measure yet, only set up the counters
    copas_stats = {
      duration_max = -1,
      duration_min = 999999,
      duration_tot = 0,
      steps = 0,
    }
  end

  local err = _select_plain(timeout)

  local now = gettime()
  if not copas_stats.time_start then
    copas_stats.time_start = now
  end
  copas_stats.step_start = now

  return err
end
1641

1642

1643
-- Executes a single dispatcher step: one select call followed by one step of
-- every registered task.
-- Returns true if the select call reported no error, false if it timed out,
-- or nil + error message for any other error.
function copas.step(timeout)
  -- Need to wake up the select call in time for the next sleeping event
  if _resumable:done() then
    timeout = math.min(_sleeping:getnext(), timeout or math.huge)
  else
    -- there is work waiting already, so do not wait in select
    timeout = 0
  end

  local err = _select(timeout)

  for _, tsk in ipairs(_tasks) do
    tsk:step()
  end

  if not err then
    return true
  end
  if err ~= "timeout" then
    return nil, err
  end

  if timeout + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
    -- we were idle, so occasionally do a GC sweep to ensure lingering
    -- sockets are closed, and we don't accidentally block the loop from
    -- exiting
    collectgarbage()
  end
  return false
end
1672

1673

1674
-------------------------------------------------------------------------------
1675
-- Check whether there is something to do.
1676
-- returns false if there are no sockets for read/write nor tasks scheduled
1677
-- (which means Copas is in an empty spin)
1678
-------------------------------------------------------------------------------
1679
-- Truthy only when the read set and the write set are empty, the resumable
-- list is done, and the sleeping queue reports done for the current number of
-- timeouts.
function copas.finished()
  if #_reading ~= 0 then
    return false
  end
  if #_writing ~= 0 then
    return false
  end

  local resumable_done = _resumable:done()
  if not resumable_done then
    return resumable_done
  end

  return (_sleeping:done(copas.gettimeouts()))
end
1682

1683

1684
local resetexit do
  local exit_semaphore, exiting

  -- (Re)initializes the exit state: a fresh semaphore and the flag cleared.
  function resetexit()
    exit_semaphore = copas.semaphore.new(1, 0, math.huge)
    exiting = false
  end

  -- Signals tasks to exit. But only if they check for it. By calling `copas.exiting`
  -- they can check if they should exit. Or by calling `copas.waitforexit` they can
  -- wait until the exit signal is given. Calling it again while already
  -- exiting does nothing.
  function copas.exit()
    if not exiting then
      exiting = true
      exit_semaphore:destroy()
    end
  end

  -- returns whether Copas is in the process of exiting. Exit can be started by
  -- calling `copas.exit()`.
  function copas.exiting()
    return exiting
  end

  -- Pauses the current coroutine until Copas is exiting. To be used as an exit
  -- signal for tasks that need to clean up before exiting.
  function copas.waitforexit()
    exit_semaphore:take(1)
  end
end
1713

1714

1715
--- Forcibly cancels all pending work and signals exit.
-- Intended for test teardown only. Abandons all registered threads and sockets
-- without giving them a chance to clean up. After this call copas.finished()
-- will return true and the loop will exit. The module is left in a clean state
-- ready for the next copas.loop() call.
function copas.cancelall()
  -- closes and removes the first entry of `set` until the set is empty
  local function close_and_drain(set)
    while set[1] do
      copas.close(set[1])
      set:remove(set[1])
    end
  end

  -- drop everything that is scheduled to run or sleeping
  _resumable:clear_resumelist()
  _sleeping:cancelall()

  -- close and drain the sockets waiting for reading, then those for writing
  close_and_drain(_reading)
  close_and_drain(_writing)

  -- remove all servers
  while _servers[1] do
    copas.removeserver(_servers[1])
  end

  -- clear non-weak ancillary tables
  _closed = {}
  _reading_log = {}
  _writing_log = {}

  -- signal exit
  copas.exit()
end
1752

1753

1754
local _getstats do
  local _getstats_instrumented, _getstats_plain


  -- Stats getter that is active while instrumentation is switched off.
  -- Always returns an empty table. When `enable` is exactly `true` it first
  -- switches `_select` and `_getstats` to their instrumented versions and
  -- resets the collected statistics.
  function _getstats_plain(enable)
    -- this function gets hit if turned off, so turn on if true
    if enable == true then
      _select = _select_instrumented
      _getstats = _getstats_instrumented
      -- reset stats
      min_ever = nil
      max_ever = nil
      copas_stats = nil
    end
    return {}
  end


  -- convert from seconds to millisecs, with microsec precision
  local function useconds(t)
    return math.floor((t * 1000000) + 0.5) / 1000
  end
  -- convert from seconds to seconds, with millisec precision
  local function mseconds(t)
    return math.floor((t * 1000) + 0.5) / 1000
  end


  -- Stats getter that is active while instrumentation is switched on.
  -- When `enable` is exactly `false` it switches back to the plain versions
  -- and returns the plain result. Otherwise it consumes the statistics
  -- collected in `copas_stats` and returns them with derived fields added
  -- (averages, totals, all-time min/max). Returns an empty table when nothing
  -- was collected yet.
  function _getstats_instrumented(enable)
    if enable == false then
      _select = _select_plain
      _getstats = _getstats_plain
      -- instrumentation disabled, so switch to the plain implementation
      return _getstats(enable)
    end
    -- the step counter is the `steps` field (it is the divisor below); bail
    -- out while it is zero so the averages never become 0/0
    if (not copas_stats) or (copas_stats.steps == 0) then
      return {}
    end
    -- take ownership of the collected stats; collection restarts from scratch
    local stats = copas_stats
    copas_stats = nil
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
    max_ever = math.max(max_ever or 0, stats.duration_max)
    stats.duration_min_ever = min_ever
    stats.duration_max_ever = max_ever
    stats.duration_avg = stats.duration_tot / stats.steps
    stats.step_start = nil
    stats.time_end = gettime()
    stats.time_tot = stats.time_end - stats.time_start
    stats.time_avg = stats.time_tot / stats.steps

    -- durations and the average time per step are reported in milliseconds
    stats.duration_avg = useconds(stats.duration_avg)
    stats.duration_max = useconds(stats.duration_max)
    stats.duration_max_ever = useconds(stats.duration_max_ever)
    stats.duration_min = useconds(stats.duration_min)
    stats.duration_min_ever = useconds(stats.duration_min_ever)
    stats.duration_tot = useconds(stats.duration_tot)
    stats.time_avg = useconds(stats.time_avg)
    -- start/end/total times stay in seconds, rounded to milliseconds
    stats.time_start = mseconds(stats.time_start)
    stats.time_end = mseconds(stats.time_end)
    stats.time_tot = mseconds(stats.time_tot)
    return stats
  end

  -- instrumentation is off by default
  _getstats = _getstats_plain
end
1819

1820

1821
-- Builds a status report table.
-- Starts from the table returned by `_getstats(enable_stats)` and adds:
--   running         : boolean form of `copas.running`
--   timeout         : first value returned by `copas.gettimeouts()`
--   timer, inactive : the two values returned by `_sleeping:status()`
--   read, write     : lengths of the read and write socket queues
--   active          : result of `_resumable:count()`
function copas.status(enable_stats)
  local report = _getstats(enable_stats)

  report.running = copas.running and true or false
  report.timeout = (copas.gettimeouts())

  local timer, inactive = _sleeping:status()
  report.timer = timer
  report.inactive = inactive

  report.read = #_reading
  report.write = #_writing
  report.active = _resumable:count()

  return report
end
1831

1832

1833
-------------------------------------------------------------------------------
-- Dispatcher loop.
-- Keeps calling `copas.step` until there is no work left and the exit signal
-- has been given.
-- If `initializer` is a function it is added as the thread named
-- "copas_initializer"; any other truthy first argument is used as the timeout
-- that is passed to each `copas.step` call.
-------------------------------------------------------------------------------
function copas.loop(initializer, timeout)
  if type(initializer) ~= "function" then
    -- called as loop(timeout): the first argument carries the timeout
    timeout = initializer or timeout
  else
    copas.addnamedthread("copas_initializer", initializer)
  end

  resetexit()
  copas.running = true

  local stop = false
  repeat
    copas.step(timeout)
    if copas.finished() then
      if copas.exiting() then
        -- nothing left to do and exit was already signalled
        stop = true
      else
        -- out of work for the first time: signal exit, then keep stepping so
        -- tasks woken by the signal can finish
        copas.exit()
      end
    end
  until stop

  copas.running = false
end
1857

1858

1859
-------------------------------------------------------------------------------
1860
-- Naming sockets and coroutines.
1861
-------------------------------------------------------------------------------
1862
do
  -- Returns the object under which a socket's name is stored: the `socket`
  -- field when `skt` carries one of the Copas socket metatables, otherwise
  -- `skt` itself.
  local function realsocket(skt)
    local mt = getmetatable(skt)
    if mt ~= _skt_mt_tcp and mt ~= _skt_mt_udp then
      return skt
    end
    return skt.socket
  end


  -- Stores `name` (must be a string) as the name of socket `skt`.
  function copas.setsocketname(name, skt)
    assert(type(name) == "string", "expected arg #1 to be a string")
    local key = assert(realsocket(skt), "expected arg #2 to be a socket")
    object_names[key] = name
  end


  -- Returns the name stored for socket `skt`, or nil when there is none.
  function copas.getsocketname(skt)
    local key = assert(realsocket(skt), "expected arg #1 to be a socket")
    return object_names[key]
  end
end
1885

1886

1887
-- Stores `name` (must be a string) as the name of coroutine `coro`.
-- When `coro` is omitted the currently running coroutine is named.
function copas.setthreadname(name, coro)
  assert(type(name) == "string", "expected arg #1 to be a string")
  if not coro then
    coro = coroutine_running()
  end
  assert(type(coro) == "thread", "expected arg #2 to be a coroutine or nil")
  object_names[coro] = name
end
1893

1894

1895
-- Returns the name stored for coroutine `coro`, or nil when there is none.
-- When `coro` is omitted the currently running coroutine is looked up.
function copas.getthreadname(coro)
  if not coro then
    coro = coroutine_running()
  end
  assert(type(coro) == "thread", "expected arg #1 to be a coroutine or nil")
  return object_names[coro]
end
1900

1901
-------------------------------------------------------------------------------
1902
-- Debug functionality.
1903
-------------------------------------------------------------------------------
1904
do
1905
  copas.debug = {}
218✔
1906

1907
  local log_core    -- if truthy, the core-timer will also be logged
1908
  local debug_log   -- function used as logger
1909

1910

1911
  -- Logging stand-in for `coroutine.yield`: reports which queue the current
  -- coroutine yields to, then performs the actual yield with the same
  -- arguments. For the sleeping queue `skt` is logged as the number of seconds.
  local debug_yield = function(skt, queue)
    local name = object_names[coroutine_running()]
    -- the core timer is only reported when `log_core` is set
    local muted = (not log_core) and name == "copas_core_timer"

    if muted then
      -- nothing to log
    elseif queue == _sleeping then
      debug_log("yielding '", name, "' to SLEEP for ", skt," seconds")
    elseif queue == _writing then
      debug_log("yielding '", name, "' to WRITE on '", object_names[skt], "'")
    elseif queue == _reading then
      debug_log("yielding '", name, "' to READ on '", object_names[skt], "'")
    else
      debug_log("thread '", name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
    end

    return coroutine.yield(skt, queue)
  end
1931

1932

1933
  -- Logging stand-in for `coroutine.resume`: reports which coroutine is being
  -- resumed (and for which socket, if any), then performs the actual resume
  -- with the same arguments.
  local debug_resume = function(coro, skt, ...)
    local name = object_names[coro]

    if skt then
      debug_log("resuming '", name, "' for socket '", object_names[skt], "'")
    elseif log_core or name ~= "copas_core_timer" then
      -- socket-less resumes of the core timer are only reported when
      -- `log_core` is set
      debug_log("resuming '", name, "'")
    end

    return coroutine.resume(coro, skt, ...)
  end
1945

1946

1947
  -- Logging stand-in for `coroutine.create`: the created coroutine runs `f`
  -- and logs an "exiting" line once `f` has returned, before handing back the
  -- results of `f`.
  local debug_create = function(f)
    return coroutine.create(function(...)
      local results = pack(f(...))
      debug_log("exiting '", object_names[coroutine_running()], "'")
      return unpack(results)
    end)
  end
1956

1957

1958
  debug_log = fnil
218✔
1959

1960

1961
  -- Enables debug output for all coroutine operations.
  -- `logger` is the function that receives the log arguments (defaults to
  -- `print`); a truthy `core` also logs the "copas_core_timer" thread.
  function copas.debug.start(logger, core)
    log_core, debug_log = core, logger or print
    -- route the coroutine primitives through their logging counterparts
    coroutine_yield, coroutine_resume, coroutine_create = debug_yield, debug_resume, debug_create
  end
1969

1970

1971
  -- Disables debug output for coroutine operations: the logger becomes a
  -- no-op and the plain coroutine primitives are put back.
  function copas.debug.stop()
    debug_log = fnil
    coroutine_yield, coroutine_resume, coroutine_create = coroutine.yield, coroutine.resume, coroutine.create
  end
1978

1979
  do
1980
    local call_id = 0
218✔
1981

1982
    -- Labels used in the debug output of socket calls.
    -- Every socket function name has TWO entries, '<name>_in' and '<name>_out',
    -- each an array with the names/descriptions of respectively the input
    -- parameters and the return values, in positional order.
    -- An entry may also carry a 'callback' key: a function that will be called
    -- with the parameters/return-values for further inspection, for example:
    --   callback = function(...)
    --     print(debug.traceback("called from:", 4))
    --   end,
    local args = {
      settimeout_in  = { "socket ", "seconds", "mode   " },
      settimeout_out = { "success", "error  " },

      connect_in     = { "socket ", "address", "port   " },
      connect_out    = { "success", "error  " },

      getfd_in       = { "socket " },
      getfd_out      = { "fd" },

      send_in        = { "socket   ", "data     ", "idx-start", "idx-end  " },
      send_out       = { "last-idx-send    ", "error            ", "err-last-idx-send" },

      receive_in     = { "socket ", "pattern", "prefix " },
      receive_out    = { "received    ", "error       ", "partial data" },

      dirty_in       = { "socket" },
      dirty_out      = { "data in read-buffer" },

      close_in       = { "socket" },
      close_out      = { "success", "error" },
    }
2057
    -- Prints `msg` followed by one line per value in `...`.
    -- `func` is the key into the `args` description table; its labels are used
    -- to name the values, falling back to the position number. Strings are
    -- shown truncated to 30 characters together with their full length. If the
    -- description has a `callback`, it is called with the same values.
    local function print_call(func, msg, ...)
      print(msg)

      local values = pack(...)
      local labels = args[func] or {}
      -- also list described positions for which no value was passed
      local count = math.max(values.n, #labels)

      for idx = 1, count do
        local value = values[idx]
        local label = labels[idx] or idx
        local kind = type(value)

        if kind ~= "string" then
          print("\t"..label..": '"..tostring(value).."' ("..kind..")")
        else
          local shown = value:sub(1,30)
          if shown ~= value then
            shown = shown .."(...truncated)"
          end
          print("\t"..label..": '"..tostring(shown).."' ("..kind.." #"..#value..")")
        end
      end

      if labels.callback then
        labels.callback(...)
      end
    end
2077

2078
    -- Metatable for the debug proxies created by `copas.debug.socket`.
    -- The proxy keeps the wrapped socket in its `__original_socket` field.
    local debug_mt = {
      -- Looks up `key` on the wrapped socket. Non-function values are returned
      -- as-is; functions are returned wrapped, so that every call prints its
      -- parameters and return values (each call gets its own sequence number).
      __index = function(self, key)
        local value = self.__original_socket[key]
        if type(value) ~= "function" then
          return value
        end
        return function(self2, ...)
            local my_id = call_id + 1
            call_id = my_id
            local results

            if self2 ~= self then
              -- there is no self; the first argument is a regular argument,
              -- so pass it on untouched (not the proxy)
              print_call(tostring(key).."_in", my_id .. "-calling '"..tostring(key) .. "' with; ", self2, ...)
              results = pack(value(self2, ...))
            else
              -- method call on the proxy; call on the wrapped socket instead
              print_call(tostring(key).."_in", my_id .. "-calling '" .. tostring(key) .. "' with; ", self.__original_socket, ...)
              results = pack(value(self.__original_socket, ...))
            end
            print_call(tostring(key).."_out", my_id .. "-results '"..tostring(key) .. "' returned; ", unpack(results))
            return unpack(results)
          end
      end,
      -- The proxy presents itself as the wrapped socket.
      __tostring = function(self)
        return tostring(self.__original_socket)
      end
    }
2105

2106

2107
    -- Wraps a socket (copas or luasocket) in a debug version that prints all
    -- calls with their parameters and return values. Extremely noisy!
    -- For a Copas socket the underlying socket in its `socket` field is
    -- wrapped in place and the Copas socket itself is returned; for anything
    -- else a new proxy object is returned.
    -- NOTE: only for plain sockets, will not support TLS
    function copas.debug.socket(original_skt)
      local mt = getmetatable(original_skt)

      if mt == _skt_mt_tcp or mt == _skt_mt_udp then
        -- already wrapped as Copas socket, so recurse with the original luasocket one
        original_skt.socket = copas.debug.socket(original_skt.socket)
        return original_skt
      end

      return setmetatable({ __original_socket = original_skt }, debug_mt)
    end
2124
  end
2125
end
2126

2127

2128
return copas
218✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc