• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 27377415562

11 Jun 2026 09:05PM UTC coverage: 85.027% (-0.09%) from 85.118%
27377415562

push

github

web-flow
fix(timeout): clear stale socket timeout state after timeout (#186)

7 of 10 new or added lines in 1 file covered. (70.0%)

10 existing lines in 5 files now uncovered.

1414 of 1663 relevant lines covered (85.03%)

80272.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.06
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2013 - Kepler Project (www.keplerproject.org), 2015-2025 Thijs Schreijer
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
-- obsolete: only for Lua 5.1 compatibility
-- On Lua 5.1 Copas has to be required before either http module; fail early
-- when one of them is already registered in 'package.loaded'.
if _VERSION == "Lua 5.1" then
  if package.loaded["socket.http"] then
    error("you must require copas before require'ing socket.http")
  end
  if package.loaded["copas.http"] then
    error("you must require copas before require'ing copas.http")
  end
end
22

23
-- load either LuaSocket, or LuaSystem
24
-- note: with luasocket we don't use 'sleep' but 'select' with no sockets
25
local socket, system do
  -- Tries to load a module by name.
  -- @return the module if it could be loaded, nothing otherwise
  local function optional_require(name)
    if pcall(require, name) then
      return require(name)
    end
  end

  socket = optional_require("socket")   -- LuaSocket
  system = optional_require("system")   -- LuaSystem, used as fallback

  if not (socket or system) then
    error("Neither LuaSocket nor LuaSystem found, Copas requires at least one of them")
  end
end
40

41
local binaryheap = require "binaryheap"
218✔
42
local gettime = (socket or system).gettime
218✔
43
local block_sleep = (socket or system).sleep
218✔
44
local ssl -- only loaded upon demand
45

46
local core_timer_thread
47
local WATCH_DOG_TIMEOUT = 120
218✔
48
local UDP_DATAGRAM_MAX = (socket or {})._DATAGRAMSIZE or 8192
218✔
49
local TIMEOUT_PRECISION = 0.1  -- 100ms
218✔
50
local fnil = function() end
305,130✔
51

52

53
local coroutine_create = coroutine.create
218✔
54
local coroutine_running = coroutine.running
218✔
55
local coroutine_yield = coroutine.yield
218✔
56
local coroutine_resume = coroutine.resume
218✔
57
local coroutine_status = coroutine.status
218✔
58

59

60
-- nil-safe versions for pack/unpack; 'pack' records the argument count in
-- field 'n', and 'unpack' honours that count so embedded/trailing nils survive
local _unpack = unpack or table.unpack

-- @param t table to unpack
-- @param i (optional) first index, defaults to 1
-- @param j (optional) last index, defaults to t.n, or #t if there is no 'n'
local function unpack(t, i, j)
  local first = i or 1
  local last = j or t.n or #t
  return _unpack(t, first, last)
end

-- @return table holding all arguments, with the argument count in field 'n'
local function pack(...)
  return { n = select("#", ...), ... }
end
1,579✔
64

65

66
local pcall = pcall
218✔
67
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
218✔
UNCOV
68
  pcall = require("coxpcall").pcall
30✔
UNCOV
69
  coroutine_running = require("coxpcall").running
30✔
70
end
71

72

73
if socket then
218✔
74
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
75
  -- (this allows the use of socket.http from within copas)
76
  -- Metatable marking the intermediate error tables thrown by the coroutine
  -- safe 'try' functions; the wrapped error value is stored at index 1.
  local err_mt = {
    __tostring = function (self)
      -- convert the wrapped value BEFORE concatenating, so that non-string
      -- values (nil, tables, booleans) don't raise a concatenation error
      return "Copas 'try' error intermediate table: '"..tostring(self[1]).."'"
    end,
  }
81

82
  -- Post-processes pcall results.
  -- On success all results are passed through. On failure with a 'try' error
  -- table (recognized by its metatable) it returns nil + the wrapped error;
  -- any other error is thrown again.
  local function statusHandler(status, ...)
    if not status then
      local failure = (...)
      local is_try_error = type(failure) == "table" and getmetatable(failure) == err_mt
      if not is_try_error then
        error(failure)
      end
      return nil, failure[1]
    end
    return ...
  end
91

92
  -- Wraps 'func' so it runs inside a pcall, with the pcall results handed to
  -- 'statusHandler'.
  -- @param func the function to protect
  -- @return the protected function
  function socket.protect(func)
    local function protected(...)
      return statusHandler(pcall(func, ...))
    end
    return protected
  end
97

98
  -- Creates a 'try' function. When its first argument is falsy, the try
  -- function calls the finalizer (in a pcall, with the remaining arguments) and
  -- throws the second argument wrapped in an 'err_mt' table. Otherwise it
  -- returns all its arguments unchanged.
  -- @param finalizer (optional) function to call before throwing
  -- @return the try function
  function socket.newtry(finalizer)
    local function try(...)
      if (...) then
        return ...
      end
      local cleanup = finalizer or fnil
      pcall(cleanup, select(2, ...))
      local wrapped = setmetatable({ (select(2, ...)) }, err_mt)
      error(wrapped, 0)  -- level 0: do not add position information
    end
    return try
  end
108

109
  socket.try = socket.newtry()
269✔
110
end
111

112

113
-- Setup the Copas meta table to auto-load submodules and define a default method
114
local copas do
  -- names of the submodules that can still be auto-loaded
  local loadable = {
    ftp = true, future = true, http = true, lock = true,
    queue = true, semaphore = true, smtp = true, timer = true,
  }

  local copas_mt = {}

  -- auto-load a submodule upon first access (eg. 'copas.http'), and store it
  -- in the module table so the lookup happens only once
  function copas_mt.__index(self, key)
    if not loadable[key] then
      return
    end
    self[key] = require("copas."..key)
    loadable[key] = nil
    return rawget(self, key)
  end

  -- default method; calling the module table forwards to 'copas.loop'
  function copas_mt.__call(self, ...)
    return self.loop(...)
  end

  copas = setmetatable({}, copas_mt)
end
134

135

136
-- Meta information is public even if beginning with an "_"
137
copas._COPYRIGHT   = "Copyright (C) 2005-2013 Kepler Project, 2015-2026 Thijs Schreijer"
218✔
138
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
218✔
139
copas._VERSION     = "Copas 4.10.0"
218✔
140

141
-- Close the socket associated with the current connection after the handler finishes
142
copas.autoclose = true
218✔
143

144
-- indicator for the loop running
145
copas.running = false
218✔
146

147
-- gettime method from either LuaSocket or LuaSystem: time in (fractional) seconds, since epoch.
148
copas.gettime = gettime
218✔
149

150
-------------------------------------------------------------------------------
151
-- Object names, to track names of thread/coroutines and sockets
152
-------------------------------------------------------------------------------
153
-- Weak-keyed registry of object names. Looking up an object without a name
-- yields its 'tostring' value, which is stored as its name as well (a nil key
-- is never stored).
local object_names do
  local names_mt = { __mode = "k" }

  function names_mt.__index(registry, obj)
    local default_name = tostring(obj)
    if obj == nil then
      return default_name
    end
    rawset(registry, obj, default_name)
    return default_name
  end

  object_names = setmetatable({}, names_mt)
end
163

164
-------------------------------------------------------------------------------
165
-- Simple set implementation
166
-- adds a FIFO queue for each socket in the set
167
-------------------------------------------------------------------------------
168

169
-- Creates a new socket set. The sockets live in the array part of the returned
-- table; a private lookup table tracks their positions. Additionally a FIFO
-- queue is kept for each socket.
local function newsocketset()
  local set = {}

  do  -- set implementation
    local position = {}   -- index in the array part, by socket

    -- Adds a socket to the set, does nothing if it exists
    -- @return skt if added, or nil if it existed
    function set:insert(skt)
      if position[skt] then
        return
      end
      local slot = #self + 1
      self[slot] = skt
      position[skt] = slot
      return skt
    end

    -- Removes socket from the set, does nothing if not found. The last entry
    -- is moved into the vacated slot, so the array part stays without holes.
    -- @return skt if removed, or nil if it wasn't in the set
    function set:remove(skt)
      local slot = position[skt]
      if not slot then
        return
      end
      position[skt] = nil
      local last_slot = #self
      local last = self[last_slot]
      self[last_slot] = nil
      if last ~= skt then
        position[last] = slot
        self[slot] = last
      end
      return skt
    end

  end

  do  -- queues implementation
    local queues_mt = {
      __mode = "k",   -- auto collect queue if socket is gone
    }

    -- auto create fifo queue if not found
    function queues_mt.__index(queues, skt)
      local fresh = {}
      queues[skt] = fresh
      return fresh
    end

    local fifo_queues = setmetatable({}, queues_mt)

    -- pushes an item in the fifo queue for the socket.
    function set:push(skt, itm)
      local fifo = fifo_queues[skt]
      fifo[#fifo + 1] = itm
    end

    -- pops the oldest item from the fifo queue for the socket
    function set:pop(skt)
      return table.remove(fifo_queues[skt], 1)
    end

  end

  return set
end
229

230

231

232
-- Threads immediately resumable
233
local _resumable = {} do
  local pending = {}   -- list of coroutines, in the order they were pushed

  -- appends a coroutine to the list
  function _resumable:push(co)
    local slot = #pending + 1
    pending[slot] = co
  end

  -- returns the current list, and replaces it with a new empty one
  function _resumable:clear_resumelist()
    local handed_out = pending
    pending = {}
    return handed_out
  end

  -- returns true if the list is empty
  function _resumable:done()
    return pending[1] == nil
  end

  -- returns the length of the list, plus the length of '_resumable' itself
  -- NOTE(review): within this block '_resumable' only gets named methods, so
  -- its length looks to be always 0 -- confirm nothing else adds array entries
  function _resumable:count()
    return #pending + #_resumable
  end

end
255

256

257

258
-- Similar to the socket set above, but tailored for the use of
259
-- sleeping threads
260
local _sleeping = {} do

  local heap = binaryheap.minUnique()                   -- coroutines with a wakeup time, by time
  local lethargy = setmetatable({}, { __mode = "k" })   -- coroutines sleeping without a wakeup time


  -- Required base implementation
  -----------------------------------------
  _sleeping.insert = fnil
  _sleeping.remove = fnil

  -- Puts a coroutine to sleep.
  -- @param sleeptime negative: sleep until woken up, 0: resumable right away,
  -- positive: number of seconds from now at which it expires
  -- @param co the coroutine
  function _sleeping:push(sleeptime, co)
    if sleeptime < 0 then
      lethargy[co] = true
      return
    end
    if sleeptime == 0 then
      _resumable:push(co)
      return
    end
    heap:insert(gettime() + sleeptime, co)
  end

  -- Finds the thread that should wake up to the time, if any.
  -- @param time the time to compare the earliest wakeup time against
  -- @return the result of popping the heap, or nothing if nothing has expired
  function _sleeping:pop(time)
    local earliest = heap:peekValue() or math.huge
    if not (time < earliest) then
      return heap:pop()
    end
  end

  -- additional methods for time management
  -----------------------------------------

  -- returns delay until next sleep expires, or nil if there is none
  function _sleeping:getnext()
    local wakeup_time = heap:peekValue()
    if not wakeup_time then
      return
    end
    -- never report less than 0, because select() might block
    return math.max(wakeup_time - gettime(), 0)
  end

  -- Makes a sleeping coroutine resumable, whether it sleeps without a wakeup
  -- time or sits on the heap. Does nothing if it is in neither.
  function _sleeping:wakeup(co)
    if lethargy[co] then
      lethargy[co] = nil
      _resumable:push(co)
    elseif heap:remove(co) then
      _resumable:push(co)
    end
  end

  -- Drops a coroutine from both the lethargy and the heap, without resuming it.
  function _sleeping:cancel(co)
    lethargy[co] = nil
    heap:remove(co)
  end

  -- Empties the heap, and then re-inserts only the core timer thread.
  function _sleeping:cancelall()
    while heap:size() > 0 do
      heap:pop()
    end
    heap:insert(gettime() + TIMEOUT_PRECISION, core_timer_thread)
    -- lethargy is weak; copas's idle GC sweeps will clean it within a few steps
  end

  -- Returns true if we have nothing more to do.
  -- The timeout task doesn't qualify as work (fallbacks only),
  -- the lethargy also doesn't qualify as work ('dead' tasks),
  -- but the combination of a timeout + a lethargy can be work.
  -- @param tos number of timeouts running
  function _sleeping:done(tos)
    if heap:size() ~= 1 then   -- 1 means only the timeout-timer task is running
      return false
    end
    local may_be_woken = tos > 0 and next(lethargy)
    return not may_be_woken
  end

  -- gets number of threads in binaryheap and lethargy
  function _sleeping:status()
    local lethargic = 0
    for _ in pairs(lethargy) do
      lethargic = lethargic + 1
    end
    return heap:size(), lethargic
  end

end   -- _sleeping
341

342

343

344
-------------------------------------------------------------------------------
345
-- Tracking coroutines and sockets
346
-------------------------------------------------------------------------------
347

348
local _servers = newsocketset() -- servers being handled
218✔
349
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
218✔
350
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
218✔
351
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
218✔
352
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
218✔
353

354

355
-- for each socket we log the last read and last write times to enable the
356
-- watchdog to follow up if it takes too long.
357
-- tables contain the time, indexed by the socket
358
local _reading_log = {}
218✔
359
local _writing_log = {}
218✔
360

361
local _closed = {} -- track sockets that have been closed (list/array)
218✔
362

363
local _reading = newsocketset() -- sockets currently being read
218✔
364
local _writing = newsocketset() -- sockets currently being written
218✔
365
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
218✔
366
  ["timeout"] = true,      -- default LuaSocket timeout
158✔
367
  ["wantread"] = true,     -- LuaSec specific timeout
158✔
368
  ["wantwrite"] = true,    -- LuaSec specific timeout
158✔
369
}
370

371
-------------------------------------------------------------------------------
372
-- Coroutine based socket timeouts.
373
-------------------------------------------------------------------------------
374
-- User configured timeouts by socket; one table for each of the connect, send
-- and receive operations. The tables are weak-keyed and share one metatable.
local user_timeouts_connect, user_timeouts_send, user_timeouts_receive
do
  local timeout_mt = { __mode = "k" }

  -- if there is no timeout found, we insert one automatically, to block forever
  function timeout_mt.__index(timeouts, skt)
    timeouts[skt] = math.huge
    return timeouts[skt]
  end

  local function new_timeout_table()
    return setmetatable({}, timeout_mt)
  end

  user_timeouts_connect = new_timeout_table()
  user_timeouts_send = new_timeout_table()
  user_timeouts_receive = new_timeout_table()
end
391

392
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
218✔
393

394

395
-- sto = socket-time-out
396
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do

  -- all three registries are weak tables, indexed by coroutine
  local socket_register = setmetatable({}, { __mode = "k" })    -- socket by coroutine
  local operation_register = setmetatable({}, { __mode = "k" }) -- operation "read"/"write" by coroutine
  local timeout_flags = setmetatable({}, { __mode = "k" })      -- true if timedout, by coroutine


  -- Timeout callback, handed to 'copas.timeout' by 'sto_timeout'.
  -- @param co the coroutine whose socket operation timed out
  local function socket_callback(co)
    local skt = socket_register[co]
    local queue = operation_register[co]

    -- flag the timeout and resume the coroutine
    timeout_flags[co] = true
    _resumable:push(co)

    -- clear the socket from the current queue
    if queue ~= "read" and queue ~= "write" then
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
    end
    local current_set = (queue == "read") and _reading or _writing
    current_set:remove(skt)
  end


  -- Sets a socket timeout for the current coroutine.
  -- Calling it as `sto_timeout()` will cancel the timeout.
  -- @param skt (socket) the socket on which to operate
  -- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
  -- @param use_connect_to (bool) timeout to use is determined based on queue (read/write) or if this
  -- is truthy, it is the connect timeout.
  -- @return true
  function sto_timeout(skt, queue, use_connect_to)
    local co = coroutine_running()
    socket_register[co] = skt
    operation_register[co] = queue
    timeout_flags[co] = nil   -- a new (or cancelled) timeout has not timed out

    if not skt then
      copas.timeout(0)
      return true
    end

    local to = (use_connect_to and user_timeouts_connect[skt]) or
               (queue == "read" and user_timeouts_receive[skt]) or
               user_timeouts_send[skt]
    copas.timeout(to, socket_callback)
    return true
  end


  -- Changes the timeout to a different queue (read/write).
  -- Only useful with ssl-handshakes and "wantread", "wantwrite" errors, when
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
  -- @return true
  function sto_change_queue(queue)
    local co = coroutine_running()
    operation_register[co] = queue
    return true
  end


  -- Responds with `true` if the operation of the current coroutine timed-out.
  function sto_timed_out()
    local co = coroutine_running()
    return timeout_flags[co]
  end


  -- Returns the proper timeout error; the socket error that was passed in if
  -- the current coroutine is flagged in 'useSocketTimeoutErrors' (and the
  -- error is truthy), or "timeout" otherwise.
  function sto_error(err)
    if useSocketTimeoutErrors[coroutine_running()] and err then
      return err
    end
    return "timeout"
  end

  -- only in case of testing export some internals
  if _G._TEST then
    copas._socket_register = socket_register
    copas._operation_register = operation_register
    copas._timeout_flags = timeout_flags
  end
end
475

476

477

478
-------------------------------------------------------------------------------
479
-- Coroutine based socket I/O functions.
480
-------------------------------------------------------------------------------
481

482
-- Returns "tcp" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
-- for tcp based, and falsy (nil) for anything else, eg. udp based sockets.
-- The type is derived from the first 3 characters of the socket's 'tostring' value.
local isTCP do
  local type_by_prefix = {
    tcp = "tcp",
    SSL = "ssl",
  }

  function isTCP(socket)
    local prefix = string.sub(tostring(socket), 1, 3)
    return type_by_prefix[prefix]
  end
end
494

495
-- Closes a socket, after appending it to the '_closed' list.
-- @param skt the socket to close
-- @param ... passed on to the socket's 'close' method
-- @return whatever the socket's 'close' method returns
function copas.close(skt, ...)
  local slot = #_closed + 1
  _closed[slot] = skt
  return skt:close(...)
end
499

500

501

502
-- Sets the connect, send and read timeouts of a socket to the same value.
-- nil or negative is indefinitely
-- @param skt the socket
-- @param timeout (optional) timeout value as a number
-- @return the result of 'copas.settimeouts', or nil+error if timeout is not a number
function copas.settimeout(skt, timeout)
  if not timeout then
    timeout = -1
  end

  if type(timeout) == "number" then
    return copas.settimeouts(skt, timeout, timeout, timeout)
  end

  return nil, "timeout must be 'nil' or a number"
end
511

512
-- Sets the individual timeouts of a socket.
-- negative is indefinitely, nil means do not change
-- The values are validated and stored in the order connect, send, read; so
-- when a later value is rejected, the earlier ones have been stored already.
-- @param skt the socket
-- @param connect (optional) connect timeout as a number
-- @param send (optional) send timeout as a number
-- @param read (optional) read timeout as a number
-- @return true, or nil+error if a value is neither nil nor a number
function copas.settimeouts(skt, connect, send, read)

  -- Validates and stores a single timeout.
  -- @return nothing on success, or the error message if the value is invalid
  local function store(timeouts, value, errmsg)
    if value ~= nil and type(value) ~= "number" then
      return errmsg
    end
    if value then
      if value < 0 then
        value = nil   -- clears the entry
      end
      timeouts[skt] = value
    end
  end

  local err = store(user_timeouts_connect, connect, "connect timeout must be 'nil' or a number")
  if err then
    return nil, err
  end

  err = store(user_timeouts_send, send, "send timeout must be 'nil' or a number")
  if err then
    return nil, err
  end

  err = store(user_timeouts_receive, read, "read timeout must be 'nil' or a number")
  if err then
    return nil, err
  end

  return true
end
550

551
-- Reads a pattern from a client and yields to the reading set on timeouts.
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
-- be provided as the 'pattern' below defaults to a string. Will throw a
-- 'bad argument' error if omitted.
-- @param client the socket to read from
-- @param pattern (optional) the receive pattern, defaults to "*l"
-- @param part (optional) passed to, and updated from, the socket's 'receive' method
-- @return the 3 results of the socket's 'receive', or nil + error + part on a timeout
function copas.receive(client, pattern, part)
  pattern = pattern or "*l"
  local current_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local s, err
    s, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if s or not _isSocketTimeout[err] then
      -- done; we either have a result, or an error that is not a timeout
      current_log[client] = nil
      sto_timeout()
      return s, err, part
    end

    if sto_timed_out() then
      current_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), part
    end

    -- not ready yet; log the time, and yield with the set we're waiting in
    local wait_set
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
      current_log, wait_set = _writing_log, _writing
      current_log[client] = gettime()
      sto_change_queue("write")
    else
      current_log, wait_set = _reading_log, _reading
      current_log[client] = gettime()
      sto_change_queue("read")
    end
    coroutine_yield(client, wait_set)
  end
end
598

599
-- Receives data from a client over UDP. Not available for TCP.
-- Yields to the reading set for as long as the socket reports "timeout".
-- @param client the socket to read from
-- @param size (optional) passed to the socket's 'receivefrom', defaults to UDP_DATAGRAM_MAX
-- @return the 3 results of the socket's 'receivefrom', or nil + error + port on a timeout
function copas.receivefrom(client, size)
  size = size or UDP_DATAGRAM_MAX
  sto_timeout(client, "read")

  while true do
    local s, err, port = client:receivefrom(size) -- upon success err holds ip address

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if s or err ~= "timeout" then
      -- done; we either have a result, or an error that is not a timeout
      _reading_log[client] = nil
      sto_timeout()
      return s, err, port
    end

    if sto_timed_out() then
      _reading_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), port
    end

    -- not ready yet; log the time, and yield with the reading set
    _reading_log[client] = gettime()
    coroutine_yield(client, _reading)
  end
end
634

635
-- Reads a pattern from a client, with special treatment when reading chunks;
-- it unblocks on any data received, so it returns as soon as the partial
-- result has grown beyond the 'part' that was passed in.
-- @param client the socket to read from
-- @param pattern (optional) the receive pattern, defaults to "*l"
-- @param part (optional) passed to, and updated from, the socket's 'receive' method
-- @return the 3 results of the socket's 'receive', or nil + error + part on a timeout
function copas.receivepartial(client, pattern, part)
  pattern = pattern or "*l"
  local orig_size = #(part or "")
  local current_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local s, err
    s, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    local got_more_data = type(part) == "string" and #part > orig_size
    if s or got_more_data or not _isSocketTimeout[err] then
      -- done; a result, some new data, or an error that is not a timeout
      current_log[client] = nil
      sto_timeout()
      return s, err, part
    end

    if sto_timed_out() then
      current_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), part
    end

    -- not ready yet; log the time, and yield with the set we're waiting in
    local wait_set
    if err == "wantwrite" then
      current_log, wait_set = _writing_log, _writing
      current_log[client] = gettime()
      sto_change_queue("write")
    else
      current_log, wait_set = _reading_log, _reading
      current_log[client] = gettime()
      sto_change_queue("read")
    end
    coroutine_yield(client, wait_set)
  end
end
681
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
218✔
682

683
-- Sends data to a client. The operation is buffered and
-- yields to the writing set on timeouts.
-- Note: from and to parameters will be ignored by/for UDP sockets
-- @param client the socket to send on
-- @param data the data to send
-- @param from (optional) start index, defaults to 1
-- @param to (optional) end index, passed on to the socket's 'send' method
-- @return the 3 results of the socket's 'send', or nil + error + last index on a timeout
function copas.send(client, data, from, to)
  from = from or 1
  local lastIndex = from - 1
  local current_log = _writing_log
  sto_timeout(client, "write")

  while true do
    local s, err
    -- continue right after the last index reported by the previous attempt
    s, err, lastIndex = client:send(data, lastIndex + 1, to)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    if s or not _isSocketTimeout[err] then
      -- done; we either have a result, or an error that is not a timeout
      current_log[client] = nil
      sto_timeout()
      return s, err, lastIndex
    end

    if sto_timed_out() then
      current_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), lastIndex
    end

    -- not ready yet; log the time, and yield with the set we're waiting in
    local wait_set
    if err == "wantread" then
      current_log, wait_set = _reading_log, _reading
      current_log[client] = gettime()
      sto_change_queue("read")
    else
      current_log, wait_set = _writing_log, _writing
      current_log[client] = gettime()
      sto_change_queue("write")
    end
    coroutine_yield(client, wait_set)
  end
end
730

731
-- Plain pass-through to the socket's own 'sendto' method.
-- deprecated; for backward compatibility only, since UDP doesn't block on sending
-- @return whatever the socket's 'sendto' method returns
function copas.sendto(client, data, ip, port)
  local results = pack(client:sendto(data, ip, port))
  return unpack(results)
end
735

736
-- Connects a socket, and waits (yielding to the writing set) until the
-- connection is completed. The socket's own timeout is set to 0; the connect
-- timeout is handled through 'sto_timeout'.
-- @param skt the socket to connect
-- @param host passed on to the socket's 'connect' method
-- @param port passed on to the socket's 'connect' method
-- @return the results of the socket's 'connect', or nil + error on a timeout
function copas.connect(skt, host, port)
  skt:settimeout(0)
  local retried = false
  sto_timeout(skt, "write", true)

  while true do
    local ret, err = skt:connect(host, port)

    -- non-blocking connect on Windows results in error "Operation already
    -- in progress" to indicate that it is completing the request async. So essentially
    -- it is the same as "timeout"
    local in_progress = (not ret) and (err == "timeout" or err == "Operation already in progress")

    if not in_progress then
      _writing_log[skt] = nil
      sto_timeout()
      -- Once the async connect completes, Windows returns the error "already connected"
      -- to indicate it is done, so that error should be ignored. Except when it is the
      -- first call to connect, then it was already connected to something else and the
      -- error should be returned
      if (not ret) and err == "already connected" and retried then
        return 1
      end
      return ret, err
    end

    if sto_timed_out() then
      _writing_log[skt] = nil
      sto_timeout()
      return nil, sto_error(err)
    end

    -- still connecting; log the time, and yield with the writing set
    retried = true
    _writing_log[skt] = gettime()
    coroutine_yield(skt, _writing)
  end
end
771

772

773
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
-- already wrapped, it does nothing and returns the socket.
-- The Copas timeouts, the autoclose registration and a custom name (if any)
-- are carried over from the original socket to the wrapped one.
-- @param skt the socket to wrap
-- @param wrap_params the parameters for the ssl-context
-- @return wrapped socket, or throws an error
local function ssl_wrap(skt, wrap_params)
  if isTCP(skt) == "ssl" then
    return skt   -- was already wrapped
  end

  if not wrap_params then
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
  end

  if not ssl then
    ssl = require("ssl")   -- only loaded upon demand
  end
  local nskt = assert(ssl.wrap(skt, wrap_params)) -- assert, because we do not want to silently ignore this one!!

  nskt:settimeout(0)  -- non-blocking on the ssl-socket

  -- copy copas user-timeout to newly wrapped one
  local to_connect = user_timeouts_connect[skt]
  local to_send = user_timeouts_send[skt]
  local to_receive = user_timeouts_receive[skt]
  copas.settimeouts(nskt, to_connect, to_send, to_receive)

  local owner = _autoclose_r[skt]
  if owner then
    -- socket registered for autoclose, move registration to wrapped one
    _autoclose[owner] = nskt
    _autoclose_r[skt] = nil
    _autoclose_r[nskt] = owner
  end

  local current_name = object_names[skt]
  local has_custom_name = current_name ~= tostring(skt)
  if has_custom_name then
    -- socket had a custom name, so copy it over
    object_names[nskt] = current_name
  end

  return nskt
end
805

806

807
-- Normalizes the ssl parameters into a table with a fixed set of keys.
-- For each luasec method we have a subtable, allows for future extension.
-- Resulting structure (a value is 'false' if not provided, never nil):
-- {
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
--   sni = {                  -- parameters to 'sni()'
--     names = string | table -- 1st parameter
--     strict = bool          -- 2nd parameter
--   }
-- }
-- Reading any other key from the result throws an error.
-- @param sslt nil, a table (either in the structure above, or a plain
-- ssl-params table having a 'mode' or 'protocol' field), or a userdata
-- (ssl-context object)
-- @return the normalized table, throws an error for any other input type
local function normalize_sslt(sslt)
  local t = type(sslt)
  local r = setmetatable({}, {
    __index = function(self, key)
      -- a bug if this happens, here as a sanity check, just being careful since
      -- this is security stuff
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
    end,
  })
  if t == "nil" then
    r.wrap = false
    r.sni = false

  elseif t == "table" then
    if sslt.mode or sslt.protocol then
      -- has the mandatory fields for the ssl-params table for handshake
      -- backward compatibility
      r.wrap = sslt
      r.sni = false
    else
      -- has the target definition, copy our known keys
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
    end

  elseif t == "userdata" then
    -- it's an ssl-context object for the handshake
    -- backward compatibility
    r.wrap = sslt
    r.sni = false

  else
    -- report the offending TYPE (the message used to print the value itself)
    error("ssl parameters; did not expect type "..t)
  end

  return r
end
853

854

855
---
856
-- Peforms an (async) ssl handshake on a connected TCP client socket.
857
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
858
-- Throws error and does not return nil+error, as that might silently fail
859
-- in code like this;
860
--   copas.addserver(s1, function(skt)
861
--       skt = copas.wrap(skt, sparams)
862
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
863
--       skt:send(body)      --> continues unencrypted
864
-- @param skt Regular LuaSocket CLIENT socket object
865
-- @param wrap_params Table with ssl parameters
866
-- @return wrapped ssl socket, or throws an error
867
function copas.dohandshake(skt, wrap_params)
  ssl = ssl or require("ssl")  -- load LuaSec on first use

  local nskt = ssl_wrap(skt, wrap_params)
  sto_timeout(nskt, "write", true)

  while true do
    local done, reason = nskt:dohandshake()

    if done then
      sto_timeout()
      return nskt
    end

    if not _isSocketTimeout[reason] then
      -- a real failure, not a "try again later" result
      sto_timeout()
      error("TLS/SSL handshake failed: " .. tostring(reason))
    end

    if sto_timed_out() then
      sto_timeout()
      return nil, sto_error(reason)
    end

    -- the handshake needs more io; pick the queue to wait in
    local wait_queue
    if reason == "wantwrite" then
      sto_change_queue("write")
      wait_queue = _writing
    elseif reason == "wantread" then
      sto_change_queue("read")
      wait_queue = _reading
    else
      error("TLS/SSL handshake failed: " .. tostring(reason))
    end

    coroutine_yield(nskt, wait_queue)
  end
end
905

906
-- flushes a client write buffer (deprecated)
907
function copas.flush()
  -- deliberately a no-op: the body is empty
end
909

910
-- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout)
911
local _skt_mt_tcp = {
  -- string form of the underlying socket, followed by a Copas marker
  __tostring = function(self)
    return tostring(self.socket).." (copas wrapped)"
  end,

  __index = {
    send = function(self, data, from, to)
      return copas.send(self.socket, data, from, to)
    end,

    -- a user receive timeout of exactly 0 selects the partial-receive variant
    receive = function(self, pattern, prefix)
      if user_timeouts_receive[self.socket] ~= 0 then
        return copas.receive(self.socket, pattern, prefix)
      end
      return copas.receivepartial(self.socket, pattern, prefix)
    end,

    receivepartial = function(self, pattern, prefix)
      return copas.receivepartial(self.socket, pattern, prefix)
    end,

    flush = function(self)
      return copas.flush(self.socket)
    end,

    settimeout = function(self, time)
      return copas.settimeout(self.socket, time)
    end,

    settimeouts = function(self, connect, send, receive)
      return copas.settimeouts(self.socket, connect, send, receive)
    end,

    -- TODO: socket.connect is a shortcut, and must be provided with an alternative
    -- if ssl parameters are available, it will also include a handshake
    connect = function(self, ...)
      local res, err = copas.connect(self.socket, ...)
      if not res then
        return res, err
      end
      if self.ssl_params.sni then
        self:sni()
      end
      if self.ssl_params.wrap then
        res, err = self:dohandshake()
      end
      return res, err
    end,

    close = function(self, ...)
      return copas.close(self.socket, ...)
    end,

    -- TODO: socket.bind is a shortcut, and must be provided with an alternative
    bind = function(self, ...)
      return self.socket:bind(...)
    end,

    -- TODO: is this DNS related? hence blocking?
    getsockname = function(self, ...)
      local ok, ip, port, family = pcall(self.socket.getsockname, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return ip, port, family
    end,

    getstats = function(self, ...)
      return self.socket:getstats(...)
    end,

    setstats = function(self, ...)
      return self.socket:setstats(...)
    end,

    listen = function(self, ...)
      return self.socket:listen(...)
    end,

    accept = function(self, ...)
      return self.socket:accept(...)
    end,

    setoption = function(self, ...)
      local ok, res, err = pcall(self.socket.setoption, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return res, err
    end,

    getoption = function(self, ...)
      local ok, val, err = pcall(self.socket.getoption, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return val, err
    end,

    -- TODO: is this DNS related? hence blocking?
    getpeername = function(self, ...)
      local ok, ip, port, family = pcall(self.socket.getpeername, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return ip, port, family
    end,

    shutdown = function(self, ...)
      return self.socket:shutdown(...)
    end,

    -- wraps the socket for ssl and sets the SNI names; when `names` is nil the
    -- names/strict values stored in the ssl parameters are used
    sni = function(self, names, strict)
      local params = self.ssl_params
      local wrapped = ssl_wrap(self.socket, params.wrap)
      self.socket = wrapped
      if names == nil then
        names, strict = params.sni.names, params.sni.strict
      end
      return wrapped:sni(names, strict)
    end,

    -- runs the handshake; on success the internal socket is swapped for the
    -- ssl wrapped one and the wrapper itself is returned
    dohandshake = function(self, wrap_params)
      local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
      if nskt then
        self.socket = nskt
        return self
      end
      return nskt, err
    end,

    getalpn = function(self, ...)
      local ok, proto, err = pcall(self.socket.getalpn, self.socket, ...)
      if not ok then
        return nil, "not a tls socket"
      end
      return proto, err
    end,

    getsniname = function(self, ...)
      local ok, name, err = pcall(self.socket.getsniname, self.socket, ...)
      if not ok then
        return nil, "not a tls socket"
      end
      return name, err
    end,
  }
}
1046

1047
-- wraps a UDP socket, copy of TCP one adapted for UDP.
1048
local _skt_mt_udp = {__index = { }}

-- inherit the TCP metamethods (keeping our own `__index` table) ...
for name, value in pairs(_skt_mt_tcp) do
  if not _skt_mt_udp[name] then
    _skt_mt_udp[name] = value
  end
end
-- ... and start from a copy of all TCP methods
for name, method in pairs(_skt_mt_tcp.__index) do
  _skt_mt_udp.__index[name] = method
end

-- UDP specific methods and overrides
do
  local udp = _skt_mt_udp.__index

  function udp.send(self, ...)
    return self.socket:send(...)
  end

  function udp.sendto(self, ...)
    return self.socket:sendto(...)
  end

  function udp.receive(self, size)
    return copas.receive(self.socket, (size or UDP_DATAGRAM_MAX))
  end

  function udp.receivefrom(self, size)
    return copas.receivefrom(self.socket, (size or UDP_DATAGRAM_MAX))
  end

  -- TODO: is this DNS related? hence blocking?
  function udp.setpeername(self, ...)
    return self.socket:setpeername(...)
  end

  function udp.setsockname(self, ...)
    return self.socket:setsockname(...)
  end

  -- do not close client, as it is also the server for udp.
  function udp.close(self, ...)
    return true
  end

  function udp.settimeouts(self, connect, send, receive)
    return copas.settimeouts(self.socket, connect, send, receive)
  end
end
1076

1077

1078

1079
---
1080
-- Wraps a LuaSocket socket object in an async Copas based socket object.
1081
-- @param skt The socket to wrap
1082
-- @param sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
1083
-- @return wrapped socket object
1084
function copas.wrap (skt, sslt)
  local mt = getmetatable(skt)
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
    return skt -- already wrapped
  end

  skt:settimeout(0)

  if not isTCP(skt) then
    -- non-TCP sockets get the UDP wrapper, which carries no ssl parameters
    return setmetatable({socket = skt}, _skt_mt_udp)
  end
  return setmetatable({socket = skt, ssl_params = normalize_sslt(sslt)}, _skt_mt_tcp)
end
1097

1098
--- Wraps a handler in a function that deals with wrapping the socket and doing the
1099
-- optional ssl handshake.
1100
function copas.handler(handler, sslparams)
  -- TODO: pass a timeout value to set, and use during handshake
  return function (skt, ...)
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
    local sslp = skt.ssl_params
    if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
    if sslp.wrap then
      -- the handshake returns nil+error on a timeout (other failures throw);
      -- never fall through to the user handler without a completed handshake,
      -- since that would continue the conversation unencrypted
      local ok, err = skt:dohandshake(sslp.wrap)
      if not ok then
        error("TLS/SSL handshake failed: " .. tostring(err))
      end
    end
    return handler(skt, ...)
  end
end
1110

1111

1112
--------------------------------------------------
1113
-- Error handling
1114
--------------------------------------------------
1115

1116
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine
218✔
1117

1118

1119
--- Builds a traceback string for an error raised in a Copas task.
-- The message is suffixed with the names of the coroutine and socket involved
-- (or "nil" for each one that is absent).
-- @param msg the error message (any value; converted with `tostring`)
-- @param co the coroutine that failed, or nil
-- @param skt the socket involved, or nil
-- @return the traceback string
function copas.gettraceback(msg, co, skt)
  local co_str = co == nil and "nil" or copas.getthreadname(co)
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
  local msg_str = msg == nil and "" or tostring(msg)
  if msg_str == "" then
    -- no message: only report the coroutine and socket. The format string has
    -- two placeholders, so pass exactly those two values.
    msg_str = ("(coroutine: %s, socket: %s)"):format(co_str, skt_str)
  else
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
  end

  if type(co) == "thread" then
    -- regular Copas coroutine
    return debug.traceback(co, msg_str)
  end
  -- not a coroutine, but the main thread, this happens if a timeout callback
  -- (see `copas.timeout`) causes an error (those callbacks run on the main thread).
  return debug.traceback(msg_str, 2)
end
1137

1138

1139
-- Default error handler: prints the traceback generated for the error.
local function _deferror(msg, co, skt)
  print(copas.gettraceback(msg, co, skt))
end
1142

1143

1144
--- Sets the error handler, either for the current coroutine or the default one.
-- @param err the handler function; nil is accepted only when not setting the default
-- @param default if truthy, `err` replaces the default handler
function copas.seterrorhandler(err, default)
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")

  if not default then
    -- per-coroutine handler
    _errhandlers[coroutine_running()] = err
    return
  end

  assert(err ~= nil, "Expected the handler to be a function when setting the default")
  _deferror = err
end
1153
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
218✔
1154

1155

1156
--- Returns the error handler for a coroutine.
-- @param co (optional) the coroutine, defaults to the running one
-- @return the coroutine specific handler, or the default handler if none was set
function copas.geterrorhandler(co)
  local target = co or coroutine_running()
  return _errhandlers[target] or _deferror
end
1160

1161

1162
-- if `bool` is truthy, then the original socket errors will be returned in case of timeouts;
1163
-- `timeout, wantread, wantwrite, Operation already in progress`. If falsy, it will always
1164
-- return `timeout`.
1165
function copas.useSocketTimeoutErrors(bool)
  -- store a real boolean for the running coroutine, whatever was passed in
  local flag = false
  if bool then
    flag = true
  end
  useSocketTimeoutErrors[coroutine_running()] = flag
end
1168

1169
-------------------------------------------------------------------------------
1170
-- Thread handling
1171
-------------------------------------------------------------------------------
1172

1173
-- Resumes coroutine `co` once, passing it `skt` and any extra arguments, and
-- deals with the outcome: re-queue it if it yielded to one of the Copas
-- queues, otherwise treat it as finished (reporting errors and cleaning up).
local function _doTick (co, skt, ...)
  if not co then return end

  -- if a coroutine was canceled/removed, don't resume it
  if _canceled[co] then
    _canceled[co] = nil -- also clean up the registry
    _threads[co] = nil
    return
  end

  -- result: the socket (being read/write on) or the time to sleep
  -- queue: either _writing, _reading, or _sleeping
  -- local time_before = gettime()
  local succeeded, result, queue = coroutine_resume(co, skt, ...)
  -- local duration = gettime() - time_before
  -- if duration > 1 then
  --   duration = math.floor(duration * 1000)
  --   pcall(_errhandlers[co] or _deferror, "task ran for "..tostring(duration).." milliseconds.", co, skt)
  -- end

  if queue == _reading or queue == _writing or queue == _sleeping then
    -- we're yielding to a new queue
    queue:insert (result)
    queue:push (result, co)
    return
  end

  -- coroutine is terminating

  local failed = not succeeded
  if succeeded and coroutine_status(co) ~= "dead" then
    -- it called coroutine.yield from a non-Copas function which is unexpected
    failed = true
    result = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
  end

  if failed then
    local errhandler = _errhandlers[co] or _deferror
    local handled, handler_err = pcall(errhandler, result, co, skt)
    if not handled then
      print("Failed executing error handler: " .. tostring(handler_err))
    end
  end

  -- close the socket registered for auto-closing with this coroutine, if any
  local owned_skt = _autoclose[co]
  if owned_skt then
    owned_skt:close()
    _autoclose[co] = nil
    _autoclose_r[owned_skt] = nil
  end

  _errhandlers[co] = nil
end
1224

1225

1226
local _accept do
  -- number of clients accepted so far, per server socket (weak keys)
  local client_counters = setmetatable({}, { __mode = "k" })

  -- accepts a connection on socket input and starts `handler` for it in a
  -- new coroutine
  function _accept(server_skt, handler)
    local client_skt = server_skt:accept()
    if not client_skt then
      return
    end

    local seq = (client_counters[server_skt] or 0) + 1
    client_counters[server_skt] = seq
    object_names[client_skt] = object_names[server_skt] .. ":client_" .. seq

    client_skt:settimeout(0)
    -- copy server socket timeout settings
    local connect_to = user_timeouts_connect[server_skt]
    local send_to = user_timeouts_send[server_skt]
    local receive_to = user_timeouts_receive[server_skt]
    copas.settimeouts(client_skt, connect_to, send_to, receive_to)

    local co = coroutine_create(handler)
    object_names[co] = object_names[server_skt] .. ":handler_" .. seq

    if copas.autoclose then
      -- register both directions: coroutine -> socket and socket -> coroutine
      _autoclose[co] = client_skt
      _autoclose_r[client_skt] = co
    end

    _doTick(co, client_skt)
  end
end
1253

1254
-------------------------------------------------------------------------------
1255
-- Adds a server/handler pair to Copas dispatcher
1256
-------------------------------------------------------------------------------
1257

1258
do
  -- registers a TCP server socket: incoming connections are accepted when
  -- the socket becomes readable
  local function addTCPserver(server, handler, timeout, name)
    server:settimeout(0)
    if name then object_names[server] = name end
    _servers[server] = handler
    _reading:insert(server)
    if timeout then copas.settimeout(server, timeout) end
  end

  -- registers a non-TCP server socket: the handler runs in a single coroutine
  -- that is started right away with the server socket itself
  local function addUDPserver(server, handler, timeout, name)
    server:settimeout(0)
    local co = coroutine_create(handler)
    if name then object_names[server] = name end
    object_names[co] = object_names[server]..":handler"
    _reading:insert(server)
    if timeout then copas.settimeout(server, timeout) end
    _doTick(co, server)
  end


  function copas.addserver(server, handler, timeout, name)
    local register = isTCP(server) and addTCPserver or addUDPserver
    register(server, handler, timeout, name)
  end
end
1294

1295

1296
--- Removes a server socket from the dispatcher.
-- @param server the server socket, either raw or Copas wrapped
-- @param keep_open if truthy the socket is not closed
-- @return true when kept open, otherwise the results of closing the socket
function copas.removeserver(server, keep_open)
  local mt = getmetatable(server)
  local skt = server
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
    -- Copas wrapped; the registries hold the underlying socket
    skt = server.socket
  end

  _servers:remove(skt)
  _reading:remove(skt)

  if not keep_open then
    return server:close()
  end
  return true
end
1311

1312

1313

1314
-------------------------------------------------------------------------------
1315
-- Adds an new coroutine thread to Copas dispatcher
1316
-------------------------------------------------------------------------------
1317
function copas.addnamedthread(name, handler, ...)
  if type(name) == "function" and type(handler) == "string" then
    -- old call, flip args for compatibility
    name, handler = handler, name
  end

  -- entry point that skips the first argument, which is always the socket
  -- passed by the scheduler, but `nil` in case of a task/thread
  local function entry(_, ...)
    copas.pause()
    return handler(...)
  end

  local thread = coroutine_create(entry)
  if name then
    object_names[thread] = name
  end

  _threads[thread] = true -- register this thread so it can be removed
  _doTick (thread, nil, ...)
  return thread
end
1337

1338

1339
--- Adds a new coroutine thread without giving it a name.
-- Same as `copas.addnamedthread` called with a nil name.
function copas.addthread(handler, ...)
  return copas.addnamedthread(nil, handler, ...)
end
1342

1343

1344
--- Cancels a thread.
-- @param thread the coroutine to cancel; nil is ignored
function copas.removethread(thread)
  -- a nil key cannot be written to a table, so there is nothing to cancel
  if thread == nil then return end

  -- if the specified coroutine is registered, add it to the canceled table so
  -- that next time it tries to resume it exits.
  _canceled[thread] = _threads[thread]
  _sleeping:cancel(thread)
end
1350

1351

1352

1353
-------------------------------------------------------------------------------
1354
-- Sleep/pause management functions
1355
-------------------------------------------------------------------------------
1356

1357
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1358
-- If sleeptime < 0 then it sleeps until explicitly woken up using 'wakeup'
1359
-- TODO: deprecated, remove in next major
1360
function copas.sleep(sleeptime)
  local delay = sleeptime or 0
  coroutine_yield(delay, _sleeping)
end
1363

1364

1365
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1366
-- if sleeptime < 0 then it sleeps 0 seconds.
1367
function copas.pause(sleeptime)
  local started = gettime()

  -- anything that is not a positive number means "sleep 0 seconds"
  local delay = 0
  if sleeptime and sleeptime > 0 then
    delay = sleeptime
  end
  coroutine_yield(delay, _sleeping)

  -- report the time that passed while paused
  return gettime() - started
end
1376

1377

1378
-- yields the current coroutine until explicitly woken up using 'wakeup'
1379
function copas.pauseforever()
  local started = gettime()
  coroutine_yield(-1, _sleeping)  -- negative: no wake-up time
  return gettime() - started
end
1384

1385

1386
-- Wakes up a sleeping coroutine 'co'.
1387
function copas.wakeup(co)
  -- delegate to the sleeping queue
  _sleeping:wakeup(co)
end
1390

1391

1392

1393
-------------------------------------------------------------------------------
1394
-- Timeout management
1395
-------------------------------------------------------------------------------
1396

1397
do
  -- the active timer per coroutine (weak keys)
  local timeout_register = setmetatable({}, { __mode = "k" })
  local timerwheel = require("timerwheel").new({
      now = gettime,
      precision = TIMEOUT_PRECISION,
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
      err_handler = function(err)
        return _deferror(err, core_timer_thread)
      end,
    })

  -- the thread driving the timer wheel, one step per precision interval
  core_timer_thread = copas.addnamedthread("copas_core_timer", function()
    while true do
      copas.pause(TIMEOUT_PRECISION)
      timerwheel:step()
    end
  end)

  -- get the number of timeouts running
  function copas.gettimeouts()
    return timerwheel:count()
  end

  --- Sets the timeout for the current coroutine.
  -- Any timeout previously set for the coroutine is cancelled first.
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timeout
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
  -- @return true, or throws an error if `delay` is not a number >= 0
  function copas.timeout(delay, callback)
    -- validate before touching the existing timer, so a bad call does not
    -- leave a cancelled timer registered. `not (delay >= 0)` also rejects NaN.
    if type(delay) ~= "number" or not (delay >= 0) then
      error("timeout value must be greater than or equal to 0, got: "..tostring(delay))
    end

    local co = coroutine_running()
    local existing_timer = timeout_register[co]

    if existing_timer then
      timerwheel:cancel(existing_timer)
    end

    if delay > 0 and delay ~= math.huge then
      timeout_register[co] = timerwheel:set(delay, callback, co)
    else
      -- 0 or math.huge: cancel only
      timeout_register[co] = nil
    end

    return true
  end

end
1444

1445

1446
-------------------------------------------------------------------------------
1447
-- main tasks: manage readable and writable socket sets
1448
-------------------------------------------------------------------------------
1449
-- a task is an object with a required method `step()` that deals with a
1450
-- single step for that task.
1451

1452
local _tasks = {} do
  -- appends a task to the end of the list
  function _tasks:add(tsk)
    local slot = #_tasks + 1
    _tasks[slot] = tsk
  end
end
1457

1458

1459
-- a task to check ready to read events
1460
local _readable_task = {} do

  _readable_task._events = {}

  -- handles every socket listed in `_events`: server sockets accept a new
  -- connection, any other socket has its waiting coroutine resumed
  function _readable_task:step()
    for _, skt in ipairs(self._events) do
      local handler = _servers[skt]
      if handler then
        _accept(skt, handler)
      else
        _reading:remove(skt)
        local co = _reading:pop(skt)
        _doTick(co, skt)
      end
    end
  end

  _tasks:add(_readable_task)
end
1482

1483

1484
-- a task to check ready to write events
1485
local _writable_task = {} do

  _writable_task._events = {}

  -- resumes the coroutine waiting on each socket listed in `_events`
  function _writable_task:step()
    for _, skt in ipairs(self._events) do
      _writing:remove(skt)
      local co = _writing:pop(skt)
      _doTick(co, skt)
    end
  end

  _tasks:add(_writable_task)
end
1502

1503

1504

1505
-- sleeping threads task
1506
local _sleeping_task = {} do

  function _sleeping_task:step()
    local now = gettime()

    -- we're pushing them to _resumable, since that list will be replaced before
    -- executing. This prevents tasks running twice in a row with pause(0) for example.
    -- So here we won't execute, but at _resumable step which is next
    while true do
      local co = _sleeping:pop(now)
      if not co then
        break
      end
      _resumable:push(co)
    end
  end

  _tasks:add(_sleeping_task)
end
1523

1524

1525

1526
-- resumable threads task
1527
local _resumable_task = {} do

  function _resumable_task:step()
    -- replace the resume list before iterating, so items placed in there
    -- will indeed end up in the next copas step, not in this one, and not
    -- create a loop
    local pending = _resumable:clear_resumelist()

    for _, thread in ipairs(pending) do
      _doTick(thread)
    end
  end

  _tasks:add(_resumable_task)
end
1542

1543

1544
-------------------------------------------------------------------------------
1545
-- Checks for reads and writes on sockets
1546
-------------------------------------------------------------------------------
1547
local _select_plain do

  local last_cleansing = 0
  local duration = function(t2, t1) return t2-t1 end

  -- Watchdog sweep: every socket in `log` that is not in `events` and was
  -- logged more than WATCH_DOG_TIMEOUT ago is removed from the log and
  -- appended to `events` anyway (list entry plus reverse lookup), so it gets
  -- tried despite not having been returned by select.
  local function revive_stale(log, events, now)
    for skt, since in pairs(log) do
      if not events[skt] and duration(now, since) > WATCH_DOG_TIMEOUT then
        log[skt] = nil
        events[#events + 1] = skt
        events[skt] = #events
      end
    end
  end

  if not socket then
    -- socket module unavailable, switch to luasystem sleep
    _select_plain = block_sleep
  else
    -- use socket.select to handle socket-io
    _select_plain = function(timeout)
      local err
      local now = gettime()

      -- remove any closed sockets to prevent select from hanging on them
      if _closed[1] then
        for i, skt in ipairs(_closed) do
          _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
        end
      end

      _readable_task._events, _writable_task._events, err = socket.select(_reading, _writing, timeout)
      local r_events, w_events = _readable_task._events, _writable_task._events

      -- inject closed sockets in readable/writeable task so they can error out properly
      if _closed[1] then
        for i, removed in ipairs(_closed) do
          _closed[i] = nil
          r_events[#r_events+1] = removed[1]
          w_events[#w_events+1] = removed[2]
        end
      end

      if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
        last_cleansing = now
        revive_stale(_reading_log, r_events, now)
        revive_stale(_writing_log, w_events, now)
      end

      -- a "timeout" is not reported when there are events to handle after all
      if err == "timeout" and #r_events + #w_events > 0 then
        return nil
      else
        return err
      end
    end
  end
end
1614

1615

1616

1617
-------------------------------------------------------------------------------
1618
-- Dispatcher loop step.
1619
-- Listen to client requests and handles them
1620
-- Returns false if no socket-data was handled, or true if there was data
1621
-- handled (or nil + error message)
1622
-------------------------------------------------------------------------------
1623

1624
local copas_stats
1625
local min_ever, max_ever
1626

1627
local _select = _select_plain
218✔
1628

1629
-- instrumented version of _select() to collect stats
1630
local function _select_instrumented(timeout)
  local stats = copas_stats
  if not stats then
    -- first call since the stats were reset: start a fresh record
    copas_stats = {
      duration_max = -1,
      duration_min = 999999,
      duration_tot = 0,
      steps = 0,
    }
  else
    -- account for the time passed since the previous select returned
    local elapsed = gettime() - stats.step_start
    stats.duration_max = math.max(stats.duration_max, elapsed)
    stats.duration_min = math.min(stats.duration_min, elapsed)
    stats.duration_tot = stats.duration_tot + elapsed
    stats.steps = stats.steps + 1
  end

  local err = _select_plain(timeout)

  local now = gettime()
  copas_stats.time_start = copas_stats.time_start or now
  copas_stats.step_start = now

  return err
end
1654

1655

1656
function copas.step(timeout)
  -- Need to wake up the select call in time for the next sleeping event
  local wait
  if _resumable:done() then
    wait = math.min(_sleeping:getnext(), timeout or math.huge)
  else
    wait = 0
  end

  local err = _select(wait)

  for _, task in ipairs(_tasks) do
    task:step()
  end

  if not err then
    return true
  end
  if err ~= "timeout" then
    return nil, err
  end

  if wait + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
    -- we were idle, so occasionally do a GC sweep to ensure lingering
    -- sockets are closed, and we don't accidentally block the loop from
    -- exiting
    collectgarbage()
  end
  return false
end
1685

1686

1687
-------------------------------------------------------------------------------
1688
-- Check whether there is something to do.
1689
-- returns false if there are no sockets for read/write nor tasks scheduled
1690
-- (which means Copas is in an empty spin)
1691
-------------------------------------------------------------------------------
1692
function copas.finished()
  -- any socket still queued for reading or writing means there is work left
  if #_reading ~= 0 then return false end
  if #_writing ~= 0 then return false end

  local nothing_resumable = _resumable:done()
  if not nothing_resumable then
    return nothing_resumable
  end
  return (_sleeping:done(copas.gettimeouts()))
end
1695

1696

1697
local resetexit do
  local exit_semaphore, exiting

  -- (re)creates the exit semaphore and clears the exiting flag
  function resetexit()
    exit_semaphore = copas.semaphore.new(1, 0, math.huge)
    exiting = false
  end

  -- Signals tasks to exit. But only if they check for it. By calling `copas.exiting`
  -- they can check if they should exit. Or by calling `copas.waitforexit` they can
  -- wait until the exit signal is given.
  function copas.exit()
    if not exiting then
      exiting = true
      exit_semaphore:destroy()
    end
  end

  -- returns whether Copas is in the process of exiting. Exit can be started by
  -- calling `copas.exit()`.
  function copas.exiting()
    return exiting
  end

  -- Pauses the current coroutine until Copas is exiting. To be used as an exit
  -- signal for tasks that need to clean up before exiting.
  function copas.waitforexit()
    exit_semaphore:take(1)
  end
end
1726

1727

1728
--- Forcibly cancels all pending work and signals exit.
-- Test-teardown helper: every registered thread and socket is abandoned
-- without a chance to clean up. Afterwards copas.finished() returns true so
-- the loop exits, leaving the module in a clean state for the next
-- copas.loop() call.
function copas.cancelall()
  -- Closes and unregisters every socket held by `queue`. The head element is
  -- looked up again for each access.
  local function drain(queue)
    while queue[1] do
      copas.close(queue[1])
      queue:remove(queue[1])
    end
  end

  -- pending coroutines first: the resumable queue, then the sleeping heap
  _resumable:clear_resumelist()
  _sleeping:cancelall()

  -- then the sockets waiting on I/O
  drain(_reading)
  drain(_writing)

  -- unregister every server
  while _servers[1] do
    copas.removeserver(_servers[1])
  end

  -- replace the non-weak ancillary tables with empty ones
  _closed, _reading_log, _writing_log = {}, {}, {}

  -- finally raise the exit signal
  copas.exit()
end
1765

1766

1767
local _getstats do
  local _getstats_instrumented, _getstats_plain


  -- Stats getter that is active while instrumentation is switched off.
  -- Passing exactly `true` swaps in the instrumented select and getter and
  -- wipes previously collected numbers; any other value changes nothing.
  -- Always returns an empty table.
  function _getstats_plain(enable)
    -- this function gets hit if turned off, so turn on if true
    if enable == true then
      _select = _select_instrumented
      _getstats = _getstats_instrumented
      -- reset stats
      min_ever = nil
      max_ever = nil
      copas_stats = nil
    end
    return {}
  end


  -- convert from seconds to millisecs, with microsec precision
  local function useconds(t)
    return math.floor((t * 1000000) + 0.5) / 1000
  end
  -- convert from seconds to seconds, with millisec precision
  local function mseconds(t)
    return math.floor((t * 1000) + 0.5) / 1000
  end


  -- Stats getter that is active while instrumentation is switched on.
  -- Passing exactly `false` switches back to the plain select and getter.
  -- Otherwise the collected `copas_stats` table is taken, reset to nil,
  -- completed with derived values (averages, totals, all-time min/max) and
  -- returned with its values rounded by `useconds`/`mseconds`.
  -- Returns an empty table when nothing was collected.
  function _getstats_instrumented(enable)
    if enable == false then
      _select = _select_plain
      _getstats = _getstats_plain
      -- instrumentation disabled, so switch to the plain implementation
      return _getstats(enable)
    end
    -- The counter field is `steps` (see the divisions below). Checking it
    -- here keeps a window without any steps from being divided by zero.
    if (not copas_stats) or (copas_stats.steps == 0) then
      return {}
    end
    local stats = copas_stats
    copas_stats = nil   -- start a new collection window
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
    max_ever = math.max(max_ever or 0, stats.duration_max)
    stats.duration_min_ever = min_ever
    stats.duration_max_ever = max_ever
    stats.duration_avg = stats.duration_tot / stats.steps
    stats.step_start = nil
    stats.time_end = gettime()
    stats.time_tot = stats.time_end - stats.time_start
    stats.time_avg = stats.time_tot / stats.steps

    -- round for reporting; durations and time_avg go through useconds,
    -- the absolute timestamps and the total through mseconds
    stats.duration_avg = useconds(stats.duration_avg)
    stats.duration_max = useconds(stats.duration_max)
    stats.duration_max_ever = useconds(stats.duration_max_ever)
    stats.duration_min = useconds(stats.duration_min)
    stats.duration_min_ever = useconds(stats.duration_min_ever)
    stats.duration_tot = useconds(stats.duration_tot)
    stats.time_avg = useconds(stats.time_avg)
    stats.time_start = mseconds(stats.time_start)
    stats.time_end = mseconds(stats.time_end)
    stats.time_tot = mseconds(stats.time_tot)
    return stats
  end

  -- instrumentation starts out disabled
  _getstats = _getstats_plain
end
1832

1833

1834
-- Builds a status report of the scheduler.
-- `enable_stats` is handed unchanged to the stats getter, whose result table
-- is extended with the fields below and returned:
--   running  - boolean form of `copas.running`
--   timeout  - first value returned by `copas.gettimeouts()`
--   timer, inactive - the two values returned by `_sleeping:status()`
--   read, write     - lengths of the reading and writing queues
--   active   - result of `_resumable:count()`
function copas.status(enable_stats)
  local report = _getstats(enable_stats)

  report.running = not not copas.running   -- coerce to a real boolean
  report.timeout = copas.gettimeouts()     -- only the first result is kept

  local timer, inactive = _sleeping:status()
  report.timer, report.inactive = timer, inactive

  report.read = #_reading
  report.write = #_writing
  report.active = _resumable:count()

  return report
end
1844

1845

1846
-------------------------------------------------------------------------------
-- Dispatcher main loop.
-- Keeps calling `copas.step(timeout)` until Copas is both finished and
-- exiting. The first time it finds itself finished without an exit having
-- been signalled, it calls `copas.exit()` and keeps stepping (presumably so
-- tasks blocked in `copas.waitforexit()` get to run their cleanup).
--
-- `initializer` may be a function, which is added as the thread named
-- "copas_initializer"; any other truthy value is used as the timeout instead.
-- `copas.running` is true for the duration of the loop.
-------------------------------------------------------------------------------
function copas.loop(initializer, timeout)
  if type(initializer) ~= "function" then
    -- no initializer function: the first argument, if given, is the timeout
    timeout = initializer or timeout
  else
    copas.addnamedthread("copas_initializer", initializer)
  end

  resetexit()
  copas.running = true

  repeat
    copas.step(timeout)

    local stop = false
    if copas.finished() then
      if copas.exiting() then
        stop = true
      else
        copas.exit()
      end
    end
  until stop

  copas.running = false
end
1870

1871

1872
-------------------------------------------------------------------------------
1873
-- Naming sockets and coroutines.
1874
-------------------------------------------------------------------------------
1875
do
  -- Unwraps a Copas socket: an object carrying the Copas tcp or udp metatable
  -- yields its `socket` field, anything else is returned unchanged.
  local function realsocket(skt)
    local meta = getmetatable(skt)
    local is_wrapped = (meta == _skt_mt_tcp) or (meta == _skt_mt_udp)
    if not is_wrapped then
      return skt
    end
    return skt.socket
  end


  -- Registers `name` for a socket. The name is stored against the unwrapped
  -- socket. Raises when `name` is not a string or no socket is given.
  function copas.setsocketname(name, skt)
    assert(type(name) == "string", "expected arg #1 to be a string")
    local raw = assert(realsocket(skt), "expected arg #2 to be a socket")
    object_names[raw] = name
  end


  -- Looks up the name registered for a socket (nil when none was set).
  -- Raises when no socket is given.
  function copas.getsocketname(skt)
    local raw = assert(realsocket(skt), "expected arg #1 to be a socket")
    return object_names[raw]
  end
end
1898

1899

1900
-- Registers `name` for a coroutine; without `coro` the currently running
-- coroutine is named. Raises when `name` is not a string or `coro` is
-- neither a coroutine nor nil/false.
function copas.setthreadname(name, coro)
  assert(type(name) == "string", "expected arg #1 to be a string")

  local target = coro
  if not target then
    target = coroutine_running()
  end
  assert(type(target) == "thread", "expected arg #2 to be a coroutine or nil")

  object_names[target] = name
end
1906

1907

1908
-- Returns the name registered for a coroutine (nil when none was set);
-- without `coro` the currently running coroutine is looked up. Raises when
-- `coro` is neither a coroutine nor nil/false.
function copas.getthreadname(coro)
  local target = coro
  if not target then
    target = coroutine_running()
  end
  assert(type(target) == "thread", "expected arg #1 to be a coroutine or nil")

  return object_names[target]
end
1913

1914
-------------------------------------------------------------------------------
1915
-- Debug functionality.
1916
-------------------------------------------------------------------------------
1917
do
1918
  copas.debug = {}
218✔
1919

1920
  local log_core    -- if truthy, the core-timer will also be logged
1921
  local debug_log   -- function used as logger
1922

1923

1924
  -- Logging stand-in for `coroutine.yield`: reports which queue the running
  -- coroutine is yielding to, then performs the real yield and passes its
  -- results back. The thread named "copas_core_timer" is only logged when
  -- `log_core` is set.
  local debug_yield = function(skt, queue)
    local name = object_names[coroutine_running()]
    local silenced = (name == "copas_core_timer") and not log_core

    if not silenced then
      if queue == _sleeping then
        -- for the sleeping queue the first argument is a number of seconds
        debug_log("yielding '", name, "' to SLEEP for ", skt," seconds")

      elseif queue == _writing then
        debug_log("yielding '", name, "' to WRITE on '", object_names[skt], "'")

      elseif queue == _reading then
        debug_log("yielding '", name, "' to READ on '", object_names[skt], "'")

      else
        debug_log("thread '", name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
      end
    end

    return coroutine.yield(skt, queue)
  end
1944

1945

1946
  -- Logging stand-in for `coroutine.resume`: reports which coroutine is being
  -- resumed (and for which socket, if any), then performs the real resume and
  -- passes its results back. Socket-less resumes of "copas_core_timer" are
  -- only logged when `log_core` is set.
  local debug_resume = function(coro, skt, ...)
    local name = object_names[coro]

    if skt then
      debug_log("resuming '", name, "' for socket '", object_names[skt], "'")
    elseif log_core or name ~= "copas_core_timer" then
      debug_log("resuming '", name, "'")
    end

    return coroutine.resume(coro, skt, ...)
  end
1958

1959

1960
  -- Logging stand-in for `coroutine.create`: the coroutine body is wrapped so
  -- that a normal return of `f` is logged before its results are passed on.
  local debug_create = function(f)
    return coroutine.create(function(...)
      local returned = pack(f(...))
      debug_log("exiting '", object_names[coroutine_running()], "'")
      return unpack(returned)
    end)
  end
1969

1970

1971
  debug_log = fnil
218✔
1972

1973

1974
  -- Turns on debug output for all coroutine operations by swapping in the
  -- logging versions of create/resume/yield.
  -- `logger` is the function that receives the log arguments (defaults to
  -- `print`); a truthy `core` also logs the core timer thread.
  function copas.debug.start(logger, core)
    debug_log = logger or print
    log_core = core
    coroutine_create = debug_create
    coroutine_resume = debug_resume
    coroutine_yield = debug_yield
  end
1982

1983

1984
  -- Turns off debug output for coroutine operations: the standard
  -- create/resume/yield are put back and the logger is reset to `fnil`.
  function copas.debug.stop()
    coroutine_create = coroutine.create
    coroutine_resume = coroutine.resume
    coroutine_yield = coroutine.yield
    debug_log = fnil
  end
1991

1992
  do
1993
    local call_id = 0
218✔
1994

1995
    -- Labels used when printing socket calls.
    -- Every socket function name owns TWO entries, '<name>_in' and
    -- '<name>_out': arrays labelling, in order, the input parameters and the
    -- return values. The labels within one entry are padded to equal width.
    -- An entry may also carry a 'callback' function; it is then called with
    -- the parameters/return-values for further inspection, for example:
    --   callback = function(...)
    --     print(debug.traceback("called from:", 4))
    --   end,
    local args = {
      settimeout_in  = { "socket ", "seconds", "mode   " },
      settimeout_out = { "success", "error  " },

      connect_in  = { "socket ", "address", "port   " },
      connect_out = { "success", "error  " },

      getfd_in  = { "socket " },
      getfd_out = { "fd" },

      send_in  = { "socket   ", "data     ", "idx-start", "idx-end  " },
      send_out = { "last-idx-send    ", "error            ", "err-last-idx-send" },

      receive_in  = { "socket ", "pattern", "prefix " },
      receive_out = { "received    ", "error       ", "partial data" },

      dirty_in  = { "socket" },
      dirty_out = { "data in read-buffer" },

      close_in  = { "socket" },
      close_out = { "success", "error" },
    }
2070
    -- Prints `msg` followed by one line per value in `...`.
    -- `func` selects the label set from `args` (e.g. "send_in"); positions
    -- without a label show their index instead. Strings are cut to 30
    -- characters and reported with their full length. When the label set has
    -- a `callback`, it is invoked with the same values afterwards.
    local function print_call(func, msg, ...)
      print(msg)

      local values = pack(...)
      local labels = args[func] or {}
      -- cover both every value passed and every label defined
      local count = math.max(values.n, #labels)

      for idx = 1, count do
        local value = values[idx]
        local label = labels[idx] or idx
        local kind = type(value)

        if kind ~= "string" then
          print("\t"..label..": '"..tostring(value).."' ("..kind..")")
        else
          local shown = value:sub(1,30)
          if shown ~= value then
            shown = shown .."(...truncated)"
          end
          print("\t"..label..": '"..tostring(shown).."' ("..kind.." #"..#value..")")
        end
      end

      if labels.callback then
        labels.callback(...)
      end
    end
2090

2091
    -- Metatable for the debug proxies created by `copas.debug.socket`.
    -- Field lookups are forwarded to the wrapped socket; functions come back
    -- wrapped so that every call prints its arguments and results through
    -- `print_call`, tagged with a sequential call id.
    local debug_mt = {
      __index = function(self, key)
        local value = self.__original_socket[key]
        if type(value) ~= "function" then
          -- plain fields are passed through untouched
          return value
        end
        return function(self2, ...)
            local my_id = call_id + 1
            call_id = my_id
            local results

            if self2 ~= self then
              -- not invoked as a method on this proxy, so the first argument
              -- is an ordinary argument: forward it as given instead of
              -- dropping it and passing the proxy table in its place
              print_call(tostring(key).."_in", my_id .. "-calling '"..tostring(key) .. "' with; ", self2, ...)
              results = pack(value(self2, ...))
            else
              -- method call on the proxy: hand the real socket to the function
              print_call(tostring(key).."_in", my_id .. "-calling '" .. tostring(key) .. "' with; ", self.__original_socket, ...)
              results = pack(value(self.__original_socket, ...))
            end
            print_call(tostring(key).."_out", my_id .. "-results '"..tostring(key) .. "' returned; ", unpack(results))
            return unpack(results)
          end
      end,
      -- a proxy prints as the socket it wraps
      __tostring = function(self)
        return tostring(self.__original_socket)
      end
    }
2118

2119

2120
    -- Wraps a socket (copas or luasocket) in a debug proxy that prints every
    -- call with its parameters and return values. Extremely noisy!
    -- A Copas socket keeps its identity: its inner `socket` field is replaced
    -- by a proxy and the same Copas socket is returned. Any other socket is
    -- returned as a new proxy object.
    -- NOTE: only for plain sockets, will not support TLS
    function copas.debug.socket(original_skt)
      local meta = getmetatable(original_skt)

      if meta ~= _skt_mt_tcp and meta ~= _skt_mt_udp then
        return setmetatable({ __original_socket = original_skt }, debug_mt)
      end

      -- already wrapped as Copas socket, so recurse with the original luasocket one
      original_skt.socket = copas.debug.socket(original_skt.socket)
      return original_skt
    end
2137
  end
2138
end
2139

2140

2141
return copas
218✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc