• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 10159170494

30 Jul 2024 08:47AM UTC coverage: 84.343%. Remained the same
10159170494

push

github

web-flow
Merge c401387df into 20f1cf71d

1309 of 1552 relevant lines covered (84.34%)

51445.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.04
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2013 - Kepler Project (www.keplerproject.org), 2015-2024 Thijs Schreijer
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
if package.loaded["socket.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
151✔
17
  error("you must require copas before require'ing socket.http")
×
18
end
19
if package.loaded["copas.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
151✔
20
  error("you must require copas before require'ing copas.http")
×
21
end
22

23
-- load either LuaSocket, or LuaSystem
24
local socket, system do
151✔
25
  if pcall(require, "socket") then
152✔
26
    -- found LuaSocket
27
    socket = require "socket"
146✔
28
  else
29
    -- fallback to LuaSystem
30
    if pcall(require, "system") then
6✔
31
      system = require "system"
6✔
32
    else
33
      error("Neither LuaSocket nor LuaSystem found, Copas requires at least one of them")
×
34
    end
35
  end
36
end
37

38
local binaryheap = require "binaryheap"
151✔
39
local gettime = (socket or system).gettime
151✔
40
local ssl -- only loaded upon demand
41

42
local WATCH_DOG_TIMEOUT = 120
151✔
43
local UDP_DATAGRAM_MAX = (socket or {})._DATAGRAMSIZE or 8192
151✔
44
local TIMEOUT_PRECISION = 0.1  -- 100ms
151✔
45
local fnil = function() end
183,107✔
46

47

48
local coroutine_create = coroutine.create
151✔
49
local coroutine_running = coroutine.running
151✔
50
local coroutine_yield = coroutine.yield
151✔
51
local coroutine_resume = coroutine.resume
151✔
52
local coroutine_status = coroutine.status
151✔
53

54

55
-- nil-safe versions for pack/unpack
56
local _unpack = unpack or table.unpack
151✔
57
local unpack = function(t, i, j) return _unpack(t, i or 1, j or t.n or #t) end
401✔
58
local pack = function(...) return { n = select("#", ...), ...} end
521✔
59

60

61
local pcall = pcall
151✔
62
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
151✔
63
  pcall = require("coxpcall").pcall
29✔
64
  coroutine_running = require("coxpcall").running
29✔
65
end
66

67

68
if socket then
151✔
69
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
70
  -- (this allows the use of socket.http from within copas)
71
  local err_mt = {
146✔
72
    __tostring = function (self)
73
      return "Copas 'try' error intermediate table: '"..tostring(self[1].."'")
×
74
    end,
75
  }
76

77
  local function statusHandler(status, ...)
78
    if status then return ... end
70✔
79
    local err = (...)
25✔
80
    if type(err) == "table" and getmetatable(err) == err_mt then
25✔
81
      return nil, err[1]
25✔
82
    else
83
      error(err)
×
84
    end
85
  end
86

87
  function socket.protect(func)
146✔
88
    return function (...)
89
            return statusHandler(pcall(func, ...))
84✔
90
          end
91
  end
92

93
  function socket.newtry(finalizer)
146✔
94
    return function (...)
95
            local status = (...)
1,095✔
96
            if not status then
1,095✔
97
              pcall(finalizer or fnil, select(2, ...))
25✔
98
              error(setmetatable({ (select(2, ...)) }, err_mt), 0)
25✔
99
            end
100
            return ...
1,070✔
101
          end
102
  end
103

104
  socket.try = socket.newtry()
174✔
105
end
106

107

108
-- Setup the Copas meta table to auto-load submodules and define a default method
109
local copas do
151✔
110
  local submodules = { "ftp", "http", "lock", "queue", "semaphore", "smtp", "timer" }
151✔
111
  for i, key in ipairs(submodules) do
1,208✔
112
    submodules[key] = true
1,057✔
113
    submodules[i] = nil
1,057✔
114
  end
115

116
  copas = setmetatable({},{
302✔
117
    __index = function(self, key)
118
      if submodules[key] then
60✔
119
        self[key] = require("copas."..key)
61✔
120
        submodules[key] = nil
60✔
121
        return rawget(self, key)
60✔
122
      end
123
    end,
124
    __call = function(self, ...)
125
      return self.loop(...)
5✔
126
    end,
127
  })
151✔
128
end
129

130

131
-- Meta information is public even if beginning with an "_"
132
copas._COPYRIGHT   = "Copyright (C) 2005-2013 Kepler Project, 2015-2024 Thijs Schreijer"
151✔
133
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
151✔
134
copas._VERSION     = "Copas 4.7.1"
151✔
135

136
-- Close the socket associated with the current connection after the handler finishes
137
copas.autoclose = true
151✔
138

139
-- indicator for the loop running
140
copas.running = false
151✔
141

142
-- gettime method from either LuaSocket or LuaSystem: time in (fractional) seconds, since epoch.
143
copas.gettime = gettime
151✔
144

145
-------------------------------------------------------------------------------
146
-- Object names, to track names of thread/coroutines and sockets
147
-------------------------------------------------------------------------------
148
local object_names = setmetatable({}, {
302✔
149
  __mode = "k",
122✔
150
  __index = function(self, key)
151
    local name = tostring(key)
145✔
152
    if key ~= nil then
145✔
153
      rawset(self, key, name)
145✔
154
    end
155
    return name
145✔
156
  end
157
})
158

159
-------------------------------------------------------------------------------
160
-- Simple set implementation
161
-- adds a FIFO queue for each socket in the set
162
-------------------------------------------------------------------------------
163

164
local function newsocketset()
165
  local set = {}
453✔
166

167
  do  -- set implementation
168
    local reverse = {}
453✔
169

170
    -- Adds a socket to the set, does nothing if it exists
171
    -- @return skt if added, or nil if it existed
172
    function set:insert(skt)
453✔
173
      if not reverse[skt] then
892✔
174
        self[#self + 1] = skt
892✔
175
        reverse[skt] = #self
892✔
176
        return skt
892✔
177
      end
178
    end
179

180
    -- Removes socket from the set, does nothing if not found
181
    -- @return skt if removed, or nil if it wasn't in the set
182
    function set:remove(skt)
453✔
183
      local index = reverse[skt]
1,287✔
184
      if index then
1,287✔
185
        reverse[skt] = nil
887✔
186
        local top = self[#self]
887✔
187
        self[#self] = nil
887✔
188
        if top ~= skt then
887✔
189
          reverse[top] = index
78✔
190
          self[index] = top
78✔
191
        end
192
        return skt
887✔
193
      end
194
    end
195

196
  end
197

198
  do  -- queues implementation
199
    local fifo_queues = setmetatable({},{
906✔
200
      __mode = "k",                 -- auto collect queue if socket is gone
366✔
201
      __index = function(self, skt) -- auto create fifo queue if not found
202
        local newfifo = {}
329✔
203
        self[skt] = newfifo
329✔
204
        return newfifo
329✔
205
      end,
206
    })
207

208
    -- pushes an item in the fifo queue for the socket.
209
    function set:push(skt, itm)
453✔
210
      local queue = fifo_queues[skt]
817✔
211
      queue[#queue + 1] = itm
817✔
212
    end
213

214
    -- pops an item from the fifo queue for the socket
215
    function set:pop(skt)
453✔
216
      local queue = fifo_queues[skt]
757✔
217
      return table.remove(queue, 1)
757✔
218
    end
219

220
  end
221

222
  return set
453✔
223
end
224

225

226

227
-- Threads immediately resumable
228
local _resumable = {} do
151✔
229
  local resumelist = {}
151✔
230

231
  function _resumable:push(co)
151✔
232
    resumelist[#resumelist + 1] = co
182,838✔
233
  end
234

235
  function _resumable:clear_resumelist()
151✔
236
    local lst = resumelist
175,121✔
237
    resumelist = {}
175,121✔
238
    return lst
175,121✔
239
  end
240

241
  function _resumable:done()
151✔
242
    return resumelist[1] == nil
177,580✔
243
  end
244

245
  function _resumable:count()
151✔
246
    return #resumelist + #_resumable
×
247
  end
248

249
end
250

251

252

253
-- Similar to the socket set above, but tailored for the use of
254
-- sleeping threads
255
local _sleeping = {} do
151✔
256

257
  local heap = binaryheap.minUnique()
151✔
258
  local lethargy = setmetatable({}, { __mode = "k" }) -- list of coroutines sleeping without a wakeup time
151✔
259

260

261
  -- Required base implementation
262
  -----------------------------------------
263
  _sleeping.insert = fnil
151✔
264
  _sleeping.remove = fnil
151✔
265

266
  -- push a new timer on the heap
267
  function _sleeping:push(sleeptime, co)
151✔
268
    if sleeptime < 0 then
182,951✔
269
      lethargy[co] = true
2,605✔
270
    elseif sleeptime == 0 then
180,346✔
271
      _resumable:push(co)
231,894✔
272
    else
273
      heap:insert(gettime() + sleeptime, co)
5,649✔
274
    end
275
  end
276

277
  -- find the thread that should wake up to the time, if any
278
  function _sleeping:pop(time)
151✔
279
    if time < (heap:peekValue() or math.huge) then
238,992✔
280
      return
175,121✔
281
    end
282
    return heap:pop()
5,481✔
283
  end
284

285
  -- additional methods for time management
286
  -----------------------------------------
287
  function _sleeping:getnext()  -- returns delay until next sleep expires, or nil if there is none
151✔
288
    local t = heap:peekValue()
4,474✔
289
    if t then
4,474✔
290
      -- never report less than 0, because select() might block
291
      return math.max(t - gettime(), 0)
4,474✔
292
    end
293
  end
294

295
  function _sleeping:wakeup(co)
151✔
296
    if lethargy[co] then
2,615✔
297
      lethargy[co] = nil
2,590✔
298
      _resumable:push(co)
2,590✔
299
      return
2,590✔
300
    end
301
    if heap:remove(co) then
30✔
302
      _resumable:push(co)
10✔
303
    end
304
  end
305

306
  function _sleeping:cancel(co)
151✔
307
    lethargy[co] = nil
35✔
308
    heap:remove(co)
35✔
309
  end
310

311
  -- @param tos number of timeouts running
312
  function _sleeping:done(tos)
151✔
313
    -- return true if we have nothing more to do
314
    -- the timeout task doesn't qualify as work (fallbacks only),
315
    -- the lethargy also doesn't qualify as work ('dead' tasks),
316
    -- but the combination of a timeout + a lethargy can be work
317
    return heap:size() == 1       -- 1 means only the timeout-timer task is running
2,193✔
318
           and not (tos > 0 and next(lethargy))
1,826✔
319
  end
320

321
  -- gets number of threads in binaryheap and lethargy
322
  function _sleeping:status()
151✔
323
    local c = 0
×
324
    for _ in pairs(lethargy) do c = c + 1 end
×
325

326
    return heap:size(), c
×
327
  end
328

329
end   -- _sleeping
330

331

332

333
-------------------------------------------------------------------------------
334
-- Tracking coroutines and sockets
335
-------------------------------------------------------------------------------
336

337
local _servers = newsocketset() -- servers being handled
151✔
338
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
151✔
339
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
151✔
340
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
151✔
341
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
151✔
342

343

344
-- for each socket we log the last read and last write times to enable the
345
-- watchdog to follow up if it takes too long.
346
-- tables contain the time, indexed by the socket
347
local _reading_log = {}
151✔
348
local _writing_log = {}
151✔
349

350
local _closed = {} -- track sockets that have been closed (list/array)
151✔
351

352
local _reading = newsocketset() -- sockets currently being read
151✔
353
local _writing = newsocketset() -- sockets currently being written
151✔
354
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
151✔
355
  ["timeout"] = true,      -- default LuaSocket timeout
122✔
356
  ["wantread"] = true,     -- LuaSec specific timeout
122✔
357
  ["wantwrite"] = true,    -- LuaSec specific timeout
122✔
358
}
359

360
-------------------------------------------------------------------------------
361
-- Coroutine based socket timeouts.
362
-------------------------------------------------------------------------------
363
local user_timeouts_connect
364
local user_timeouts_send
365
local user_timeouts_receive
366
do
367
  local timeout_mt = {
151✔
368
    __mode = "k",
122✔
369
    __index = function(self, skt)
370
      -- if there is no timeout found, we insert one automatically, to block forever
371
      self[skt] = math.huge
345✔
372
      return self[skt]
345✔
373
    end,
374
  }
375

376
  user_timeouts_connect = setmetatable({}, timeout_mt)
151✔
377
  user_timeouts_send = setmetatable({}, timeout_mt)
151✔
378
  user_timeouts_receive = setmetatable({}, timeout_mt)
151✔
379
end
380

381
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
151✔
382

383

384
-- sto = socket-time-out
385
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do
151✔
386

387
  local socket_register = setmetatable({}, { __mode = "k" })    -- socket by coroutine
151✔
388
  local operation_register = setmetatable({}, { __mode = "k" }) -- operation "read"/"write" by coroutine
151✔
389
  local timeout_flags = setmetatable({}, { __mode = "k" })      -- true if timedout, by coroutine
151✔
390

391

392
  local function socket_callback(co)
393
    local skt = socket_register[co]
60✔
394
    local queue = operation_register[co]
60✔
395

396
    -- flag the timeout and resume the coroutine
397
    timeout_flags[co] = true
60✔
398
    _resumable:push(co)
60✔
399

400
    -- clear the socket from the current queue
401
    if queue == "read" then
60✔
402
      _reading:remove(skt)
60✔
403
    elseif queue == "write" then
10✔
404
      _writing:remove(skt)
12✔
405
    else
406
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
×
407
    end
408
  end
409

410

411
  -- Sets a socket timeout.
412
  -- Calling it as `sto_timeout()` will cancel the timeout.
413
  -- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
414
  -- @param skt (socket) the socket on which to operate
415
  -- @param use_connect_to (bool) timeout to use is determined based on queue (read/write) or if this
416
  -- is truthy, it is the connect timeout.
417
  -- @return true
418
  function sto_timeout(skt, queue, use_connect_to)
122✔
419
    local co = coroutine_running()
3,321,432✔
420
    socket_register[co] = skt
3,321,432✔
421
    operation_register[co] = queue
3,321,432✔
422
    timeout_flags[co] = nil
3,321,432✔
423
    if skt then
3,321,432✔
424
      local to = (use_connect_to and user_timeouts_connect[skt]) or
1,660,757✔
425
                 (queue == "read" and user_timeouts_receive[skt]) or
1,660,493✔
426
                 user_timeouts_send[skt]
12,298✔
427
      copas.timeout(to, socket_callback)
2,212,588✔
428
    else
429
      copas.timeout(0)
1,660,686✔
430
    end
431
    return true
3,321,432✔
432
  end
433

434

435
  -- Changes the timeout to a different queue (read/write).
436
  -- Only usefull with ssl-handshakes and "wantread", "wantwrite" errors, when
437
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
438
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
439
  -- @return true
440
  function sto_change_queue(queue)
122✔
441
    operation_register[coroutine_running()] = queue
697✔
442
    return true
697✔
443
  end
444

445

446
  -- Responds with `true` if the operation timed-out.
447
  function sto_timed_out()
122✔
448
    return timeout_flags[coroutine_running()]
877✔
449
  end
450

451

452
  -- Returns the proper timeout error
453
  function sto_error(err)
122✔
454
    return useSocketTimeoutErrors[coroutine_running()] and err or "timeout"
60✔
455
  end
456
end
457

458

459

460
-------------------------------------------------------------------------------
461
-- Coroutine based socket I/O functions.
462
-------------------------------------------------------------------------------
463

464
-- Returns "tcp"" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
465
-- for tcp based, and falsy for udp based.
466
local isTCP do
151✔
467
  local lookup = {
151✔
468
    tcp = "tcp",
122✔
469
    SSL = "ssl",
122✔
470
  }
471

472
  function isTCP(socket)
122✔
473
    return lookup[tostring(socket):sub(1,3)]
618✔
474
  end
475
end
476

477
function copas.close(skt, ...)
151✔
478
  _closed[#_closed+1] = skt
175✔
479
  return skt:close(...)
175✔
480
end
481

482

483

484
-- nil or negative is indefinitly
485
function copas.settimeout(skt, timeout)
151✔
486
  timeout = timeout or -1
170✔
487
  if type(timeout) ~= "number" then
170✔
488
    return nil, "timeout must be 'nil' or a number"
15✔
489
  end
490

491
  return copas.settimeouts(skt, timeout, timeout, timeout)
155✔
492
end
493

494
-- negative is indefinitly, nil means do not change
495
function copas.settimeouts(skt, connect, send, read)
151✔
496

497
  if connect ~= nil and type(connect) ~= "number" then
340✔
498
    return nil, "connect timeout must be 'nil' or a number"
×
499
  end
500
  if connect then
340✔
501
    if connect < 0 then
340✔
502
      connect = nil
×
503
    end
504
    user_timeouts_connect[skt] = connect
340✔
505
  end
506

507

508
  if send ~= nil and type(send) ~= "number" then
340✔
509
    return nil, "send timeout must be 'nil' or a number"
×
510
  end
511
  if send then
340✔
512
    if send < 0 then
340✔
513
      send = nil
×
514
    end
515
    user_timeouts_send[skt] = send
340✔
516
  end
517

518

519
  if read ~= nil and type(read) ~= "number" then
340✔
520
    return nil, "read timeout must be 'nil' or a number"
×
521
  end
522
  if read then
340✔
523
    if read < 0 then
340✔
524
      read = nil
×
525
    end
526
    user_timeouts_receive[skt] = read
340✔
527
  end
528

529

530
  return true
340✔
531
end
532

533
-- reads a pattern from a client and yields to the reading set on timeouts
534
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
535
-- be provided as the 'pattern' below defaults to a string. Will throw a
536
-- 'bad argument' error if omitted.
537
function copas.receive(client, pattern, part)
151✔
538
  local s, err
539
  pattern = pattern or "*l"
1,648,163✔
540
  local current_log = _reading_log
1,648,163✔
541
  sto_timeout(client, "read")
1,648,163✔
542

543
  repeat
544
    s, err, part = client:receive(pattern, part)
1,648,589✔
545

546
    -- guarantees that high throughput doesn't take other threads to starvation
547
    if (math.random(100) > 90) then
1,648,589✔
548
      copas.pause()
164,867✔
549
    end
550

551
    if s then
1,648,589✔
552
      current_log[client] = nil
1,648,078✔
553
      sto_timeout()
1,648,078✔
554
      return s, err, part
1,648,078✔
555

556
    elseif not _isSocketTimeout[err] then
511✔
557
      current_log[client] = nil
40✔
558
      sto_timeout()
40✔
559
      return s, err, part
40✔
560

561
    elseif sto_timed_out() then
570✔
562
      current_log[client] = nil
45✔
563
      return nil, sto_error(err), part
54✔
564
    end
565

566
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
426✔
567
      current_log = _writing_log
×
568
      current_log[client] = gettime()
×
569
      sto_change_queue("write")
×
570
      coroutine_yield(client, _writing)
×
571
    else
572
      current_log = _reading_log
426✔
573
      current_log[client] = gettime()
426✔
574
      sto_change_queue("read")
426✔
575
      coroutine_yield(client, _reading)
426✔
576
    end
577
  until false
426✔
578
end
579

580
-- receives data from a client over UDP. Not available for TCP.
581
-- (this is a copy of receive() method, adapted for receivefrom() use)
582
function copas.receivefrom(client, size)
151✔
583
  local s, err, port
584
  size = size or UDP_DATAGRAM_MAX
20✔
585
  sto_timeout(client, "read")
20✔
586

587
  repeat
588
    s, err, port = client:receivefrom(size) -- upon success err holds ip address
40✔
589

590
    -- garantees that high throughput doesn't take other threads to starvation
591
    if (math.random(100) > 90) then
40✔
592
      copas.pause()
2✔
593
    end
594

595
    if s then
40✔
596
      _reading_log[client] = nil
15✔
597
      sto_timeout()
15✔
598
      return s, err, port
15✔
599

600
    elseif err ~= "timeout" then
25✔
601
      _reading_log[client] = nil
×
602
      sto_timeout()
×
603
      return s, err, port
×
604

605
    elseif sto_timed_out() then
30✔
606
      _reading_log[client] = nil
5✔
607
      return nil, sto_error(err), port
6✔
608
    end
609

610
    _reading_log[client] = gettime()
20✔
611
    coroutine_yield(client, _reading)
20✔
612
  until false
20✔
613
end
614

615
-- same as above but with special treatment when reading chunks,
616
-- unblocks on any data received.
617
function copas.receivepartial(client, pattern, part)
151✔
618
  local s, err
619
  pattern = pattern or "*l"
10✔
620
  local orig_size = #(part or "")
10✔
621
  local current_log = _reading_log
10✔
622
  sto_timeout(client, "read")
10✔
623

624
  repeat
625
    s, err, part = client:receive(pattern, part)
10✔
626

627
    -- guarantees that high throughput doesn't take other threads to starvation
628
    if (math.random(100) > 90) then
10✔
629
      copas.pause()
×
630
    end
631

632
    if s or (type(part) == "string" and #part > orig_size) then
10✔
633
      current_log[client] = nil
10✔
634
      sto_timeout()
10✔
635
      return s, err, part
10✔
636

637
    elseif not _isSocketTimeout[err] then
×
638
      current_log[client] = nil
×
639
      sto_timeout()
×
640
      return s, err, part
×
641

642
    elseif sto_timed_out() then
×
643
      current_log[client] = nil
×
644
      return nil, sto_error(err), part
×
645
    end
646

647
    if err == "wantwrite" then
×
648
      current_log = _writing_log
×
649
      current_log[client] = gettime()
×
650
      sto_change_queue("write")
×
651
      coroutine_yield(client, _writing)
×
652
    else
653
      current_log = _reading_log
×
654
      current_log[client] = gettime()
×
655
      sto_change_queue("read")
×
656
      coroutine_yield(client, _reading)
×
657
    end
658
  until false
×
659
end
660
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
151✔
661

662
-- sends data to a client. The operation is buffered and
663
-- yields to the writing set on timeouts
664
-- Note: from and to parameters will be ignored by/for UDP sockets
665
function copas.send(client, data, from, to)
151✔
666
  local s, err
667
  from = from or 1
12,298✔
668
  local lastIndex = from - 1
12,298✔
669
  local current_log = _writing_log
12,298✔
670
  sto_timeout(client, "write")
12,298✔
671

672
  repeat
673
    s, err, lastIndex = client:send(data, lastIndex + 1, to)
12,444✔
674

675
    -- guarantees that high throughput doesn't take other threads to starvation
676
    if (math.random(100) > 90) then
12,444✔
677
      copas.pause()
1,198✔
678
    end
679

680
    if s then
12,444✔
681
      current_log[client] = nil
12,278✔
682
      sto_timeout()
12,278✔
683
      return s, err, lastIndex
12,278✔
684

685
    elseif not _isSocketTimeout[err] then
166✔
686
      current_log[client] = nil
20✔
687
      sto_timeout()
20✔
688
      return s, err, lastIndex
20✔
689

690
    elseif sto_timed_out() then
175✔
691
      current_log[client] = nil
×
692
      return nil, sto_error(err), lastIndex
×
693
    end
694

695
    if err == "wantread" then
146✔
696
      current_log = _reading_log
×
697
      current_log[client] = gettime()
×
698
      sto_change_queue("read")
×
699
      coroutine_yield(client, _reading)
×
700
    else
701
      current_log = _writing_log
146✔
702
      current_log[client] = gettime()
146✔
703
      sto_change_queue("write")
146✔
704
      coroutine_yield(client, _writing)
146✔
705
    end
706
  until false
146✔
707
end
708

709
function copas.sendto(client, data, ip, port)
151✔
710
  -- deprecated; for backward compatibility only, since UDP doesn't block on sending
711
  return client:sendto(data, ip, port)
×
712
end
713

714
-- waits until connection is completed
715
function copas.connect(skt, host, port)
151✔
716
  skt:settimeout(0)
171✔
717
  local ret, err, tried_more_than_once
718
  sto_timeout(skt, "write", true)
170✔
719

720
  repeat
721
    ret, err = skt:connect(host, port)
274✔
722

723
    -- non-blocking connect on Windows results in error "Operation already
724
    -- in progress" to indicate that it is completing the request async. So essentially
725
    -- it is the same as "timeout"
726
    if ret or (err ~= "timeout" and err ~= "Operation already in progress") then
270✔
727
      _writing_log[skt] = nil
160✔
728
      sto_timeout()
160✔
729
      -- Once the async connect completes, Windows returns the error "already connected"
730
      -- to indicate it is done, so that error should be ignored. Except when it is the
731
      -- first call to connect, then it was already connected to something else and the
732
      -- error should be returned
733
      if (not ret) and (err == "already connected" and tried_more_than_once) then
160✔
734
        return 1
×
735
      end
736
      return ret, err
160✔
737

738
    elseif sto_timed_out() then
132✔
739
      _writing_log[skt] = nil
10✔
740
      return nil, sto_error(err)
12✔
741
    end
742

743
    tried_more_than_once = tried_more_than_once or true
100✔
744
    _writing_log[skt] = gettime()
100✔
745
    coroutine_yield(skt, _writing)
100✔
746
  until false
100✔
747
end
748

749

750
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
751
-- already wrapped, it does nothing and returns the socket.
752
-- @param wrap_params the parameters for the ssl-context
753
-- @return wrapped socket, or throws an error
754
local function ssl_wrap(skt, wrap_params)
755
  if isTCP(skt) == "ssl" then return skt end -- was already wrapped
180✔
756
  if not wrap_params then
85✔
757
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
×
758
  end
759

760
  ssl = ssl or require("ssl")
85✔
761
  local nskt = assert(ssl.wrap(skt, wrap_params)) -- assert, because we do not want to silently ignore this one!!
102✔
762

763
  nskt:settimeout(0)  -- non-blocking on the ssl-socket
85✔
764
  copas.settimeouts(nskt, user_timeouts_connect[skt],
170✔
765
    user_timeouts_send[skt], user_timeouts_receive[skt]) -- copy copas user-timeout to newly wrapped one
89✔
766

767
  local co = _autoclose_r[skt]
85✔
768
  if co then
85✔
769
    -- socket registered for autoclose, move registration to wrapped one
770
    _autoclose[co] = nskt
20✔
771
    _autoclose_r[skt] = nil
20✔
772
    _autoclose_r[nskt] = co
20✔
773
  end
774

775
  local sock_name = object_names[skt]
85✔
776
  if sock_name ~= tostring(skt) then
85✔
777
    -- socket had a custom name, so copy it over
778
    object_names[nskt] = sock_name
30✔
779
  end
780
  return nskt
85✔
781
end
782

783

784
-- For each luasec method we have a subtable, allows for future extension.
785
-- Required structure:
786
-- {
787
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
788
--   sni = {                  -- parameters to 'sni()'
789
--     names = string | table -- 1st parameter
790
--     strict = bool          -- 2nd parameter
791
--   }
792
-- }
793
local function normalize_sslt(sslt)
794
  local t = type(sslt)
270✔
795
  local r = setmetatable({}, {
540✔
796
    __index = function(self, key)
797
      -- a bug if this happens, here as a sanity check, just being careful since
798
      -- this is security stuff
799
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
×
800
    end,
801
  })
802
  if t == "nil" then
270✔
803
    r.wrap = false
185✔
804
    r.sni = false
185✔
805

806
  elseif t == "table" then
85✔
807
    if sslt.mode or sslt.protocol then
85✔
808
      -- has the mandatory fields for the ssl-params table for handshake
809
      -- backward compatibility
810
      r.wrap = sslt
20✔
811
      r.sni = false
20✔
812
    else
813
      -- has the target definition, copy our known keys
814
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
65✔
815
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
65✔
816
    end
817

818
  elseif t == "userdata" then
×
819
    -- it's an ssl-context object for the handshake
820
    -- backward compatibility
821
    r.wrap = sslt
×
822
    r.sni = false
×
823

824
  else
825
    error("ssl parameters; did not expect type "..tostring(sslt))
×
826
  end
827

828
  return r
270✔
829
end
830

831

832
---
833
-- Peforms an (async) ssl handshake on a connected TCP client socket.
834
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
835
-- Throws error and does not return nil+error, as that might silently fail
836
-- in code like this;
837
--   copas.addserver(s1, function(skt)
838
--       skt = copas.wrap(skt, sparams)
839
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
840
--       skt:send(body)      --> continues unencrypted
841
-- @param skt Regular LuaSocket CLIENT socket object
842
-- @param wrap_params Table with ssl parameters
843
-- @return wrapped ssl socket, or throws an error
844
function copas.dohandshake(skt, wrap_params)
151✔
845
  ssl = ssl or require("ssl")
85✔
846

847
  local nskt = ssl_wrap(skt, wrap_params)
85✔
848

849
  sto_timeout(nskt, "write", true)
85✔
850
  local queue
851

852
  repeat
853
    local success, err = nskt:dohandshake()
210✔
854

855
    if success then
210✔
856
      sto_timeout()
75✔
857
      return nskt
75✔
858

859
    elseif not _isSocketTimeout[err] then
135✔
860
      sto_timeout()
10✔
861
      error("TLS/SSL handshake failed: " .. tostring(err))
10✔
862

863
    elseif sto_timed_out() then
150✔
864
      return nil, sto_error(err)
×
865

866
    elseif err == "wantwrite" then
125✔
867
      sto_change_queue("write")
×
868
      queue = _writing
×
869

870
    elseif err == "wantread" then
125✔
871
      sto_change_queue("read")
125✔
872
      queue = _reading
125✔
873

874
    else
875
      error("TLS/SSL handshake failed: " .. tostring(err))
×
876
    end
877

878
    coroutine_yield(nskt, queue)
125✔
879
  until false
125✔
880
end
881

882
-- flushes a client write buffer (deprecated)
883
function copas.flush()
151✔
884
end
885

886
-- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout)
887
local _skt_mt_tcp = {
151✔
888
      __tostring = function(self)
889
        return tostring(self.socket).." (copas wrapped)"
15✔
890
      end,
891

892
      __index = {
151✔
893
        send = function (self, data, from, to)
894
          return copas.send (self.socket, data, from, to)
12,293✔
895
        end,
896

897
        receive = function (self, pattern, prefix)
898
          if user_timeouts_receive[self.socket] == 0 then
1,648,159✔
899
            return copas.receivepartial(self.socket, pattern, prefix)
10✔
900
          end
901
          return copas.receive(self.socket, pattern, prefix)
1,648,148✔
902
        end,
903

904
        receivepartial = function (self, pattern, prefix)
905
          return copas.receivepartial(self.socket, pattern, prefix)
×
906
        end,
907

908
        flush = function (self)
909
          return copas.flush(self.socket)
×
910
        end,
911

912
        settimeout = function (self, time)
913
          return copas.settimeout(self.socket, time)
155✔
914
        end,
915

916
        settimeouts = function (self, connect, send, receive)
917
          return copas.settimeouts(self.socket, connect, send, receive)
×
918
        end,
919

920
        -- TODO: socket.connect is a shortcut, and must be provided with an alternative
921
        -- if ssl parameters are available, it will also include a handshake
922
        connect = function(self, ...)
923
          local res, err = copas.connect(self.socket, ...)
170✔
924
          if res then
170✔
925
            if self.ssl_params.sni then self:sni() end
155✔
926
            if self.ssl_params.wrap then res, err = self:dohandshake() end
167✔
927
          end
928
          return res, err
165✔
929
        end,
930

931
        close = function(self, ...)
932
          return copas.close(self.socket, ...)
175✔
933
        end,
934

935
        -- TODO: socket.bind is a shortcut, and must be provided with an alternative
936
        bind = function(self, ...) return self.socket:bind(...) end,
151✔
937

938
        -- TODO: is this DNS related? hence blocking?
939
        getsockname = function(self, ...)
940
          local ok, ip, port, family = pcall(self.socket.getsockname, self.socket, ...)
×
941
          if ok then
×
942
            return ip, port, family
×
943
          else
944
            return nil, "not implemented by LuaSec"
×
945
          end
946
        end,
947

948
        getstats = function(self, ...) return self.socket:getstats(...) end,
151✔
949

950
        setstats = function(self, ...) return self.socket:setstats(...) end,
151✔
951

952
        listen = function(self, ...) return self.socket:listen(...) end,
151✔
953

954
        accept = function(self, ...) return self.socket:accept(...) end,
151✔
955

956
        setoption = function(self, ...)
957
          local ok, res, err = pcall(self.socket.setoption, self.socket, ...)
×
958
          if ok then
×
959
            return res, err
×
960
          else
961
            return nil, "not implemented by LuaSec"
×
962
          end
963
        end,
964

965
        getoption = function(self, ...)
966
          local ok, val, err = pcall(self.socket.getoption, self.socket, ...)
×
967
          if ok then
×
968
            return val, err
×
969
          else
970
            return nil, "not implemented by LuaSec"
×
971
          end
972
        end,
973

974
        -- TODO: is this DNS related? hence blocking?
975
        getpeername = function(self, ...)
976
          local ok, ip, port, family = pcall(self.socket.getpeername, self.socket, ...)
×
977
          if ok then
×
978
            return ip, port, family
×
979
          else
980
            return nil, "not implemented by LuaSec"
×
981
          end
982
        end,
983

984
        shutdown = function(self, ...) return self.socket:shutdown(...) end,
151✔
985

986
        sni = function(self, names, strict)
987
          local sslp = self.ssl_params
65✔
988
          self.socket = ssl_wrap(self.socket, sslp.wrap)
78✔
989
          if names == nil then
65✔
990
            names = sslp.sni.names
55✔
991
            strict = sslp.sni.strict
55✔
992
          end
993
          return self.socket:sni(names, strict)
65✔
994
        end,
995

996
        dohandshake = function(self, wrap_params)
997
          local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
85✔
998
          if not nskt then return nskt, err end
75✔
999
          self.socket = nskt  -- replace internal socket with the newly wrapped ssl one
75✔
1000
          return self
75✔
1001
        end,
1002

1003
        getalpn = function(self, ...)
1004
          local ok, proto, err = pcall(self.socket.getalpn, self.socket, ...)
×
1005
          if ok then
×
1006
            return proto, err
×
1007
          else
1008
            return nil, "not a tls socket"
×
1009
          end
1010
        end,
1011

1012
        getsniname = function(self, ...)
1013
          local ok, name, err = pcall(self.socket.getsniname, self.socket, ...)
×
1014
          if ok then
×
1015
            return name, err
×
1016
          else
1017
            return nil, "not a tls socket"
×
1018
          end
1019
        end,
1020
      }
151✔
1021
}
1022

1023
-- wraps a UDP socket, copy of TCP one adapted for UDP.
1024
local _skt_mt_udp = {__index = { }}
151✔
1025
for k,v in pairs(_skt_mt_tcp) do _skt_mt_udp[k] = _skt_mt_udp[k] or v end
453✔
1026
for k,v in pairs(_skt_mt_tcp.__index) do _skt_mt_udp.__index[k] = v end
3,473✔
1027

1028
_skt_mt_udp.__index.send        = function(self, ...) return self.socket:send(...) end
156✔
1029

1030
_skt_mt_udp.__index.sendto      = function(self, ...) return self.socket:sendto(...) end
166✔
1031

1032

1033
_skt_mt_udp.__index.receive =     function (self, size)
151✔
1034
                                    return copas.receive (self.socket, (size or UDP_DATAGRAM_MAX))
10✔
1035
                                  end
1036

1037
_skt_mt_udp.__index.receivefrom = function (self, size)
151✔
1038
                                    return copas.receivefrom (self.socket, (size or UDP_DATAGRAM_MAX))
20✔
1039
                                  end
1040

1041
                                  -- TODO: is this DNS related? hence blocking?
1042
_skt_mt_udp.__index.setpeername = function(self, ...) return self.socket:setpeername(...) end
156✔
1043

1044
_skt_mt_udp.__index.setsockname = function(self, ...) return self.socket:setsockname(...) end
151✔
1045

1046
                                    -- do not close client, as it is also the server for udp.
1047
_skt_mt_udp.__index.close       = function(self, ...) return true end
161✔
1048

1049
_skt_mt_udp.__index.settimeouts = function (self, connect, send, receive)
151✔
1050
                                    return copas.settimeouts(self.socket, connect, send, receive)
×
1051
                                  end
1052

1053

1054

1055
---
1056
-- Wraps a LuaSocket socket object in an async Copas based socket object.
1057
-- @param skt The socket to wrap
1058
-- @sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
1059
-- @return wrapped socket object
1060
function copas.wrap (skt, sslt)
151✔
1061
  if (getmetatable(skt) == _skt_mt_tcp) or (getmetatable(skt) == _skt_mt_udp) then
290✔
1062
    return skt -- already wrapped
×
1063
  end
1064

1065
  skt:settimeout(0)
291✔
1066

1067
  if isTCP(skt) then
348✔
1068
    return setmetatable ({socket = skt, ssl_params = normalize_sslt(sslt)}, _skt_mt_tcp)
324✔
1069
  else
1070
    return setmetatable ({socket = skt}, _skt_mt_udp)
20✔
1071
  end
1072
end
1073

1074
--- Wraps a handler in a function that deals with wrapping the socket and doing the
1075
-- optional ssl handshake.
1076
function copas.handler(handler, sslparams)
151✔
1077
  -- TODO: pass a timeout value to set, and use during handshake
1078
  return function (skt, ...)
1079
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
96✔
1080
    local sslp = skt.ssl_params
80✔
1081
    if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
80✔
1082
    if sslp.wrap then skt:dohandshake(sslp.wrap) end
80✔
1083
    return handler(skt, ...)
75✔
1084
  end
1085
end
1086

1087

1088
--------------------------------------------------
1089
-- Error handling
1090
--------------------------------------------------
1091

1092
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine
151✔
1093

1094

1095
function copas.gettraceback(msg, co, skt)
151✔
1096
  local co_str = co == nil and "nil" or copas.getthreadname(co)
31✔
1097
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
31✔
1098
  local msg_str = msg == nil and "" or tostring(msg)
31✔
1099
  if msg_str == "" then
31✔
1100
    msg_str = ("(coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
×
1101
  else
1102
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
31✔
1103
  end
1104

1105
  if type(co) == "thread" then
31✔
1106
    -- regular Copas coroutine
1107
    return debug.traceback(co, msg_str)
31✔
1108
  end
1109
  -- not a coroutine, but the main thread, this happens if a timeout callback
1110
  -- (see `copas.timeout` causes an error (those callbacks run on the main thread).
1111
  return debug.traceback(msg_str, 2)
×
1112
end
1113

1114

1115
local function _deferror(msg, co, skt)
1116
  print(copas.gettraceback(msg, co, skt))
24✔
1117
end
1118

1119

1120
function copas.seterrorhandler(err, default)
151✔
1121
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")
50✔
1122
  if default then
50✔
1123
    assert(err ~= nil, "Expected the handler to be a function when setting the default")
35✔
1124
    _deferror = err
35✔
1125
  else
1126
    _errhandlers[coroutine_running()] = err
15✔
1127
  end
1128
end
1129
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
151✔
1130

1131

1132
function copas.geterrorhandler(co)
151✔
1133
  co = co or coroutine_running()
10✔
1134
  return _errhandlers[co] or _deferror
10✔
1135
end
1136

1137

1138
-- if `bool` is truthy, then the original socket errors will be returned in case of timeouts;
1139
-- `timeout, wantread, wantwrite, Operation already in progress`. If falsy, it will always
1140
-- return `timeout`.
1141
function copas.useSocketTimeoutErrors(bool)
151✔
1142
  useSocketTimeoutErrors[coroutine_running()] = not not bool -- force to a boolean
5✔
1143
end
1144

1145
-------------------------------------------------------------------------------
1146
-- Thread handling
1147
-------------------------------------------------------------------------------
1148

1149
local function _doTick (co, skt, ...)
1150
  if not co then return end
188,010✔
1151

1152
  -- if a coroutine was canceled/removed, don't resume it
1153
  if _canceled[co] then
188,010✔
1154
    _canceled[co] = nil -- also clean up the registry
11✔
1155
    _threads[co] = nil
11✔
1156
    return
11✔
1157
  end
1158

1159
  -- res: the socket (being read/write on) or the time to sleep
1160
  -- new_q: either _writing, _reading, or _sleeping
1161
  -- local time_before = gettime()
1162
  local ok, res, new_q = coroutine_resume(co, skt, ...)
187,999✔
1163
  -- local duration = gettime() - time_before
1164
  -- if duration > 1 then
1165
  --   duration = math.floor(duration * 1000)
1166
  --   pcall(_errhandlers[co] or _deferror, "task ran for "..tostring(duration).." milliseconds.", co, skt)
1167
  -- end
1168

1169
  if new_q == _reading or new_q == _writing or new_q == _sleeping then
187,994✔
1170
    -- we're yielding to a new queue
1171
    new_q:insert (res)
183,768✔
1172
    new_q:push (res, co)
183,768✔
1173
    return
183,768✔
1174
  end
1175

1176
  -- coroutine is terminating
1177

1178
  if ok and coroutine_status(co) ~= "dead" then
4,226✔
1179
    -- it called coroutine.yield from a non-Copas function which is unexpected
1180
    ok = false
5✔
1181
    res = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
5✔
1182
  end
1183

1184
  if not ok then
4,226✔
1185
    local k, e = pcall(_errhandlers[co] or _deferror, res, co, skt)
38✔
1186
    if not k then
38✔
1187
      print("Failed executing error handler: " .. tostring(e))
×
1188
    end
1189
  end
1190

1191
  local skt_to_close = _autoclose[co]
4,226✔
1192
  if skt_to_close then
4,226✔
1193
    skt_to_close:close()
95✔
1194
    _autoclose[co] = nil
95✔
1195
    _autoclose_r[skt_to_close] = nil
95✔
1196
  end
1197

1198
  _errhandlers[co] = nil
4,226✔
1199
end
1200

1201

1202
local _accept do
151✔
1203
  local client_counters = setmetatable({}, { __mode = "k" })
151✔
1204

1205
  -- accepts a connection on socket input
1206
  function _accept(server_skt, handler)
122✔
1207
    local client_skt = server_skt:accept()
100✔
1208
    if client_skt then
100✔
1209
      local count = (client_counters[server_skt] or 0) + 1
100✔
1210
      client_counters[server_skt] = count
100✔
1211
      object_names[client_skt] = object_names[server_skt] .. ":client_" .. count
113✔
1212

1213
      client_skt:settimeout(0)
100✔
1214
      copas.settimeouts(client_skt, user_timeouts_connect[server_skt],  -- copy server socket timeout settings
200✔
1215
        user_timeouts_send[server_skt], user_timeouts_receive[server_skt])
113✔
1216

1217
      local co = coroutine_create(handler)
100✔
1218
      object_names[co] = object_names[server_skt] .. ":handler_" .. count
100✔
1219

1220
      if copas.autoclose then
100✔
1221
        _autoclose[co] = client_skt
100✔
1222
        _autoclose_r[client_skt] = co
100✔
1223
      end
1224

1225
      _doTick(co, client_skt)
100✔
1226
    end
1227
  end
1228
end
1229

1230
-------------------------------------------------------------------------------
1231
-- Adds a server/handler pair to Copas dispatcher
1232
-------------------------------------------------------------------------------
1233

1234
do
1235
  local function addTCPserver(server, handler, timeout, name)
1236
    server:settimeout(0)
75✔
1237
    if name then
75✔
1238
      object_names[server] = name
×
1239
    end
1240
    _servers[server] = handler
75✔
1241
    _reading:insert(server)
75✔
1242
    if timeout then
75✔
1243
      copas.settimeout(server, timeout)
15✔
1244
    end
1245
  end
1246

1247
  local function addUDPserver(server, handler, timeout, name)
1248
    server:settimeout(0)
×
1249
    local co = coroutine_create(handler)
×
1250
    if name then
×
1251
      object_names[server] = name
×
1252
    end
1253
    object_names[co] = object_names[server]..":handler"
×
1254
    _reading:insert(server)
×
1255
    if timeout then
×
1256
      copas.settimeout(server, timeout)
×
1257
    end
1258
    _doTick(co, server)
×
1259
  end
1260

1261

1262
  function copas.addserver(server, handler, timeout, name)
151✔
1263
    if isTCP(server) then
90✔
1264
      addTCPserver(server, handler, timeout, name)
90✔
1265
    else
1266
      addUDPserver(server, handler, timeout, name)
×
1267
    end
1268
  end
1269
end
1270

1271

1272
function copas.removeserver(server, keep_open)
151✔
1273
  local skt = server
70✔
1274
  local mt = getmetatable(server)
70✔
1275
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
70✔
1276
    skt = server.socket
×
1277
  end
1278

1279
  _servers:remove(skt)
70✔
1280
  _reading:remove(skt)
70✔
1281

1282
  if keep_open then
70✔
1283
    return true
15✔
1284
  end
1285
  return server:close()
55✔
1286
end
1287

1288

1289

1290
-------------------------------------------------------------------------------
1291
-- Adds an new coroutine thread to Copas dispatcher
1292
-------------------------------------------------------------------------------
1293
function copas.addnamedthread(name, handler, ...)
151✔
1294
  if type(name) == "function" and type(handler) == "string" then
4,322✔
1295
    -- old call, flip args for compatibility
1296
    name, handler = handler, name
×
1297
  end
1298

1299
  -- create a coroutine that skips the first argument, which is always the socket
1300
  -- passed by the scheduler, but `nil` in case of a task/thread
1301
  local thread = coroutine_create(function(_, ...)
8,644✔
1302
    copas.pause()
4,322✔
1303
    return handler(...)
4,317✔
1304
  end)
1305
  if name then
4,322✔
1306
    object_names[thread] = name
367✔
1307
  end
1308

1309
  _threads[thread] = true -- register this thread so it can be removed
4,322✔
1310
  _doTick (thread, nil, ...)
4,322✔
1311
  return thread
4,322✔
1312
end
1313

1314

1315
function copas.addthread(handler, ...)
151✔
1316
  return copas.addnamedthread(nil, handler, ...)
3,955✔
1317
end
1318

1319

1320
function copas.removethread(thread)
151✔
1321
  -- if the specified coroutine is registered, add it to the canceled table so
1322
  -- that next time it tries to resume it exits.
1323
  _canceled[thread] = _threads[thread or 0]
35✔
1324
  _sleeping:cancel(thread)
35✔
1325
end
1326

1327

1328

1329
-------------------------------------------------------------------------------
1330
-- Sleep/pause management functions
1331
-------------------------------------------------------------------------------
1332

1333
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1334
-- If sleeptime < 0 then it sleeps until explicitly woken up using 'wakeup'
1335
-- TODO: deprecated, remove in next major
1336
function copas.sleep(sleeptime)
151✔
1337
  coroutine_yield((sleeptime or 0), _sleeping)
×
1338
end
1339

1340

1341
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1342
-- if sleeptime < 0 then it sleeps 0 seconds.
1343
function copas.pause(sleeptime)
151✔
1344
  if sleeptime and sleeptime > 0 then
180,346✔
1345
    coroutine_yield(sleeptime, _sleeping)
6,747✔
1346
  else
1347
    coroutine_yield(0, _sleeping)
174,697✔
1348
  end
1349
end
1350

1351

1352
-- yields the current coroutine until explicitly woken up using 'wakeup'
1353
function copas.pauseforever()
151✔
1354
  coroutine_yield(-1, _sleeping)
2,605✔
1355
end
1356

1357

1358
-- Wakes up a sleeping coroutine 'co'.
1359
function copas.wakeup(co)
151✔
1360
  _sleeping:wakeup(co)
2,615✔
1361
end
1362

1363

1364

1365
-------------------------------------------------------------------------------
1366
-- Timeout management
1367
-------------------------------------------------------------------------------
1368

1369
do
1370
  local timeout_register = setmetatable({}, { __mode = "k" })
151✔
1371
  local time_out_thread
1372
  local timerwheel = require("timerwheel").new({
303✔
1373
      now = gettime,
151✔
1374
      precision = TIMEOUT_PRECISION,
151✔
1375
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
151✔
1376
      err_handler = function(err)
1377
        return _deferror(err, time_out_thread)
13✔
1378
      end,
1379
    })
1380

1381
  time_out_thread = copas.addnamedthread("copas_core_timer", function()
302✔
1382
    while true do
1383
      copas.pause(TIMEOUT_PRECISION)
4,941✔
1384
      timerwheel:step()
5,755✔
1385
    end
1386
  end)
1387

1388
  -- get the number of timeouts running
1389
  function copas.gettimeouts()
151✔
1390
    return timerwheel:count()
1,826✔
1391
  end
1392

1393
  --- Sets the timeout for the current coroutine.
1394
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timerout
1395
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
1396
  -- @return true
1397
  function copas.timeout(delay, callback)
151✔
1398
    local co = coroutine_running()
3,325,355✔
1399
    local existing_timer = timeout_register[co]
3,325,355✔
1400

1401
    if existing_timer then
3,325,355✔
1402
      timerwheel:cancel(existing_timer)
3,542✔
1403
    end
1404

1405
    if delay > 0 and delay ~= math.huge then
3,325,355✔
1406
      timeout_register[co] = timerwheel:set(delay, callback, co)
5,816✔
1407
    elseif delay == 0 or delay == math.huge then
3,320,500✔
1408
      timeout_register[co] = nil
3,320,500✔
1409
    else
1410
      error("timout value must be greater than or equal to 0, got: "..tostring(delay))
×
1411
    end
1412

1413
    return true
3,325,355✔
1414
  end
1415

1416
end
1417

1418

1419
-------------------------------------------------------------------------------
1420
-- main tasks: manage readable and writable socket sets
1421
-------------------------------------------------------------------------------
1422
-- a task is an object with a required method `step()` that deals with a
1423
-- single step for that task.
1424

1425
local _tasks = {} do
151✔
1426
  function _tasks:add(tsk)
151✔
1427
    _tasks[#_tasks + 1] = tsk
604✔
1428
  end
1429
end
1430

1431

1432
-- a task to check ready to read events
1433
local _readable_task = {} do
151✔
1434

1435
  _readable_task._events = {}
151✔
1436

1437
  local function tick(skt)
1438
    local handler = _servers[skt]
621✔
1439
    if handler then
621✔
1440
      _accept(skt, handler)
120✔
1441
    else
1442
      _reading:remove(skt)
521✔
1443
      _doTick(_reading:pop(skt), skt)
630✔
1444
    end
1445
  end
1446

1447
  function _readable_task:step()
151✔
1448
    for _, skt in ipairs(self._events) do
175,745✔
1449
      tick(skt)
621✔
1450
    end
1451
  end
1452

1453
  _tasks:add(_readable_task)
180✔
1454
end
1455

1456

1457
-- a task to check ready to write events
1458
local _writable_task = {} do
151✔
1459

1460
  _writable_task._events = {}
151✔
1461

1462
  local function tick(skt)
1463
    _writing:remove(skt)
236✔
1464
    _doTick(_writing:pop(skt), skt)
283✔
1465
  end
1466

1467
  function _writable_task:step()
151✔
1468
    for _, skt in ipairs(self._events) do
175,357✔
1469
      tick(skt)
236✔
1470
    end
1471
  end
1472

1473
  _tasks:add(_writable_task)
180✔
1474
end
1475

1476

1477

1478
-- sleeping threads task
1479
local _sleeping_task = {} do
151✔
1480

1481
  function _sleeping_task:step()
151✔
1482
    local now = gettime()
175,121✔
1483

1484
    local co = _sleeping:pop(now)
175,121✔
1485
    while co do
180,602✔
1486
      -- we're pushing them to _resumable, since that list will be replaced before
1487
      -- executing. This prevents tasks running twice in a row with pause(0) for example.
1488
      -- So here we won't execute, but at _resumable step which is next
1489
      _resumable:push(co)
5,481✔
1490
      co = _sleeping:pop(now)
6,580✔
1491
    end
1492
  end
1493

1494
  _tasks:add(_sleeping_task)
151✔
1495
end
1496

1497

1498

1499
-- resumable threads task
1500
local _resumable_task = {} do
151✔
1501

1502
  function _resumable_task:step()
151✔
1503
    -- replace the resume list before iterating, so items placed in there
1504
    -- will indeed end up in the next copas step, not in this one, and not
1505
    -- create a loop
1506
    local resumelist = _resumable:clear_resumelist()
175,121✔
1507

1508
    for _, co in ipairs(resumelist) do
357,950✔
1509
      _doTick(co)
182,831✔
1510
    end
1511
  end
1512

1513
  _tasks:add(_resumable_task)
151✔
1514
end
1515

1516

1517
-------------------------------------------------------------------------------
1518
-- Checks for reads and writes on sockets
1519
-------------------------------------------------------------------------------
1520
local _select_plain do
151✔
1521

1522
  local last_cleansing = 0
151✔
1523
  local duration = function(t2, t1) return t2-t1 end
175,205✔
1524

1525
  if not socket then
151✔
1526
    -- socket module unavailable, switch to luasystem sleep
1527
    _select_plain = system.sleep
5✔
1528
  else
1529
    -- use socket.select to handle socket-io
1530
    _select_plain = function(timeout)
1531
      local err
1532
      local now = gettime()
175,054✔
1533

1534
      -- remove any closed sockets to prevent select from hanging on them
1535
      if _closed[1] then
175,054✔
1536
        for i, skt in ipairs(_closed) do
329✔
1537
          _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
231✔
1538
        end
1539
      end
1540

1541
      _readable_task._events, _writable_task._events, err = socket.select(_reading, _writing, timeout)
175,054✔
1542
      local r_events, w_events = _readable_task._events, _writable_task._events
175,054✔
1543

1544
      -- inject closed sockets in readable/writeable task so they can error out properly
1545
      if _closed[1] then
175,054✔
1546
        for i, skts in ipairs(_closed) do
329✔
1547
          _closed[i] = nil
165✔
1548
          r_events[#r_events+1] = skts[1]
165✔
1549
          w_events[#w_events+1] = skts[2]
165✔
1550
        end
1551
      end
1552

1553
      if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
232,332✔
1554
        last_cleansing = now
141✔
1555

1556
        -- Check all sockets selected for reading, and check how long they have been waiting
1557
        -- for data already, without select returning them as readable
1558
        for skt,time in pairs(_reading_log) do
141✔
1559
          if not r_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1560
            -- This one timedout while waiting to become readable, so move
1561
            -- it in the readable list and try and read anyway, despite not
1562
            -- having been returned by select
1563
            _reading_log[skt] = nil
×
1564
            r_events[#r_events + 1] = skt
×
1565
            r_events[skt] = #r_events
×
1566
          end
1567
        end
1568

1569
        -- Do the same for writing
1570
        for skt,time in pairs(_writing_log) do
141✔
1571
          if not w_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1572
            _writing_log[skt] = nil
×
1573
            w_events[#w_events + 1] = skt
×
1574
            w_events[skt] = #w_events
×
1575
          end
1576
        end
1577
      end
1578

1579
      if err == "timeout" and #r_events + #w_events > 0 then
175,054✔
1580
        return nil
5✔
1581
      else
1582
        return err
175,049✔
1583
      end
1584
    end
1585
  end
1586
end
1587

1588

1589

1590
-------------------------------------------------------------------------------
1591
-- Dispatcher loop step.
1592
-- Listen to client requests and handles them
1593
-- Returns false if no socket-data was handled, or true if there was data
1594
-- handled (or nil + error message)
1595
-------------------------------------------------------------------------------
1596

1597
local copas_stats
1598
local min_ever, max_ever
1599

1600
local _select = _select_plain
151✔
1601

1602
-- instrumented version of _select() to collect stats
1603
local _select_instrumented = function(timeout)
1604
  if copas_stats then
×
1605
    local step_duration = gettime() - copas_stats.step_start
×
1606
    copas_stats.duration_max = math.max(copas_stats.duration_max, step_duration)
×
1607
    copas_stats.duration_min = math.min(copas_stats.duration_min, step_duration)
×
1608
    copas_stats.duration_tot = copas_stats.duration_tot + step_duration
×
1609
    copas_stats.steps = copas_stats.steps + 1
×
1610
  else
1611
    copas_stats = {
×
1612
      duration_max = -1,
1613
      duration_min = 999999,
1614
      duration_tot = 0,
1615
      steps = 0,
1616
    }
1617
  end
1618

1619
  local err = _select_plain(timeout)
×
1620

1621
  local now = gettime()
×
1622
  copas_stats.time_start = copas_stats.time_start or now
×
1623
  copas_stats.step_start = now
×
1624

1625
  return err
×
1626
end
1627

1628

1629
function copas.step(timeout)
151✔
1630
  -- Need to wake up the select call in time for the next sleeping event
1631
  if not _resumable:done() then
232,416✔
1632
    timeout = 0
170,650✔
1633
  else
1634
    timeout = math.min(_sleeping:getnext(), timeout or math.huge)
5,367✔
1635
  end
1636

1637
  local err = _select(timeout)
175,124✔
1638

1639
  for _, tsk in ipairs(_tasks) do
875,609✔
1640
    tsk:step()
700,490✔
1641
  end
1642

1643
  if err then
175,119✔
1644
    if err == "timeout" then
174,322✔
1645
      if timeout + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
174,252✔
1646
        -- we were idle, so occasionally do a GC sweep to ensure lingering
1647
        -- sockets are closed, and we don't accidentally block the loop from
1648
        -- exiting
1649
        collectgarbage()
336✔
1650
      end
1651
      return false
174,252✔
1652
    end
1653
    return nil, err
70✔
1654
  end
1655

1656
  return true
797✔
1657
end
1658

1659

1660
-------------------------------------------------------------------------------
1661
-- Check whether there is something to do.
1662
-- returns false if there are no sockets for read/write nor tasks scheduled
1663
-- (which means Copas is in an empty spin)
1664
-------------------------------------------------------------------------------
1665
function copas.finished()
151✔
1666
  return #_reading == 0 and #_writing == 0 and _resumable:done() and _sleeping:done(copas.gettimeouts())
176,573✔
1667
end
1668

1669
local _getstats do
151✔
1670
  local _getstats_instrumented, _getstats_plain
1671

1672

1673
  function _getstats_plain(enable)
122✔
1674
    -- this function gets hit if turned off, so turn on if true
1675
    if enable == true then
×
1676
      _select = _select_instrumented
×
1677
      _getstats = _getstats_instrumented
×
1678
      -- reset stats
1679
      min_ever = nil
×
1680
      max_ever = nil
×
1681
      copas_stats = nil
×
1682
    end
1683
    return {}
×
1684
  end
1685

1686

1687
  -- convert from seconds to millisecs, with microsec precision
1688
  local function useconds(t)
1689
    return math.floor((t * 1000000) + 0.5) / 1000
×
1690
  end
1691
  -- convert from seconds to seconds, with millisec precision
1692
  local function mseconds(t)
1693
    return math.floor((t * 1000) + 0.5) / 1000
×
1694
  end
1695

1696

1697
  function _getstats_instrumented(enable)
122✔
1698
    if enable == false then
×
1699
      _select = _select_plain
×
1700
      _getstats = _getstats_plain
×
1701
      -- instrumentation disabled, so switch to the plain implementation
1702
      return _getstats(enable)
×
1703
    end
1704
    if (not copas_stats) or (copas_stats.step == 0) then
×
1705
      return {}
×
1706
    end
1707
    local stats = copas_stats
×
1708
    copas_stats = nil
×
1709
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
×
1710
    max_ever = math.max(max_ever or 0, stats.duration_max)
×
1711
    stats.duration_min_ever = min_ever
×
1712
    stats.duration_max_ever = max_ever
×
1713
    stats.duration_avg = stats.duration_tot / stats.steps
×
1714
    stats.step_start = nil
×
1715
    stats.time_end = gettime()
×
1716
    stats.time_tot = stats.time_end - stats.time_start
×
1717
    stats.time_avg = stats.time_tot / stats.steps
×
1718

1719
    stats.duration_avg = useconds(stats.duration_avg)
×
1720
    stats.duration_max = useconds(stats.duration_max)
×
1721
    stats.duration_max_ever = useconds(stats.duration_max_ever)
×
1722
    stats.duration_min = useconds(stats.duration_min)
×
1723
    stats.duration_min_ever = useconds(stats.duration_min_ever)
×
1724
    stats.duration_tot = useconds(stats.duration_tot)
×
1725
    stats.time_avg = useconds(stats.time_avg)
×
1726
    stats.time_start = mseconds(stats.time_start)
×
1727
    stats.time_end = mseconds(stats.time_end)
×
1728
    stats.time_tot = mseconds(stats.time_tot)
×
1729
    return stats
×
1730
  end
1731

1732
  _getstats = _getstats_plain
151✔
1733
end
1734

1735

1736
function copas.status(enable_stats)
151✔
1737
  local res = _getstats(enable_stats)
×
1738
  res.running = not not copas.running
×
1739
  res.timeout = copas.gettimeouts()
×
1740
  res.timer, res.inactive = _sleeping:status()
×
1741
  res.read = #_reading
×
1742
  res.write = #_writing
×
1743
  res.active = _resumable:count()
×
1744
  return res
×
1745
end
1746

1747

1748
-------------------------------------------------------------------------------
1749
-- Dispatcher endless loop.
1750
-- Listen to client requests and handles them forever
1751
-------------------------------------------------------------------------------
1752
function copas.loop(initializer, timeout)
151✔
1753
  if type(initializer) == "function" then
236✔
1754
    copas.addnamedthread("copas_initializer", initializer)
114✔
1755
  else
1756
    timeout = initializer or timeout
140✔
1757
  end
1758

1759
  copas.running = true
236✔
1760
  while not copas.finished() do copas.step(timeout) end
289,983✔
1761
  copas.running = false
231✔
1762
end
1763

1764

1765
-------------------------------------------------------------------------------
1766
-- Naming sockets and coroutines.
1767
-------------------------------------------------------------------------------
1768
do
1769
  local function realsocket(skt)
1770
    local mt = getmetatable(skt)
75✔
1771
    if mt == _skt_mt_tcp or mt == _skt_mt_udp then
75✔
1772
      return skt.socket
75✔
1773
    else
1774
      return skt
×
1775
    end
1776
  end
1777

1778

1779
  function copas.setsocketname(name, skt)
151✔
1780
    assert(type(name) == "string", "expected arg #1 to be a string")
75✔
1781
    skt = assert(realsocket(skt), "expected arg #2 to be a socket")
90✔
1782
    object_names[skt] = name
75✔
1783
  end
1784

1785

1786
  function copas.getsocketname(skt)
151✔
1787
    skt = assert(realsocket(skt), "expected arg #1 to be a socket")
×
1788
    return object_names[skt]
×
1789
  end
1790
end
1791

1792

1793
function copas.setthreadname(name, coro)
151✔
1794
  assert(type(name) == "string", "expected arg #1 to be a string")
50✔
1795
  coro = coro or coroutine_running()
50✔
1796
  assert(type(coro) == "thread", "expected arg #2 to be a coroutine or nil")
50✔
1797
  object_names[coro] = name
50✔
1798
end
1799

1800

1801
function copas.getthreadname(coro)
180✔
1802
  coro = coro or coroutine_running()
31✔
1803
  assert(type(coro) == "thread", "expected arg #1 to be a coroutine or nil")
31✔
1804
  return object_names[coro]
33✔
1805
end
1806

1807
-------------------------------------------------------------------------------
1808
-- Debug functionality.
1809
-------------------------------------------------------------------------------
1810
do
1811
  copas.debug = {}
151✔
1812

1813
  local log_core    -- if truthy, the core-timer will also be logged
1814
  local debug_log   -- function used as logger
1815

1816

1817
  local debug_yield = function(skt, queue)
1818
    local name = object_names[coroutine_running()]
1,984✔
1819

1820
    if log_core or name ~= "copas_core_timer" then
1,984✔
1821
      if queue == _sleeping then
1,975✔
1822
        debug_log("yielding '", name, "' to SLEEP for ", skt," seconds")
1,954✔
1823

1824
      elseif queue == _writing then
21✔
1825
        debug_log("yielding '", name, "' to WRITE on '", object_names[skt], "'")
6✔
1826

1827
      elseif queue == _reading then
16✔
1828
        debug_log("yielding '", name, "' to READ on '", object_names[skt], "'")
17✔
1829

1830
      else
1831
        debug_log("thread '", name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
×
1832
      end
1833
    end
1834

1835
    return coroutine.yield(skt, queue)
1,984✔
1836
  end
1837

1838

1839
  local debug_resume = function(coro, skt, ...)
1840
    local name = object_names[coro]
1,994✔
1841

1842
    if skt then
1,994✔
1843
      debug_log("resuming '", name, "' for socket '", object_names[skt], "'")
21✔
1844
    else
1845
      if log_core or name ~= "copas_core_timer" then
1,973✔
1846
        debug_log("resuming '", name, "'")
1,964✔
1847
      end
1848
    end
1849
    return coroutine.resume(coro, skt, ...)
1,994✔
1850
  end
1851

1852

1853
  local debug_create = function(f)
1854
    local f_wrapped = function(...)
1855
      local results = pack(f(...))
12✔
1856
      debug_log("exiting '", object_names[coroutine_running()], "'")
10✔
1857
      return unpack(results)
10✔
1858
    end
1859

1860
    return coroutine.create(f_wrapped)
10✔
1861
  end
1862

1863

1864
  debug_log = fnil
151✔
1865

1866

1867
  -- enables debug output for all coroutine operations.
1868
  function copas.debug.start(logger, core)
302✔
1869
    log_core = core
5✔
1870
    debug_log = logger or print
5✔
1871
    coroutine_yield = debug_yield
5✔
1872
    coroutine_resume = debug_resume
5✔
1873
    coroutine_create = debug_create
5✔
1874
  end
1875

1876

1877
  -- disables debug output for coroutine operations.
1878
  function copas.debug.stop()
302✔
1879
    debug_log = fnil
×
1880
    coroutine_yield = coroutine.yield
×
1881
    coroutine_resume = coroutine.resume
×
1882
    coroutine_create = coroutine.create
×
1883
  end
1884

1885
  do
1886
    local call_id = 0
151✔
1887

1888
    -- Description table of socket functions for debug output.
1889
    -- each socket function name has TWO entries;
1890
    -- 'name_in' and 'name_out', each being an array of names/descriptions of respectively
1891
    -- input parameters and return values.
1892
    -- If either table has a 'callback' key, then that is a function that will be called
1893
    -- with the parameters/return-values for further inspection.
1894
    local args = {
151✔
1895
      settimeout_in = {
151✔
1896
        "socket ",
122✔
1897
        "seconds",
122✔
1898
        "mode   ",
1899
      },
151✔
1900
      settimeout_out = {
151✔
1901
        "success",
122✔
1902
        "error  ",
1903
      },
151✔
1904
      connect_in = {
151✔
1905
        "socket ",
122✔
1906
        "address",
122✔
1907
        "port   ",
1908
      },
151✔
1909
      connect_out = {
151✔
1910
        "success",
122✔
1911
        "error  ",
1912
      },
151✔
1913
      getfd_in = {
151✔
1914
        "socket ",
1915
        -- callback = function(...)
1916
        --   print(debug.traceback("called from:", 4))
1917
        -- end,
1918
      },
151✔
1919
      getfd_out = {
151✔
1920
        "fd",
1921
      },
151✔
1922
      send_in = {
151✔
1923
        "socket   ",
122✔
1924
        "data     ",
122✔
1925
        "idx-start",
122✔
1926
        "idx-end  ",
1927
      },
151✔
1928
      send_out = {
151✔
1929
        "last-idx-send    ",
122✔
1930
        "error            ",
122✔
1931
        "err-last-idx-send",
1932
      },
151✔
1933
      receive_in = {
151✔
1934
        "socket ",
122✔
1935
        "pattern",
122✔
1936
        "prefix ",
1937
      },
151✔
1938
      receive_out = {
151✔
1939
        "received    ",
122✔
1940
        "error       ",
122✔
1941
        "partial data",
1942
      },
151✔
1943
      dirty_in = {
151✔
1944
        "socket",
1945
        -- callback = function(...)
1946
        --   print(debug.traceback("called from:", 4))
1947
        -- end,
1948
      },
151✔
1949
      dirty_out = {
151✔
1950
        "data in read-buffer",
1951
      },
151✔
1952
      close_in = {
151✔
1953
        "socket",
1954
        -- callback = function(...)
1955
        --   print(debug.traceback("called from:", 4))
1956
        -- end,
1957
      },
151✔
1958
      close_out = {
151✔
1959
        "success",
122✔
1960
        "error",
1961
      },
151✔
1962
    }
1963
    local function print_call(func, msg, ...)
1964
      print(msg)
240✔
1965
      local arg = pack(...)
240✔
1966
      local desc = args[func] or {}
240✔
1967
      for i = 1, math.max(arg.n, #desc) do
540✔
1968
        local value = arg[i]
300✔
1969
        if type(value) == "string" then
300✔
1970
          local xvalue = value:sub(1,30)
15✔
1971
          if xvalue ~= value then
15✔
1972
            xvalue = xvalue .."(...truncated)"
×
1973
          end
1974
          print("\t"..(desc[i] or i)..": '"..tostring(xvalue).."' ("..type(value).." #"..#value..")")
15✔
1975
        else
1976
          print("\t"..(desc[i] or i)..": '"..tostring(value).."' ("..type(value)..")")
285✔
1977
        end
1978
      end
1979
      if desc.callback then
240✔
1980
        desc.callback(...)
×
1981
      end
1982
    end
1983

1984
    local debug_mt = {
151✔
1985
      __index = function(self, key)
1986
        local value = self.__original_socket[key]
120✔
1987
        if type(value) ~= "function" then
120✔
1988
          return value
×
1989
        end
1990
        return function(self2, ...)
1991
            local my_id = call_id + 1
120✔
1992
            call_id = my_id
120✔
1993
            local results
1994

1995
            if self2 ~= self then
120✔
1996
              -- there is no self
1997
              print_call(tostring(key).."_in", my_id .. "-calling '"..tostring(key) .. "' with; ", self, ...)
×
1998
              results = pack(value(self, ...))
×
1999
            else
2000
              print_call(tostring(key).."_in", my_id .. "-calling '" .. tostring(key) .. "' with; ", self.__original_socket, ...)
120✔
2001
              results = pack(value(self.__original_socket, ...))
155✔
2002
            end
2003
            print_call(tostring(key).."_out", my_id .. "-results '"..tostring(key) .. "' returned; ", unpack(results))
155✔
2004
            return unpack(results)
120✔
2005
          end
2006
      end,
2007
      __tostring = function(self)
2008
        return tostring(self.__original_socket)
20✔
2009
      end
2010
    }
2011

2012

2013
    -- wraps a socket (copas or luasocket) in a debug version printing all calls
2014
    -- and their parameters/return values. Extremely noisy!
2015
    -- returns the wrapped socket.
2016
    -- NOTE: only for plain sockets, will not support TLS
2017
    function copas.debug.socket(original_skt)
302✔
2018
      if (getmetatable(original_skt) == _skt_mt_tcp) or (getmetatable(original_skt) == _skt_mt_udp) then
5✔
2019
        -- already wrapped as Copas socket, so recurse with the original luasocket one
2020
        original_skt.socket = copas.debug.socket(original_skt.socket)
×
2021
        return original_skt
×
2022
      end
2023

2024
      local proxy = setmetatable({
10✔
2025
        __original_socket = original_skt
5✔
2026
      }, debug_mt)
5✔
2027

2028
      return proxy
5✔
2029
    end
2030
  end
2031
end
2032

2033

2034
return copas
151✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc