• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 27377564602

11 Jun 2026 09:08PM UTC coverage: 84.967% (-0.06%) from 85.027%
27377564602

push

github

web-flow
Merge bed2b0136 into 29463813f

1413 of 1663 relevant lines covered (84.97%)

74539.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.96
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2013 - Kepler Project (www.keplerproject.org), 2015-2025 Thijs Schreijer
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
if package.loaded["socket.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
218✔
17
  error("you must require copas before require'ing socket.http")
×
18
end
19
if package.loaded["copas.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
218✔
20
  error("you must require copas before require'ing copas.http")
×
21
end
22

23
-- load either LuaSocket, or LuaSystem
24
-- note: with luasocket we don't use 'sleep' but 'select' with no sockets
25
local socket, system do
  -- Loads an optional module; yields the module on success and nothing when
  -- it is not installed (or fails to load).
  local function try_require(name)
    local ok, mod = pcall(require, name)
    if ok then
      return mod
    end
  end

  -- LuaSocket is the preferred backend
  socket = try_require("socket")

  -- LuaSystem is always probed as well; it serves as the fallback
  system = try_require("system")

  -- at least one of the two must be available
  if not socket and not system then
    error("Neither LuaSocket nor LuaSystem found, Copas requires at least one of them")
  end
end
40

41
local binaryheap = require "binaryheap"
218✔
42
local gettime = (socket or system).gettime
218✔
43
local block_sleep = (socket or system).sleep
218✔
44
local ssl -- only loaded upon demand
45

46
local core_timer_thread
47
local WATCH_DOG_TIMEOUT = 120
218✔
48
local UDP_DATAGRAM_MAX = (socket or {})._DATAGRAMSIZE or 8192
218✔
49
local TIMEOUT_PRECISION = 0.1  -- 100ms
218✔
50
local fnil = function() end
283,141✔
51

52

53
local coroutine_create = coroutine.create
218✔
54
local coroutine_running = coroutine.running
218✔
55
local coroutine_yield = coroutine.yield
218✔
56
local coroutine_resume = coroutine.resume
218✔
57
local coroutine_status = coroutine.status
218✔
58

59

60
-- nil-safe versions for pack/unpack
61
local _unpack = unpack or table.unpack

-- Expands table `t` from index `i` (default 1) up to `j`. When `j` is omitted
-- the stored element count `t.n` is used, falling back to the length operator,
-- so trailing nils recorded by `pack` survive the round trip.
local function unpack(t, i, j)
  local first = i or 1
  local last = j or t.n or #t
  return _unpack(t, first, last)
end

-- Collects all arguments in a table and records their true count in field `n`.
local function pack(...)
  return { n = select("#", ...), ... }
end
1,261✔
64

65

66
local pcall = pcall
218✔
67
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
218✔
68
  pcall = require("coxpcall").pcall
30✔
69
  coroutine_running = require("coxpcall").running
30✔
70
end
71

72

73
if socket then
218✔
74
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
75
  -- (this allows the use of socket.http from within copas)
76
  -- Metatable that tags the intermediate error tables raised by the functions
  -- created with `socket.newtry`; `statusHandler` compares against it to tell
  -- those apart from ordinary errors. The original error value sits at index 1.
  local err_mt = {
    -- Renders the wrapped error for diagnostics. The value is converted with
    -- `tostring` BEFORE concatenation, so a non-string error value (nil, a
    -- table, ...) cannot raise a new error from inside `__tostring`.
    __tostring = function (self)
      return "Copas 'try' error intermediate table: '"..tostring(self[1]).."'"
    end,
  }
81

82
  -- Post-processes the results of a `pcall`.
  -- On success the remaining values are passed through untouched. On failure,
  -- an error table tagged with `err_mt` is converted to `nil, original_error`;
  -- any other error is raised again.
  local function statusHandler(status, ...)
    if not status then
      local failure = (...)
      local from_try = type(failure) == "table" and getmetatable(failure) == err_mt
      if not from_try then
        error(failure)
      end
      return nil, failure[1]
    end
    return ...
  end
91

92
  -- Pure-Lua replacement for LuaSocket's `protect`: returns a wrapper that
  -- runs `func` under `pcall` and hands the outcome to `statusHandler`.
  socket.protect = function(func)
    local function protected(...)
      return statusHandler(pcall(func, ...))
    end
    return protected
  end
97

98
  -- Pure-Lua replacement for LuaSocket's `newtry`: returns a "try" function.
  -- The try function passes all its arguments through when the first one is
  -- truthy. Otherwise it calls `finalizer` (if given) in protected mode with
  -- the remaining arguments, and then raises the second argument wrapped in a
  -- table tagged with `err_mt`.
  socket.newtry = function(finalizer)
    local function try(...)
      if (...) then
        return ...
      end
      -- failure path: run the finalizer (its own errors are swallowed by pcall)
      pcall(finalizer or fnil, select(2, ...))
      -- level 0: no position information is added to the error value
      local wrapped = setmetatable({ (select(2, ...)) }, err_mt)
      error(wrapped, 0)
    end
    return try
  end
108

109
  socket.try = socket.newtry()
269✔
110
end
111

112

113
-- Setup the Copas meta table to auto-load submodules and define a default method
114
local copas do
  -- names of the submodules that are loaded on first access
  local pending = {
    ftp = true,
    future = true,
    http = true,
    lock = true,
    queue = true,
    semaphore = true,
    smtp = true,
    timer = true,
  }

  local mt = {}

  -- Auto-loader: the first access to a known submodule name requires
  -- "copas.<name>" and stores the result on the module table. Any other
  -- missing key resolves to nil.
  function mt.__index(self, key)
    if not pending[key] then
      return
    end
    local mod = require("copas."..key)
    self[key] = mod
    pending[key] = nil
    return rawget(self, key)
  end

  -- Default method: calling the module table forwards the arguments to `loop`.
  function mt.__call(self, ...)
    return self.loop(...)
  end

  copas = setmetatable({}, mt)
end
134

135

136
-- Meta information is public even if beginning with an "_"
137
copas._COPYRIGHT   = "Copyright (C) 2005-2013 Kepler Project, 2015-2026 Thijs Schreijer"
218✔
138
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
218✔
139
copas._VERSION     = "Copas 4.10.0"
218✔
140

141
-- Close the socket associated with the current connection after the handler finishes
142
copas.autoclose = true
218✔
143

144
-- indicator for the loop running
145
copas.running = false
218✔
146

147
-- gettime method from either LuaSocket or LuaSystem: time in (fractional) seconds, since epoch.
148
copas.gettime = gettime
218✔
149

150
-------------------------------------------------------------------------------
151
-- Object names, to track names of thread/coroutines and sockets
152
-------------------------------------------------------------------------------
153
local object_names do
  local names_mt = { __mode = "k" }  -- weak keys: entries do not keep objects alive

  -- Default name of an object is its `tostring` value. It is cached on first
  -- lookup, except for a nil key which cannot be stored.
  function names_mt.__index(self, key)
    local label = tostring(key)
    if key == nil then
      return label
    end
    rawset(self, key, label)
    return label
  end

  object_names = setmetatable({}, names_mt)
end
163

164
-------------------------------------------------------------------------------
165
-- Simple set implementation
166
-- adds a FIFO queue for each socket in the set
167
-------------------------------------------------------------------------------
168

169
-- Creates a socket set. The array part of the returned table holds the
-- sockets; in addition every socket has its own FIFO queue of items.
local function newsocketset()
  local set = {}

  -- socket -> index in the array part of `set`
  local position = {}

  -- socket -> fifo queue; created on first access, collectable once the
  -- socket itself is gone (weak keys)
  local queues = setmetatable({}, {
    __mode = "k",
    __index = function(tbl, skt)
      local fresh = {}
      tbl[skt] = fresh
      return fresh
    end,
  })

  -- Adds a socket to the set, does nothing if it is already present.
  -- @return skt if added, or nothing if it existed
  set.insert = function(self, skt)
    if position[skt] then
      return
    end
    local slot = #self + 1
    self[slot] = skt
    position[skt] = slot
    return skt
  end

  -- Removes a socket from the set, does nothing if it is not present.
  -- The last element is moved into the freed slot, so order is not preserved.
  -- @return skt if removed, or nothing if it wasn't in the set
  set.remove = function(self, skt)
    local slot = position[skt]
    if not slot then
      return
    end
    position[skt] = nil

    local last = #self
    local tail = self[last]
    self[last] = nil
    if tail ~= skt then
      position[tail] = slot
      self[slot] = tail
    end
    return skt
  end

  -- Appends an item to the fifo queue of the socket.
  set.push = function(self, skt, itm)
    local fifo = queues[skt]
    fifo[#fifo + 1] = itm
  end

  -- Takes the oldest item from the fifo queue of the socket (nil when empty).
  set.pop = function(self, skt)
    return table.remove(queues[skt], 1)
  end

  return set
end
229

230

231

232
-- Threads immediately resumable
233
local _resumable = {} do
  -- coroutines queued to be resumed; swapped for a fresh table on every drain
  local pending = {}

  -- Queues a coroutine for resumption.
  _resumable.push = function(self, co)
    pending[#pending + 1] = co
  end

  -- Hands out the current list and starts a new, empty one.
  _resumable.clear_resumelist = function(self)
    local drained
    drained, pending = pending, {}
    return drained
  end

  -- True when nothing is queued.
  _resumable.done = function(self)
    return pending[1] == nil
  end

  -- Number of queued entries (plus the length of the `_resumable` table itself).
  _resumable.count = function(self)
    return #pending + #_resumable
  end

end
255

256

257

258
-- Similar to the socket set above, but tailored for the use of
259
-- sleeping threads
260
local _sleeping = {} do

  -- timers ordered by wakeup time (payload is the coroutine)
  local heap = binaryheap.minUnique()
  -- coroutines sleeping without a wakeup time (weak keys)
  local lethargy = setmetatable({}, { __mode = "k" })


  -- Required base implementation (no-ops here)
  -----------------------------------------
  _sleeping.insert = fnil
  _sleeping.remove = fnil

  -- Registers a sleeping coroutine:
  --   sleeptime == 0 -> immediately resumable
  --   sleeptime <  0 -> sleeps until explicitly woken up
  --   otherwise      -> timer on the heap, expiring `sleeptime` seconds from now
  _sleeping.push = function(self, sleeptime, co)
    if sleeptime == 0 then
      _resumable:push(co)
    elseif sleeptime < 0 then
      lethargy[co] = true
    else
      heap:insert(gettime() + sleeptime, co)
    end
  end

  -- Pops the top heap entry if its wakeup time is not after `time`;
  -- returns nothing when no timer is due.
  _sleeping.pop = function(self, time)
    local due = heap:peekValue() or math.huge
    if not (time < due) then
      return heap:pop()
    end
  end

  -- additional methods for time management
  -----------------------------------------

  -- Returns the delay until the next timer expires, or nothing if there is none.
  _sleeping.getnext = function(self)
    local wake_at = heap:peekValue()
    if not wake_at then
      return
    end
    -- never report less than 0, because select() might block
    return math.max(wake_at - gettime(), 0)
  end

  -- Makes a sleeping coroutine resumable; does nothing if it is not sleeping.
  _sleeping.wakeup = function(self, co)
    if lethargy[co] then
      lethargy[co] = nil
    elseif not heap:remove(co) then
      return
    end
    _resumable:push(co)
  end

  -- Drops a coroutine from both sleeping registers without resuming it.
  _sleeping.cancel = function(self, co)
    lethargy[co] = nil
    heap:remove(co)
  end

  -- Empties the heap and re-inserts only the core timer thread.
  _sleeping.cancelall = function(self)
    while heap:size() > 0 do
      heap:pop()
    end
    heap:insert(gettime() + TIMEOUT_PRECISION, core_timer_thread)
    -- lethargy is weak; copas's idle GC sweeps will clean it within a few steps
  end

  -- Returns true if there is nothing more to do.
  -- The timeout task doesn't qualify as work (fallbacks only), the lethargy
  -- also doesn't qualify as work ('dead' tasks), but the combination of a
  -- timeout + a lethargy can be work.
  -- @param tos number of timeouts running
  _sleeping.done = function(self, tos)
    -- 1 means only the timeout-timer task is running
    if heap:size() ~= 1 then
      return false
    end
    local dormant_work = tos > 0 and next(lethargy)
    return not dormant_work
  end

  -- Returns the number of threads in the binaryheap and in the lethargy.
  _sleeping.status = function(self)
    local dormant = 0
    for _ in pairs(lethargy) do
      dormant = dormant + 1
    end

    return heap:size(), dormant
  end

end   -- _sleeping
341

342

343

344
-------------------------------------------------------------------------------
345
-- Tracking coroutines and sockets
346
-------------------------------------------------------------------------------
347

348
local _servers = newsocketset() -- servers being handled
218✔
349
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
218✔
350
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
218✔
351
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
218✔
352
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
218✔
353

354

355
-- for each socket we log the last read and last write times to enable the
356
-- watchdog to follow up if it takes too long.
357
-- tables contain the time, indexed by the socket
358
local _reading_log = {}
218✔
359
local _writing_log = {}
218✔
360

361
local _closed = {} -- track sockets that have been closed (list/array)
218✔
362

363
local _reading = newsocketset() -- sockets currently being read
218✔
364
local _writing = newsocketset() -- sockets currently being written
218✔
365
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
218✔
366
  ["timeout"] = true,      -- default LuaSocket timeout
158✔
367
  ["wantread"] = true,     -- LuaSec specific timeout
158✔
368
  ["wantwrite"] = true,    -- LuaSec specific timeout
158✔
369
}
370

371
-------------------------------------------------------------------------------
372
-- Coroutine based socket timeouts.
373
-------------------------------------------------------------------------------
374
local user_timeouts_connect, user_timeouts_send, user_timeouts_receive
do
  -- Lookup fallback: a socket without a configured timeout gets `math.huge`
  -- stored for it, i.e. it blocks forever.
  local function block_forever(registry, skt)
    rawset(registry, skt, math.huge)
    return math.huge
  end

  -- one metatable shared by the three registries; weak keys (the sockets)
  local shared_mt = {
    __mode = "k",
    __index = block_forever,
  }

  local function new_registry()
    return setmetatable({}, shared_mt)
  end

  user_timeouts_connect = new_registry()
  user_timeouts_send = new_registry()
  user_timeouts_receive = new_registry()
end
391

392
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
218✔
393

394

395
-- sto = socket-time-out
396
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do

  local function weak_keyed()
    return setmetatable({}, { __mode = "k" })
  end

  local socket_register = weak_keyed()     -- socket by coroutine
  local operation_register = weak_keyed()  -- operation "read"/"write" by coroutine
  local timeout_flags = weak_keyed()       -- true if timed out, by coroutine


  -- The callback called when a socket timeout occurs: flags the coroutine as
  -- timed out, makes it resumable, and takes its socket out of the queue it
  -- was waiting in.
  local function socket_callback(co)
    local skt = socket_register[co]
    local queue = operation_register[co]

    -- flag the timeout and resume the coroutine
    timeout_flags[co] = true
    _resumable:push(co)

    -- clear the socket from the current queue
    if queue ~= "read" and queue ~= "write" then
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
    end
    if queue == "read" then
      _reading:remove(skt)
    else
      _writing:remove(skt)
    end
  end


  -- Sets a socket timeout for the running coroutine.
  -- Calling it as `sto_timeout()` will cancel the timeout.
  -- @param skt (socket) the socket on which to operate, use 'nil' to cancel the current timeout
  -- @param queue (string) the queue the socket is currently in: "read" or "write"
  -- @param use_connect_to (bool) if truthy, use the connect timeout instead of the
  --   read/write timeout implied by queue. Needed because connect also uses the "write"
  --   queue, so the queue value alone cannot distinguish connect from send operations.
  -- @return true
  sto_timeout = function(skt, queue, use_connect_to)
    local co = coroutine_running()
    socket_register[co] = skt
    operation_register[co] = queue
    timeout_flags[co] = nil

    if not skt then
      copas.timeout(0)
      return true
    end

    local to = (use_connect_to and user_timeouts_connect[skt])
            or (queue == "read" and user_timeouts_receive[skt])
            or user_timeouts_send[skt]
    copas.timeout(to, socket_callback)
    return true
  end


  -- Changes the timeout to a different queue (read/write).
  -- Only useful with ssl-handshakes and "wantread", "wantwrite" errors, when
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
  -- @return true
  sto_change_queue = function(queue)
    local co = coroutine_running()
    operation_register[co] = queue
    return true
  end


  -- Responds with `true` if the operation of the running coroutine timed out.
  sto_timed_out = function()
    local co = coroutine_running()
    return timeout_flags[co]
  end


  -- Returns the proper timeout error: the socket's own error when the running
  -- coroutine is registered in `useSocketTimeoutErrors`, otherwise "timeout".
  sto_error = function(err)
    if useSocketTimeoutErrors[coroutine_running()] then
      return err or "timeout"
    end
    return "timeout"
  end

  -- only in case of testing export some internals
  if _G._TEST then
    copas._socket_register = socket_register
    copas._operation_register = operation_register
    copas._timeout_flags = timeout_flags
  end
end
477

478

479

480
-------------------------------------------------------------------------------
481
-- Coroutine based socket I/O functions.
482
-------------------------------------------------------------------------------
483

484
-- Returns "tcp" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
485
-- for tcp based, and falsy for udp based.
486
local isTCP do
  -- socket kind, keyed by the first three characters of the socket's
  -- `tostring` representation
  local kind_by_prefix = {
    tcp = "tcp",
    SSL = "ssl",
  }

  -- Classifies a socket by the prefix of its string representation; returns
  -- nil for anything that is neither "tcp..." nor "SSL...".
  isTCP = function(socket)
    local prefix = string.sub(tostring(socket), 1, 3)
    return kind_by_prefix[prefix]
  end
end
496

497
-- Closes a socket and records it in the `_closed` list; extra arguments and
-- all results are passed straight through to/from the socket's own `close`.
function copas.close(skt, ...)
  local slot = #_closed + 1
  _closed[slot] = skt
  return skt:close(...)
end
501

502

503

504
-- nil or negative means block indefinitely
505
-- Sets one and the same timeout for connect, send and receive on a socket.
-- A nil (or false) timeout is treated as -1.
-- @return the result of `copas.settimeouts`, or nil+error if timeout is not a number
function copas.settimeout(skt, timeout)
  local value = timeout or -1
  if type(value) == "number" then
    return copas.settimeouts(skt, value, value, value)
  end
  return nil, "timeout must be 'nil' or a number"
end
513

514
-- negative means block indefinitely, nil means do not change
515
-- Sets the connect, send and read timeouts of a socket individually.
-- A nil value leaves that timeout untouched. A negative value clears the
-- stored entry (lookups in the timeout registries then fall back to
-- `math.huge`). The three values are handled in order, so a timeout that was
-- already accepted stays set when a later one is rejected.
-- @return true, or nil+error when a value is neither nil nor a number
function copas.settimeouts(skt, connect, send, read)

  if connect ~= nil then
    if type(connect) ~= "number" then
      return nil, "connect timeout must be 'nil' or a number"
    end
    if connect < 0 then
      user_timeouts_connect[skt] = nil
    else
      user_timeouts_connect[skt] = connect
    end
  end

  if send ~= nil then
    if type(send) ~= "number" then
      return nil, "send timeout must be 'nil' or a number"
    end
    if send < 0 then
      user_timeouts_send[skt] = nil
    else
      user_timeouts_send[skt] = send
    end
  end

  if read ~= nil then
    if type(read) ~= "number" then
      return nil, "read timeout must be 'nil' or a number"
    end
    if read < 0 then
      user_timeouts_receive[skt] = nil
    else
      user_timeouts_receive[skt] = read
    end
  end

  return true
end
552

553
-- reads a pattern from a client and yields to the reading set on timeouts
554
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
555
-- be provided as the 'pattern' below defaults to a string. Will throw a
556
-- 'bad argument' error if omitted.
557
function copas.receive(client, pattern, part)
  pattern = pattern or "*l"
  -- log in which the pending operation is currently time-stamped
  local active_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local data, err
    data, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    -- done: either data arrived, or the error is not a timeout-type error
    if data or not _isSocketTimeout[err] then
      active_log[client] = nil
      sto_timeout()
      return data, err, part
    end

    -- the copas-level timeout for this operation expired
    if sto_timed_out() then
      active_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), part
    end

    -- not ready yet: time-stamp the socket and yield it with the set to wait in
    local queue_name, wait_set
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
      active_log, queue_name, wait_set = _writing_log, "write", _writing
    else
      active_log, queue_name, wait_set = _reading_log, "read", _reading
    end
    active_log[client] = gettime()
    sto_change_queue(queue_name)
    coroutine_yield(client, wait_set)
  end
end
600

601
-- receives data from a client over UDP. Not available for TCP.
602
-- (this is a copy of receive() method, adapted for receivefrom() use)
603
function copas.receivefrom(client, size)
  size = size or UDP_DATAGRAM_MAX
  sto_timeout(client, "read")

  while true do
    -- upon success `err` holds the ip address
    local data, err, port = client:receivefrom(size)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    -- done: either a datagram arrived, or the error is not a plain timeout
    if data or err ~= "timeout" then
      _reading_log[client] = nil
      sto_timeout()
      return data, err, port
    end

    -- the copas-level timeout for this operation expired
    if sto_timed_out() then
      _reading_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), port
    end

    -- nothing available yet: time-stamp the socket and yield it with the reading set
    _reading_log[client] = gettime()
    coroutine_yield(client, _reading)
  end
end
636

637
-- same as above but with special treatment when reading chunks,
638
-- unblocks on any data received.
639
function copas.receivepartial(client, pattern, part)
  pattern = pattern or "*l"
  -- length of the prefix passed in; any growth beyond it counts as received data
  local orig_size = #(part or "")
  local active_log = _reading_log
  sto_timeout(client, "read")

  while true do
    local data, err
    data, err, part = client:receive(pattern, part)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    -- done: complete result, some new partial data, or a non-timeout error
    local got_partial = type(part) == "string" and #part > orig_size
    if data or got_partial or not _isSocketTimeout[err] then
      active_log[client] = nil
      sto_timeout()
      return data, err, part
    end

    -- the copas-level timeout for this operation expired
    if sto_timed_out() then
      active_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), part
    end

    -- not ready yet: time-stamp the socket and yield it with the set to wait in
    local queue_name, wait_set
    if err == "wantwrite" then
      active_log, queue_name, wait_set = _writing_log, "write", _writing
    else
      active_log, queue_name, wait_set = _reading_log, "read", _reading
    end
    active_log[client] = gettime()
    sto_change_queue(queue_name)
    coroutine_yield(client, wait_set)
  end
end
683
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
218✔
684

685
-- sends data to a client. The operation is buffered and
686
-- yields to the writing set on timeouts
687
-- Note: from and to parameters will be ignored by/for UDP sockets
688
function copas.send(client, data, from, to)
  -- index of the last byte sent so far; each attempt continues right after it
  local lastIndex = (from or 1) - 1
  local active_log = _writing_log
  sto_timeout(client, "write")

  while true do
    local sent, err
    sent, err, lastIndex = client:send(data, lastIndex + 1, to)

    -- guarantees that high throughput doesn't take other threads to starvation
    if math.random(100) > 90 then
      copas.pause()
    end

    -- done: either everything was sent, or the error is not a timeout-type error
    if sent or not _isSocketTimeout[err] then
      active_log[client] = nil
      sto_timeout()
      return sent, err, lastIndex
    end

    -- the copas-level timeout for this operation expired
    if sto_timed_out() then
      active_log[client] = nil
      sto_timeout()
      return nil, sto_error(err), lastIndex
    end

    -- not ready yet: time-stamp the socket and yield it with the set to wait in
    local queue_name, wait_set
    if err == "wantread" then
      active_log, queue_name, wait_set = _reading_log, "read", _reading
    else
      active_log, queue_name, wait_set = _writing_log, "write", _writing
    end
    active_log[client] = gettime()
    sto_change_queue(queue_name)
    coroutine_yield(client, wait_set)
  end
end
732

733
-- Deprecated; kept for backward compatibility only, since UDP doesn't block
-- on sending. Simply delegates to the socket's own `sendto`.
function copas.sendto(client, data, ip, port)
  return client.sendto(client, data, ip, port)
end
737

738
-- waits until connection is completed
739
function copas.connect(skt, host, port)
  skt:settimeout(0)
  -- set after the first attempt that had to wait
  local retried = false
  sto_timeout(skt, "write", true)

  while true do
    local ok, err = skt:connect(host, port)

    -- non-blocking connect on Windows results in error "Operation already
    -- in progress" to indicate that it is completing the request async. So
    -- essentially it is the same as "timeout"
    local in_progress = not ok and (err == "timeout" or err == "Operation already in progress")

    if not in_progress then
      _writing_log[skt] = nil
      sto_timeout()
      -- Once the async connect completes, Windows returns the error "already
      -- connected" to indicate it is done, so that error should be ignored.
      -- Except when it is the first call to connect, then it was already
      -- connected to something else and the error should be returned
      if not ok and err == "already connected" and retried then
        return 1
      end
      return ok, err
    end

    -- the copas-level connect timeout expired
    if sto_timed_out() then
      _writing_log[skt] = nil
      sto_timeout()
      return nil, sto_error(err)
    end

    -- still connecting: time-stamp the socket and yield it with the writing set
    retried = true
    _writing_log[skt] = gettime()
    coroutine_yield(skt, _writing)
  end
end
773

774

775
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
776
-- already wrapped, it does nothing and returns the socket.
777
-- @param wrap_params the parameters for the ssl-context
778
-- @return wrapped socket, or throws an error
779
local function ssl_wrap(skt, wrap_params)
  -- was already wrapped
  if isTCP(skt) == "ssl" then
    return skt
  end
  if not wrap_params then
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
  end

  -- LuaSec is only loaded upon demand
  if not ssl then
    ssl = require("ssl")
  end
  -- assert, because we do not want to silently ignore this one!!
  local wrapped = assert(ssl.wrap(skt, wrap_params))

  -- non-blocking on the ssl-socket
  wrapped:settimeout(0)

  -- copy the copas user-timeouts to the newly wrapped socket
  local connect_to = user_timeouts_connect[skt]
  local send_to = user_timeouts_send[skt]
  local receive_to = user_timeouts_receive[skt]
  copas.settimeouts(wrapped, connect_to, send_to, receive_to)

  -- socket registered for autoclose? then move the registration to the wrapped one
  local owner = _autoclose_r[skt]
  if owner then
    _autoclose[owner] = wrapped
    _autoclose_r[skt] = nil
    _autoclose_r[wrapped] = owner
  end

  -- a name that differs from the default (tostring) is a custom one; copy it over
  local name = object_names[skt]
  if name ~= tostring(skt) then
    object_names[wrapped] = name
  end

  return wrapped
end
807

808

809
-- For each luasec method we have a subtable, allows for future extension.
810
-- Required structure:
811
-- {
812
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
813
--   sni = {                  -- parameters to 'sni()'
814
--     names = string | table -- 1st parameter
815
--     strict = bool          -- 2nd parameter
816
--   }
817
-- }
818
-- Normalizes the accepted forms of ssl parameters into a table with exactly
-- the keys `wrap` and `sni`; an absent value is stored as `false`, never nil.
-- Accepted input:
--   nil      -> no ssl: wrap = false, sni = false
--   table    -> with a `mode` or `protocol` field it is taken to be the
--               ssl-params table itself (backward compatibility); otherwise
--               its `wrap` and `sni` fields are copied
--   userdata -> taken to be an ssl-context object (backward compatibility)
-- @param sslt the ssl parameters in one of the forms above
-- @return the normalized table; reading any other key from it throws an error
-- Throws an error for any other input type; the message names that type.
local function normalize_sslt(sslt)
  local t = type(sslt)
  local r = setmetatable({}, {
    __index = function(self, key)
      -- a bug if this happens, here as a sanity check, just being careful since
      -- this is security stuff
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
    end,
  })
  if t == "nil" then
    r.wrap = false
    r.sni = false

  elseif t == "table" then
    if sslt.mode or sslt.protocol then
      -- has the mandatory fields for the ssl-params table for handshake
      -- backward compatibility
      r.wrap = sslt
      r.sni = false
    else
      -- has the target definition, copy our known keys
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
    end

  elseif t == "userdata" then
    -- it's an ssl-context object for the handshake
    -- backward compatibility
    r.wrap = sslt
    r.sni = false

  else
    -- report the offending TYPE, as the message promises, not the value
    error("ssl parameters; did not expect type "..t)
  end

  return r
end
855

856

857
---
858
-- Performs an (async) ssl handshake on a connected TCP client socket.
859
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
860
-- Throws error and does not return nil+error, as that might silently fail
861
-- in code like this;
862
--   copas.addserver(s1, function(skt)
863
--       skt = copas.wrap(skt, sparams)
864
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
865
--       skt:send(body)      --> continues unencrypted
866
-- @param skt Regular LuaSocket CLIENT socket object
867
-- @param wrap_params Table with ssl parameters
868
-- @return wrapped ssl socket, or nil+error if the handshake times out; any other failure throws an error
869
function copas.dohandshake(skt, wrap_params)
  -- LuaSec is only loaded once a handshake is actually requested
  ssl = ssl or require("ssl")

  local nskt = ssl_wrap(skt, wrap_params)
  sto_timeout(nskt, "write", true)

  -- keep retrying the handshake, yielding to the scheduler in between, until
  -- it succeeds, fails, or times out
  while true do
    local success, err = nskt:dohandshake()

    if success then
      sto_timeout()
      return nskt
    end

    if not _isSocketTimeout[err] then
      -- a real failure, not a "try again later" result
      sto_timeout()
      error("TLS/SSL handshake failed: " .. tostring(err))
    end

    if sto_timed_out() then
      -- the only path that returns nil + error instead of throwing
      sto_timeout()
      return nil, sto_error(err)
    end

    -- pick the queue to wait in, based on what the handshake asked for
    local queue
    if err == "wantwrite" then
      sto_change_queue("write")
      queue = _writing

    elseif err == "wantread" then
      sto_change_queue("read")
      queue = _reading

    else
      error("TLS/SSL handshake failed: " .. tostring(err))
    end

    coroutine_yield(nskt, queue)
  end
end
907

908
-- Deprecated: once flushed a client write buffer. The body is intentionally
-- empty; all arguments are ignored and nothing is returned.
function copas.flush()
  -- no-op
end
911

912
-- Metatable for Copas-wrapped TCP sockets (send, receive, flush, settimeout, ...).
-- A wrapper holds the underlying socket in `socket` and the normalized ssl
-- parameters in `ssl_params`; methods forward either to the `copas.*`
-- functions or directly to the underlying socket.
local _skt_mt_tcp = {
  __tostring = function(self)
    return tostring(self.socket).." (copas wrapped)"
  end,

  __index = {
    send = function (self, data, from, to)
      return copas.send (self.socket, data, from, to)
    end,

    -- a user receive-timeout of exactly 0 switches to the partial variant
    receive = function (self, pattern, prefix)
      local skt = self.socket
      if user_timeouts_receive[skt] == 0 then
        return copas.receivepartial(skt, pattern, prefix)
      end
      return copas.receive(skt, pattern, prefix)
    end,

    receivepartial = function (self, pattern, prefix)
      return copas.receivepartial(self.socket, pattern, prefix)
    end,

    flush = function (self)
      return copas.flush(self.socket)
    end,

    settimeout = function (self, time)
      return copas.settimeout(self.socket, time)
    end,

    settimeouts = function (self, connect, send, receive)
      return copas.settimeouts(self.socket, connect, send, receive)
    end,

    -- TODO: socket.connect is a shortcut, and must be provided with an alternative
    -- if ssl parameters are available, it will also include a handshake
    connect = function(self, ...)
      local res, err = copas.connect(self.socket, ...)
      if not res then
        return res, err
      end

      local params = self.ssl_params
      if params.sni then self:sni() end
      if params.wrap then res, err = self:dohandshake() end
      return res, err
    end,

    close = function(self, ...)
      return copas.close(self.socket, ...)
    end,

    -- TODO: socket.bind is a shortcut, and must be provided with an alternative
    bind = function(self, ...) return self.socket:bind(...) end,

    -- TODO: is this DNS related? hence blocking?
    getsockname = function(self, ...)
      local ok, ip, port, family = pcall(self.socket.getsockname, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return ip, port, family
    end,

    getstats = function(self, ...) return self.socket:getstats(...) end,

    setstats = function(self, ...) return self.socket:setstats(...) end,

    listen = function(self, ...) return self.socket:listen(...) end,

    accept = function(self, ...) return self.socket:accept(...) end,

    setoption = function(self, ...)
      local ok, res, err = pcall(self.socket.setoption, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return res, err
    end,

    getoption = function(self, ...)
      local ok, val, err = pcall(self.socket.getoption, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return val, err
    end,

    -- TODO: is this DNS related? hence blocking?
    getpeername = function(self, ...)
      local ok, ip, port, family = pcall(self.socket.getpeername, self.socket, ...)
      if not ok then
        return nil, "not implemented by LuaSec"
      end
      return ip, port, family
    end,

    shutdown = function(self, ...) return self.socket:shutdown(...) end,

    -- ssl-wraps the socket, then sets the SNI names; without explicit
    -- `names` the values stored in `ssl_params.sni` are used
    sni = function(self, names, strict)
      local sslp = self.ssl_params
      local wrapped = ssl_wrap(self.socket, sslp.wrap)
      self.socket = wrapped
      if names == nil then
        local defaults = sslp.sni
        names = defaults.names
        strict = defaults.strict
      end
      return self.socket:sni(names, strict)
    end,

    dohandshake = function(self, wrap_params)
      local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
      if nskt then
        self.socket = nskt  -- replace internal socket with the newly wrapped ssl one
        return self
      end
      return nskt, err
    end,

    getalpn = function(self, ...)
      local ok, proto, err = pcall(self.socket.getalpn, self.socket, ...)
      if not ok then
        return nil, "not a tls socket"
      end
      return proto, err
    end,

    getsniname = function(self, ...)
      local ok, name, err = pcall(self.socket.getsniname, self.socket, ...)
      if not ok then
        return nil, "not a tls socket"
      end
      return name, err
    end,
  }
}
1048

1049
-- Metatable for Copas-wrapped UDP sockets: starts as a copy of the TCP one,
-- after which the UDP specific methods are put in place.
local _skt_mt_udp = {__index = { }} do
  -- metatable-level entries; an entry already present (`__index`) is kept
  for k, v in pairs(_skt_mt_tcp) do
    if not _skt_mt_udp[k] then
      _skt_mt_udp[k] = v
    end
  end

  local udp = _skt_mt_udp.__index
  for k, v in pairs(_skt_mt_tcp.__index) do
    udp[k] = v
  end

  function udp.send(self, ...)
    return self.socket:send(...)
  end

  function udp.sendto(self, ...)
    return self.socket:sendto(...)
  end

  function udp.receive(self, size)
    return copas.receive (self.socket, (size or UDP_DATAGRAM_MAX))
  end

  function udp.receivefrom(self, size)
    return copas.receivefrom (self.socket, (size or UDP_DATAGRAM_MAX))
  end

  -- TODO: is this DNS related? hence blocking?
  function udp.setpeername(self, ...)
    return self.socket:setpeername(...)
  end

  function udp.setsockname(self, ...)
    return self.socket:setsockname(...)
  end

  -- do not close client, as it is also the server for udp.
  function udp.close(self, ...)
    return true
  end

  function udp.settimeouts(self, connect, send, receive)
    return copas.settimeouts(self.socket, connect, send, receive)
  end
end
1078

1079

1080

1081
---
1082
-- Wraps a LuaSocket socket object in an async Copas based socket object.
1083
-- @param skt The socket to wrap
1084
-- @param sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
1085
-- @return wrapped socket object
1086
function copas.wrap (skt, sslt)
  local mt = getmetatable(skt)
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
    return skt -- already wrapped
  end

  skt:settimeout(0)

  if not isTCP(skt) then
    -- UDP wrappers carry no ssl parameters
    return setmetatable ({socket = skt}, _skt_mt_udp)
  end

  local wrapper = {socket = skt, ssl_params = normalize_sslt(sslt)}
  return setmetatable (wrapper, _skt_mt_tcp)
end
1099

1100
--- Wraps a handler in a function that deals with wrapping the socket and doing the
-- optional ssl handshake.
-- @param handler function called as `handler(wrapped_skt, ...)`
-- @param sslparams (optional) ssl parameters, passed on to `copas.wrap`
-- @return function `(skt, ...)` that wraps `skt`, applies SNI and the
-- handshake when configured, and then returns the results of `handler`
function copas.handler(handler, sslparams)
  -- TODO: pass a timeout value to set, and use during handshake
  return function (skt, ...)
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
    local sslp = skt.ssl_params
    -- UDP wrappers created by `copas.wrap` have no `ssl_params`; skip the
    -- TLS steps for them instead of indexing nil
    if sslp then
      if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
      if sslp.wrap then skt:dohandshake(sslp.wrap) end
    end
    return handler(skt, ...)
  end
end
1112

1113

1114
--------------------------------------------------
1115
-- Error handling
1116
--------------------------------------------------
1117

1118
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine
218✔
1119

1120

1121
--- Builds a traceback string annotated with the coroutine and socket names.
-- @param msg the error message; `nil` is treated as an empty message
-- @param co the coroutine the error occurred in, or nil
-- @param skt the socket involved, or nil
-- @return the result of `debug.traceback` for the annotated message
function copas.gettraceback(msg, co, skt)
  local co_str = co == nil and "nil" or copas.getthreadname(co)
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
  local msg_str = msg == nil and "" or tostring(msg)
  if msg_str == "" then
    -- only the two names go in here; passing the (empty) message as well
    -- shifted every value one placeholder to the left
    msg_str = ("(coroutine: %s, socket: %s)"):format(co_str, skt_str)
  else
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
  end

  if type(co) == "thread" then
    -- regular Copas coroutine
    return debug.traceback(co, msg_str)
  end
  -- not a coroutine, but the main thread, this happens if a timeout callback
  -- (see `copas.timeout`) causes an error (those callbacks run on the main thread).
  return debug.traceback(msg_str, 2)
end
1139

1140

1141
-- Default error handler: prints the annotated traceback.
local function _deferror(msg, co, skt)
  local report = copas.gettraceback(msg, co, skt)
  print(report)
end
1144

1145

1146
--- Sets the error handler for the current coroutine, or the default one.
-- @param err handler function; may be nil only when `default` is falsy
-- @param default if truthy, `err` replaces the default handler instead of
-- being registered for the running coroutine
function copas.seterrorhandler(err, default)
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")

  if not default then
    _errhandlers[coroutine_running()] = err
    return
  end

  assert(err ~= nil, "Expected the handler to be a function when setting the default")
  _deferror = err
end
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
218✔
1156

1157

1158
--- Returns the error handler in effect for a coroutine.
-- @param co (optional) coroutine to look up, defaults to the running one
-- @return the handler registered for `co`, or the default handler
function copas.geterrorhandler(co)
  if not co then
    co = coroutine_running()
  end

  local handler = _errhandlers[co]
  if handler then
    return handler
  end
  return _deferror
end
1162

1163

1164
-- Selects, for the running coroutine, which error is reported on timeouts.
-- Truthy `bool`: the original socket errors are returned (`timeout, wantread,
-- wantwrite, Operation already in progress`). Falsy: always `timeout`.
function copas.useSocketTimeoutErrors(bool)
  local enabled = false -- stored as a real boolean
  if bool then
    enabled = true
  end
  useSocketTimeoutErrors[coroutine_running()] = enabled
end
1170

1171
-------------------------------------------------------------------------------
1172
-- Thread handling
1173
-------------------------------------------------------------------------------
1174

1175
-- Resumes coroutine `co` once, passing it `skt` and any extra arguments, and
-- then either parks it in the queue it yielded to or finalizes it (error
-- handler, auto-close of its socket, registry cleanup).
local function _doTick (co, skt, ...)
  if not co then return end

  -- if a coroutine was canceled/removed, don't resume it
  if _canceled[co] then
    _canceled[co] = nil -- also clean up the registry
    _threads[co] = nil
    return
  end

  -- res: the socket (being read/write on) or the time to sleep
  -- new_q: either _writing, _reading, or _sleeping
  local ok, res, new_q = coroutine_resume(co, skt, ...)

  local yielded_to_queue = new_q == _reading or new_q == _writing or new_q == _sleeping
  if yielded_to_queue then
    -- we're yielding to a new queue
    new_q:insert (res)
    new_q:push (res, co)
    return
  end

  -- coroutine is terminating

  local failed = not ok
  if ok and coroutine_status(co) ~= "dead" then
    -- it called coroutine.yield from a non-Copas function which is unexpected
    failed = true
    res = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
  end

  if failed then
    local handler = _errhandlers[co] or _deferror
    local handled, handler_err = pcall(handler, res, co, skt)
    if not handled then
      print("Failed executing error handler: " .. tostring(handler_err))
    end
  end

  local skt_to_close = _autoclose[co]
  if skt_to_close then
    skt_to_close:close()
    _autoclose[co] = nil
    _autoclose_r[skt_to_close] = nil
  end

  _errhandlers[co] = nil
end
1226

1227

1228
local _accept do
  -- number of clients accepted so far, per server socket (weak keys)
  local client_counters = setmetatable({}, { __mode = "k" })

  -- Accepts a connection on `server_skt` and starts `handler` for it in a
  -- new coroutine. Does nothing when accept yields no client.
  function _accept(server_skt, handler)
    local client_skt = server_skt:accept()
    if not client_skt then
      return
    end

    local count = (client_counters[server_skt] or 0) + 1
    client_counters[server_skt] = count
    object_names[client_skt] = object_names[server_skt] .. ":client_" .. count

    client_skt:settimeout(0)
    -- copy server socket timeout settings
    copas.settimeouts(
      client_skt,
      user_timeouts_connect[server_skt],
      user_timeouts_send[server_skt],
      user_timeouts_receive[server_skt]
    )

    local co = coroutine_create(handler)
    object_names[co] = object_names[server_skt] .. ":handler_" .. count

    if copas.autoclose then
      -- two-way registration: coroutine -> socket and socket -> coroutine
      _autoclose[co] = client_skt
      _autoclose_r[client_skt] = co
    end

    _doTick(co, client_skt)
  end
end
1255

1256
-------------------------------------------------------------------------------
1257
-- Adds a server/handler pair to Copas dispatcher
1258
-------------------------------------------------------------------------------
1259

1260
do
  -- Registers a TCP server socket: its handler is stored in `_servers` and
  -- the socket is added to the read set.
  local function addTCPserver(server, handler, timeout, name)
    server:settimeout(0)
    if name then
      object_names[server] = name
    end

    _servers[server] = handler
    _reading:insert(server)

    if timeout then
      copas.settimeout(server, timeout)
    end
  end

  -- Registers a UDP server socket: the handler runs in a single coroutine
  -- that is started right away with the server socket as its argument.
  local function addUDPserver(server, handler, timeout, name)
    server:settimeout(0)
    local co = coroutine_create(handler)
    if name then
      object_names[server] = name
    end
    object_names[co] = object_names[server]..":handler"

    _reading:insert(server)

    if timeout then
      copas.settimeout(server, timeout)
    end
    _doTick(co, server)
  end


  --- Adds a server socket with its handler to the dispatcher.
  -- @param server the server socket
  -- @param handler function handling the socket
  -- @param timeout (optional) timeout applied via `copas.settimeout`
  -- @param name (optional) name to register for the server socket
  function copas.addserver(server, handler, timeout, name)
    if not isTCP(server) then
      addUDPserver(server, handler, timeout, name)
      return
    end
    addTCPserver(server, handler, timeout, name)
  end
end
1296

1297

1298
--- Removes a server socket from the dispatcher.
-- @param server the server socket, raw or Copas-wrapped
-- @param keep_open if truthy the socket is not closed
-- @return `true` when kept open, otherwise the results of `server:close()`
function copas.removeserver(server, keep_open)
  local mt = getmetatable(server)

  -- the registries hold the underlying socket, so unwrap a Copas wrapper
  local skt
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
    skt = server.socket
  else
    skt = server
  end

  _servers:remove(skt)
  _reading:remove(skt)

  if not keep_open then
    return server:close()
  end
  return true
end
1313

1314

1315

1316
-------------------------------------------------------------------------------
1317
-- Adds a new coroutine thread to Copas dispatcher
1318
-------------------------------------------------------------------------------
1319
function copas.addnamedthread(name, handler, ...)
  if type(name) == "function" and type(handler) == "string" then
    -- old call, flip args for compatibility
    name, handler = handler, name
  end

  -- The coroutine body skips its first argument, which is always the socket
  -- passed by the scheduler, but `nil` in case of a task/thread. It pauses
  -- once before handing control to the handler.
  local function entrypoint(_, ...)
    copas.pause()
    return handler(...)
  end

  local thread = coroutine_create(entrypoint)
  if name then
    object_names[thread] = name
  end

  _threads[thread] = true -- register this thread so it can be removed
  _doTick (thread, nil, ...)
  return thread
end
1339

1340

1341
-- Adds an unnamed thread; shorthand for `copas.addnamedthread` with a nil name.
function copas.addthread(handler, ...)
  local name = nil
  return copas.addnamedthread(name, handler, ...)
end
1344

1345

1346
function copas.removethread(thread)
  -- A registered coroutine gets flagged as canceled, so that it exits the
  -- next time it would be resumed; an unregistered one yields a nil flag.
  local registered = _threads[thread or 0]
  _canceled[thread] = registered
  _sleeping:cancel(thread)
end
1352

1353

1354

1355
-------------------------------------------------------------------------------
1356
-- Sleep/pause management functions
1357
-------------------------------------------------------------------------------
1358

1359
-- Yields the current coroutine to the sleeping queue for `sleeptime` seconds
-- (0 when omitted). If sleeptime < 0 then it sleeps until explicitly woken up
-- using 'wakeup'.
-- TODO: deprecated, remove in next major
function copas.sleep(sleeptime)
  local delay = sleeptime or 0
  coroutine_yield(delay, _sleeping)
end
1365

1366

1367
-- Yields the current coroutine and wakes it after 'sleeptime' seconds.
-- A missing, zero or negative sleeptime sleeps 0 seconds.
-- Returns the time that passed, measured with `gettime`.
function copas.pause(sleeptime)
  local started = gettime()

  local delay = 0
  if sleeptime and sleeptime > 0 then
    delay = sleeptime
  end
  coroutine_yield(delay, _sleeping)

  return gettime() - started
end
1378

1379

1380
-- Yields the current coroutine until explicitly woken up using 'wakeup'
-- (signalled to the sleeping queue by a delay of -1).
-- Returns the time that passed, measured with `gettime`.
function copas.pauseforever()
  local started = gettime()
  coroutine_yield(-1, _sleeping)
  local finished = gettime()
  return finished - started
end
1386

1387

1388
-- Wakes up a sleeping coroutine 'co' by delegating to the sleeping queue.
-- Returns nothing.
function copas.wakeup(co)
  local queue = _sleeping
  queue:wakeup(co)
end
1392

1393

1394

1395
-------------------------------------------------------------------------------
1396
-- Timeout management
1397
-------------------------------------------------------------------------------
1398

1399
do
  -- active timer per coroutine (weak keys)
  local timeout_register = setmetatable({}, { __mode = "k" })
  local timerwheel = require("timerwheel").new({
      now = gettime,
      precision = TIMEOUT_PRECISION,
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
      err_handler = function(err)
        return _deferror(err, core_timer_thread)
      end,
    })

  -- background thread that steps the timer wheel every TIMEOUT_PRECISION seconds
  core_timer_thread = copas.addnamedthread("copas_core_timer", function()
    while true do
      copas.pause(TIMEOUT_PRECISION)
      timerwheel:step()
    end
  end)

  -- get the number of timeouts running
  function copas.gettimeouts()
    return timerwheel:count()
  end

  --- Sets the timeout for the current coroutine.
  -- Any timer previously set for this coroutine is cancelled first.
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timeout
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
  -- @return true; raises an error when `delay` is not >= 0
  function copas.timeout(delay, callback)
    local co = coroutine_running()
    local existing_timer = timeout_register[co]

    if existing_timer then
      timerwheel:cancel(existing_timer)
    end

    if delay > 0 and delay ~= math.huge then
      timeout_register[co] = timerwheel:set(delay, callback, co)
    elseif delay == 0 or delay == math.huge then
      timeout_register[co] = nil
    else
      error("timeout value must be greater than or equal to 0, got: "..tostring(delay))
    end

    return true
  end

end
1446

1447

1448
-------------------------------------------------------------------------------
1449
-- main tasks: manage readable and writable socket sets
1450
-------------------------------------------------------------------------------
1451
-- A task is an object with a required method `step()` that deals with a
-- single step for that task. `_tasks` is the ordered list of tasks.
local _tasks = {} do
  -- appends `tsk` to the end of the list
  function _tasks:add(tsk)
    local next_slot = #_tasks + 1
    _tasks[next_slot] = tsk
  end
end
1459

1460

1461
-- Task handling the sockets reported as ready for reading; the list of
-- sockets to handle is placed in `_events` by the select step.
local _readable_task = {} do

  _readable_task._events = {}

  -- a server socket accepts a new client; any other socket leaves the read
  -- set and has its waiting coroutine resumed
  local function tick(skt)
    local handler = _servers[skt]
    if not handler then
      _reading:remove(skt)
      local co = _reading:pop(skt)
      _doTick(co, skt)
      return
    end
    _accept(skt, handler)
  end

  function _readable_task:step()
    local events = self._events
    for _, skt in ipairs(events) do
      tick(skt)
    end
  end

  _tasks:add(_readable_task)
end
1484

1485

1486
-- Task handling the sockets reported as ready for writing; the list of
-- sockets to handle is placed in `_events` by the select step.
local _writable_task = {} do

  _writable_task._events = {}

  -- the socket leaves the write set and its waiting coroutine is resumed
  local function tick(skt)
    _writing:remove(skt)
    local co = _writing:pop(skt)
    _doTick(co, skt)
  end

  function _writable_task:step()
    local events = self._events
    for _, skt in ipairs(events) do
      tick(skt)
    end
  end

  _tasks:add(_writable_task)
end
1504

1505

1506

1507
-- Sleeping threads task: moves every coroutine that `_sleeping:pop` hands
-- out for the current time over to the resumable list.
local _sleeping_task = {} do

  function _sleeping_task:step()
    local now = gettime()

    while true do
      local co = _sleeping:pop(now)
      if not co then
        break
      end
      -- we're pushing them to _resumable, since that list will be replaced before
      -- executing. This prevents tasks running twice in a row with pause(0) for example.
      -- So here we won't execute, but at _resumable step which is next
      _resumable:push(co)
    end
  end

  _tasks:add(_sleeping_task)
end
1525

1526

1527

1528
-- Resumable threads task: resumes every coroutine on the resume list.
local _resumable_task = {} do

  function _resumable_task:step()
    -- Take the list and have it replaced before iterating, so items placed
    -- in there will indeed end up in the next copas step, not in this one,
    -- and not create a loop.
    local pending = _resumable:clear_resumelist()

    for _, co in ipairs(pending) do
      _doTick(co)
    end
  end

  _tasks:add(_resumable_task)
end
1544

1545

1546
-------------------------------------------------------------------------------
1547
-- Checks for reads and writes on sockets
1548
-------------------------------------------------------------------------------
1549
local _select_plain do

  local last_cleansing = 0
  local function elapsed(later, earlier)
    return later - earlier
  end

  -- Watchdog helper: any socket in `log` that is not among `events` and has
  -- been waiting longer than WATCH_DOG_TIMEOUT is removed from the log and
  -- appended to `events`, so it gets tried despite select not returning it.
  local function force_stalled(log, events, now)
    for skt, since in pairs(log) do
      if not events[skt] and elapsed(now, since) > WATCH_DOG_TIMEOUT then
        log[skt] = nil
        events[#events + 1] = skt
        events[skt] = #events
      end
    end
  end

  if not socket then
    -- socket module unavailable, switch to luasystem sleep
    _select_plain = block_sleep
  else
    -- use socket.select to handle socket-io
    -- Fills the `_events` lists of the readable/writable tasks and returns
    -- the select error; a "timeout" is suppressed (nil) when events were
    -- added anyway.
    _select_plain = function(timeout)
      local now = gettime()

      -- remove any closed sockets to prevent select from hanging on them;
      -- each entry is replaced by the results of the two removals
      if _closed[1] then
        for i, skt in ipairs(_closed) do
          _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
        end
      end

      local r_events, w_events, err = socket.select(_reading, _writing, timeout)
      _readable_task._events = r_events
      _writable_task._events = w_events

      -- inject closed sockets in readable/writeable task so they can error out properly
      if _closed[1] then
        for i, removed in ipairs(_closed) do
          _closed[i] = nil
          r_events[#r_events+1] = removed[1]
          w_events[#w_events+1] = removed[2]
        end
      end

      if elapsed(now, last_cleansing) > WATCH_DOG_TIMEOUT then
        last_cleansing = now
        -- sockets waiting too long to become readable, then the same for writing
        force_stalled(_reading_log, r_events, now)
        force_stalled(_writing_log, w_events, now)
      end

      if err == "timeout" and #r_events + #w_events > 0 then
        return nil
      end
      return err
    end
  end
end
1616

1617

1618

1619
-------------------------------------------------------------------------------
1620
-- Dispatcher loop step.
1621
-- Listen to client requests and handles them
1622
-- Returns false if no socket-data was handled, or true if there was data
1623
-- handled (or nil + error message)
1624
-------------------------------------------------------------------------------
1625

1626
local copas_stats
1627
local min_ever, max_ever
1628

1629
local _select = _select_plain
218✔
1630

1631
-- Instrumented version of _select() to collect stats. The time between the
-- end of the previous select and the start of this one is recorded as the
-- duration of one step.
local _select_instrumented = function(timeout)
  if not copas_stats then
    -- first call after (re)enabling: start with a blank record
    copas_stats = {
      duration_max = -1,
      duration_min = 999999,
      duration_tot = 0,
      steps = 0,
    }
  else
    local stats = copas_stats
    local step_duration = gettime() - stats.step_start
    stats.duration_max = math.max(stats.duration_max, step_duration)
    stats.duration_min = math.min(stats.duration_min, step_duration)
    stats.duration_tot = stats.duration_tot + step_duration
    stats.steps = stats.steps + 1
  end

  local err = _select_plain(timeout)

  local now = gettime()
  if not copas_stats.time_start then
    copas_stats.time_start = now
  end
  copas_stats.step_start = now

  return err
end
1656

1657

1658
function copas.step(timeout)
  -- Need to wake up the select call in time for the next sleeping event
  if _resumable:done() then
    timeout = math.min(_sleeping:getnext(), timeout or math.huge)
  else
    -- there is work waiting to be resumed, so do not block in select
    timeout = 0
  end

  local err = _select(timeout)

  for _, tsk in ipairs(_tasks) do
    tsk:step()
  end

  if not err then
    return true
  end

  if err ~= "timeout" then
    return nil, err
  end

  if timeout + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
    -- we were idle, so occasionally do a GC sweep to ensure lingering
    -- sockets are closed, and we don't accidentally block the loop from
    -- exiting
    collectgarbage()
  end
  return false
end
1687

1688

1689
-------------------------------------------------------------------------------
1690
-- Check whether there is something to do.
1691
-- returns false if there are no sockets for read/write nor tasks scheduled
1692
-- (which means Copas is in an empty spin)
1693
-------------------------------------------------------------------------------
1694
function copas.finished()
  -- any socket still waiting for read or write means there is work left
  if #_reading ~= 0 then
    return false
  end
  if #_writing ~= 0 then
    return false
  end

  local nothing_resumable = _resumable:done()
  if not nothing_resumable then
    return nothing_resumable
  end

  -- parentheses keep this to a single return value
  return (_sleeping:done(copas.gettimeouts()))
end
1697

1698

1699
local resetexit do
  local exit_semaphore, exiting

  -- (Re)creates the exit semaphore and clears the exiting flag.
  function resetexit()
    exit_semaphore = copas.semaphore.new(1, 0, math.huge)
    exiting = false
  end

  -- Signals tasks to exit. But only if they check for it. By calling `copas.exiting`
  -- they can check if they should exit. Or by calling `copas.waitforexit` they can
  -- wait until the exit signal is given. Calling it again while already
  -- exiting does nothing.
  function copas.exit()
    if not exiting then
      exiting = true
      exit_semaphore:destroy()
    end
  end

  -- returns whether Copas is in the process of exiting. Exit can be started by
  -- calling `copas.exit()`.
  function copas.exiting()
    return exiting
  end

  -- Pauses the current coroutine until Copas is exiting. To be used as an exit
  -- signal for tasks that need to clean up before exiting.
  function copas.waitforexit()
    exit_semaphore:take(1)
  end
end
1728

1729

1730
--- Forcibly cancels all pending work and signals exit.
-- Intended for test teardown only. Abandons all registered threads and sockets
-- without giving them a chance to clean up. After this call copas.finished()
-- will return true and the loop will exit. The module is left in a clean state
-- ready for the next copas.loop() call.
function copas.cancelall()
  -- 1. clear resumable queue
  _resumable:clear_resumelist()

  -- 2. drain sleeping heap
  _sleeping:cancelall()

  -- 3. close and drain reading sockets
  -- The socket is captured first, so close and remove are guaranteed to act
  -- on the same socket regardless of what `copas.close` does to the set.
  while _reading[1] do
    local skt = _reading[1]
    copas.close(skt)
    _reading:remove(skt)
  end

  -- 4. close and drain writing sockets
  while _writing[1] do
    local skt = _writing[1]
    copas.close(skt)
    _writing:remove(skt)
  end

  -- 5. remove all servers
  while _servers[1] do
    copas.removeserver(_servers[1])
  end

  -- 6. clear non-weak ancillary tables
  _closed = {}
  _reading_log = {}
  _writing_log = {}

  -- 7. signal exit
  copas.exit()
end
1767

1768

1769
local _getstats do
  local _getstats_instrumented, _getstats_plain


  -- Stats getter that is active while instrumentation is switched off.
  -- When called with `enable == true` it switches `_select` and `_getstats`
  -- over to their instrumented variants and clears previously collected
  -- numbers. Always returns a new empty table.
  function _getstats_plain(enable)
    -- this function gets hit if turned off, so turn on if true
    if enable == true then
      _select = _select_instrumented
      _getstats = _getstats_instrumented
      -- reset stats
      min_ever = nil
      max_ever = nil
      copas_stats = nil
    end
    return {}
  end


  -- convert from seconds to millisecs, with microsec precision
  local function useconds(t)
    return math.floor((t * 1000000) + 0.5) / 1000
  end
  -- convert from seconds to seconds, with millisec precision
  local function mseconds(t)
    return math.floor((t * 1000) + 0.5) / 1000
  end


  -- Stats getter that is active while instrumentation is switched on.
  -- When called with `enable == false` it switches back to the plain variants
  -- and returns their (empty) result. Otherwise it takes the collected
  -- `copas_stats` table, resets collection, derives averages/totals, rounds
  -- the values and returns the table. Returns an empty table when nothing was
  -- collected.
  function _getstats_instrumented(enable)
    if enable == false then
      _select = _select_plain
      _getstats = _getstats_plain
      -- instrumentation disabled, so switch to the plain implementation
      return _getstats(enable)
    end
    -- The step counter is `steps` (it is the divisor used below); the guard
    -- previously read the non-matching field `step`, so it never triggered and
    -- a zero count would have led to a division by zero.
    if (not copas_stats) or (copas_stats.steps == 0) then
      return {}
    end
    local stats = copas_stats
    copas_stats = nil   -- collection starts over after every report
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
    max_ever = math.max(max_ever or 0, stats.duration_max)
    stats.duration_min_ever = min_ever
    stats.duration_max_ever = max_ever
    stats.duration_avg = stats.duration_tot / stats.steps
    stats.step_start = nil   -- dropped from the returned report
    stats.time_end = gettime()
    stats.time_tot = stats.time_end - stats.time_start
    stats.time_avg = stats.time_tot / stats.steps

    -- durations and the per-step average are reported in milliseconds,
    -- the start/end/total times in seconds
    stats.duration_avg = useconds(stats.duration_avg)
    stats.duration_max = useconds(stats.duration_max)
    stats.duration_max_ever = useconds(stats.duration_max_ever)
    stats.duration_min = useconds(stats.duration_min)
    stats.duration_min_ever = useconds(stats.duration_min_ever)
    stats.duration_tot = useconds(stats.duration_tot)
    stats.time_avg = useconds(stats.time_avg)
    stats.time_start = mseconds(stats.time_start)
    stats.time_end = mseconds(stats.time_end)
    stats.time_tot = mseconds(stats.time_tot)
    return stats
  end

  -- instrumentation is off by default
  _getstats = _getstats_plain
end
1834

1835

1836
-- Returns a table describing the current state of the scheduler.
-- `enable_stats` is handed to the active stats getter (`true` switches
-- instrumentation on, `false` switches it off); the table it returns is
-- extended with the fields set below.
function copas.status(enable_stats)
  local report = _getstats(enable_stats)

  -- coerce to a real boolean
  report.running = copas.running and true or false
  report.timeout = copas.gettimeouts()

  local timer, inactive = _sleeping:status()
  report.timer, report.inactive = timer, inactive

  -- queue sizes
  report.read = #_reading
  report.write = #_writing
  report.active = _resumable:count()

  return report
end
1846

1847

1848
-------------------------------------------------------------------------------
-- Dispatcher loop.
-- Keeps calling `copas.step(timeout)` until there is nothing left to do.
-- `initializer` (optional function) is scheduled as a thread named
-- "copas_initializer". If the first argument is not a function it is taken
-- as the timeout instead, falling back to the second argument.
-------------------------------------------------------------------------------
function copas.loop(initializer, timeout)
  if type(initializer) ~= "function" then
    timeout = initializer or timeout
  else
    copas.addnamedthread("copas_initializer", initializer)
  end

  resetexit()
  copas.running = true

  local stop = false
  repeat
    copas.step(timeout)
    if copas.finished() then
      if copas.exiting() then
        -- out of work while already exiting: the loop is done
        stop = true
      else
        -- out of work for the first time: give the exit signal and keep
        -- stepping, so tasks waiting for it get a chance to run
        copas.exit()
      end
    end
  until stop

  copas.running = false
end
1872

1873

1874
-------------------------------------------------------------------------------
-- Naming sockets and coroutines.
-------------------------------------------------------------------------------
do
  -- Returns the object used as key in `object_names` for a socket: for a
  -- Copas TCP/UDP wrapper that is its `socket` field, anything else is
  -- returned as is.
  local function realsocket(skt)
    local meta = getmetatable(skt)
    local is_wrapper = (meta == _skt_mt_tcp) or (meta == _skt_mt_udp)
    if not is_wrapper then
      return skt
    end
    return skt.socket
  end


  -- Registers `name` (string) for the socket `skt`.
  -- Raises an error if `name` is not a string or no socket was given.
  function copas.setsocketname(name, skt)
    assert(type(name) == "string", "expected arg #1 to be a string")
    local key = assert(realsocket(skt), "expected arg #2 to be a socket")
    object_names[key] = name
  end


  -- Returns the name registered for the socket `skt`, or nil if it has none.
  -- Raises an error if no socket was given.
  function copas.getsocketname(skt)
    local key = assert(realsocket(skt), "expected arg #1 to be a socket")
    return object_names[key]
  end
end
1900

1901

1902
-- Registers `name` (string) for a coroutine.
-- `coro` is optional and defaults to the currently running coroutine.
-- Raises an error if `name` is not a string or `coro` is not a thread.
function copas.setthreadname(name, coro)
  assert(type(name) == "string", "expected arg #1 to be a string")
  if not coro then
    coro = coroutine_running()
  end
  assert(type(coro) == "thread", "expected arg #2 to be a coroutine or nil")
  object_names[coro] = name
end
1908

1909

1910
-- Returns the name registered for a coroutine, or nil if it has none.
-- `coro` is optional and defaults to the currently running coroutine.
-- Raises an error if `coro` is not a thread.
function copas.getthreadname(coro)
  if not coro then
    coro = coroutine_running()
  end
  assert(type(coro) == "thread", "expected arg #1 to be a coroutine or nil")
  return object_names[coro]
end
1915

1916
-------------------------------------------------------------------------------
1917
-- Debug functionality.
1918
-------------------------------------------------------------------------------
1919
do
1920
  -- namespace for the debug helpers
  copas.debug = {}

  -- log_core:  if truthy, the core-timer will also be logged
  -- debug_log: function used as logger
  local log_core, debug_log
1924

1925

1926
  -- Logging stand-in for `coroutine.yield`: reports which thread yields to
  -- which queue through `debug_log`, then performs the actual yield and
  -- returns whatever the yield returns.
  local function debug_yield(skt, queue)
    local thread_name = object_names[coroutine_running()]

    -- the core timer is only reported when that was explicitly asked for
    local silenced = (not log_core) and thread_name == "copas_core_timer"

    if not silenced then
      if queue == _sleeping then
        -- for the sleeping queue the first value is logged as a number of seconds
        debug_log("yielding '", thread_name, "' to SLEEP for ", skt," seconds")

      elseif queue == _writing then
        debug_log("yielding '", thread_name, "' to WRITE on '", object_names[skt], "'")

      elseif queue == _reading then
        debug_log("yielding '", thread_name, "' to READ on '", object_names[skt], "'")

      else
        debug_log("thread '", thread_name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
      end
    end

    return coroutine.yield(skt, queue)
  end
1946

1947

1948
  -- Logging stand-in for `coroutine.resume`: reports which thread is resumed
  -- (and for which socket, if one is given) through `debug_log`, then performs
  -- the actual resume and returns its results.
  local function debug_resume(coro, skt, ...)
    local thread_name = object_names[coro]

    if skt then
      debug_log("resuming '", thread_name, "' for socket '", object_names[skt], "'")
    elseif log_core or thread_name ~= "copas_core_timer" then
      -- without a socket, the core timer is only reported when asked for
      debug_log("resuming '", thread_name, "'")
    end

    return coroutine.resume(coro, skt, ...)
  end
1960

1961

1962
  -- Logging stand-in for `coroutine.create`: the coroutine body is wrapped so
  -- that a message is logged once `f` has returned. The results of `f` are
  -- passed through.
  local function debug_create(f)
    return coroutine.create(function(...)
      -- hold on to the results so the log line comes after `f` has finished
      local results = pack(f(...))
      debug_log("exiting '", object_names[coroutine_running()], "'")
      return unpack(results)
    end)
  end
1971

1972

1973
  -- no debug output until `copas.debug.start` is called
  debug_log = fnil


  -- Enables debug output for all coroutine operations.
  -- `logger` is the function that receives the message parts (defaults to
  -- `print`); if `core` is truthy the core-timer is logged as well.
  function copas.debug.start(logger, core)
    log_core, debug_log = core, logger or print
    -- route the coroutine primitives through their logging counterparts
    coroutine_yield, coroutine_resume, coroutine_create =
      debug_yield, debug_resume, debug_create
  end
1984

1985

1986
  -- Disables debug output for coroutine operations: the plain coroutine
  -- primitives are put back and the logger is replaced by `fnil`.
  function copas.debug.stop()
    coroutine_yield, coroutine_resume, coroutine_create =
      coroutine.yield, coroutine.resume, coroutine.create
    debug_log = fnil
  end
1993

1994
  do
1995
    -- counter used to number the logged socket calls
    local call_id = 0

    -- Description table of socket functions for debug output.
    -- Every socket function name has TWO entries, 'name_in' and 'name_out',
    -- being arrays of names/descriptions of respectively its input parameters
    -- and its return values.
    -- If an entry has a 'callback' key, then that is a function that will be
    -- called with the parameters/return-values for further inspection, e.g.
    --   callback = function(...) print(debug.traceback("called from:", 4)) end
    local args = {
      settimeout_in  = { "socket ", "seconds", "mode   " },
      settimeout_out = { "success", "error  " },

      connect_in     = { "socket ", "address", "port   " },
      connect_out    = { "success", "error  " },

      getfd_in       = { "socket " },
      getfd_out      = { "fd" },

      send_in        = { "socket   ", "data     ", "idx-start", "idx-end  " },
      send_out       = { "last-idx-send    ", "error            ", "err-last-idx-send" },

      receive_in     = { "socket ", "pattern", "prefix " },
      receive_out    = { "received    ", "error       ", "partial data" },

      dirty_in       = { "socket" },
      dirty_out      = { "data in read-buffer" },

      close_in       = { "socket" },
      close_out      = { "success", "error" },
    }
2072
    -- Prints `msg` followed by one line per value in `...`.
    -- `func` is the key into the `args` description table (e.g. "send_in");
    -- its entries label the values, a value without a description is labelled
    -- with its position. Strings are shown truncated to 30 characters, with
    -- their full length appended. If the description has a `callback`, it is
    -- called with the values afterwards.
    local function print_call(func, msg, ...)
      print(msg)

      local values = pack(...)
      local labels = args[func] or {}

      -- also list described positions for which no value was passed
      local count = math.max(values.n, #labels)
      for idx = 1, count do
        local value = values[idx]
        local label = labels[idx] or idx
        local kind = type(value)

        if kind ~= "string" then
          print("\t"..label..": '"..tostring(value).."' ("..kind..")")
        else
          local shown = value:sub(1,30)
          if shown ~= value then
            shown = shown .."(...truncated)"
          end
          print("\t"..label..": '"..tostring(shown).."' ("..kind.." #"..#value..")")
        end
      end

      if labels.callback then
        labels.callback(...)
      end
    end
2092

2093
    -- Metatable for the debug proxies created by `copas.debug.socket`.
    -- Field lookups are forwarded to the wrapped socket; functions are
    -- returned wrapped, so every call prints its arguments and results.
    local debug_mt = {
      __index = function(self, key)
        local value = self.__original_socket[key]
        if type(value) ~= "function" then
          -- plain fields are passed through untouched
          return value
        end
        return function(self2, ...)
            -- number the call, so its 'in' and 'out' output can be matched
            local my_id = call_id + 1
            call_id = my_id
            local name = tostring(key)
            local results

            if self2 ~= self then
              -- not called as a method on the proxy, so `self2` is a regular
              -- argument: forward it unchanged. (It used to be dropped, with
              -- the proxy table passed in its place.)
              print_call(name.."_in", my_id .. "-calling '"..name .. "' with; ", self2, ...)
              results = pack(value(self2, ...))
            else
              -- called as a method on the proxy: substitute the wrapped socket
              print_call(name.."_in", my_id .. "-calling '" .. name .. "' with; ", self.__original_socket, ...)
              results = pack(value(self.__original_socket, ...))
            end
            print_call(name.."_out", my_id .. "-results '"..name .. "' returned; ", unpack(results))
            return unpack(results)
          end
      end,
      __tostring = function(self)
        -- the proxy presents itself as the socket it wraps
        return tostring(self.__original_socket)
      end
    }
2120

2121

2122
    -- Wraps a socket (copas or luasocket) in a debug version printing all
    -- calls and their parameters/return values. Extremely noisy!
    -- For a Copas wrapper the underlying socket is replaced by its debug
    -- version and the same wrapper is returned; any other socket is returned
    -- inside a new proxy.
    -- NOTE: only for plain sockets, will not support TLS
    function copas.debug.socket(original_skt)
      local meta = getmetatable(original_skt)

      if meta == _skt_mt_tcp or meta == _skt_mt_udp then
        -- already wrapped as Copas socket, so recurse with the original luasocket one
        original_skt.socket = copas.debug.socket(original_skt.socket)
        return original_skt
      end

      return setmetatable({ __original_socket = original_skt }, debug_mt)
    end
2139
  end
2140
end
2141

2142

2143
-- export the module table
return copas
218✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc