• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lunarmodules / copas / 10622180872

29 Aug 2024 08:28PM UTC coverage: 84.576% (+0.2%) from 84.343%
10622180872

push

github

web-flow
feat(exit): add exit signalling (#172)

17 of 17 new or added lines in 1 file covered. (100.0%)

5 existing lines in 2 files now uncovered.

1327 of 1569 relevant lines covered (84.58%)

33560.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.51
/src/copas.lua
1
-------------------------------------------------------------------------------
2
-- Copas - Coroutine Oriented Portable Asynchronous Services
3
--
4
-- A dispatcher based on coroutines that can be used by TCP/IP servers.
5
-- Uses LuaSocket as the interface with the TCP/IP stack.
6
--
7
-- Authors: Andre Carregal, Javier Guerra, and Fabio Mascarenhas
8
-- Contributors: Diego Nehab, Mike Pall, David Burgess, Leonardo Godinho,
9
--               Thomas Harning Jr., and Gary NG
10
--
11
-- Copyright 2005-2013 - Kepler Project (www.keplerproject.org), 2015-2024 Thijs Schreijer
12
--
13
-- $Id: copas.lua,v 1.37 2009/04/07 22:09:52 carregal Exp $
14
-------------------------------------------------------------------------------
15

16
if package.loaded["socket.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
122✔
17
  error("you must require copas before require'ing socket.http")
×
18
end
19
if package.loaded["copas.http"] and (_VERSION=="Lua 5.1") then     -- obsolete: only for Lua 5.1 compatibility
122✔
20
  error("you must require copas before require'ing copas.http")
×
21
end
22

23
-- load either LuaSocket, or LuaSystem
24
local socket, system do
122✔
25
  if pcall(require, "socket") then
122✔
26
    -- found LuaSocket
27
    socket = require "socket"
118✔
28
  else
29
    -- fallback to LuaSystem
30
    if pcall(require, "system") then
4✔
31
      system = require "system"
4✔
32
    else
33
      error("Neither LuaSocket nor LuaSystem found, Copas requires at least one of them")
×
34
    end
35
  end
36
end
37

38
local binaryheap = require "binaryheap"
122✔
39
local gettime = (socket or system).gettime
122✔
40
local ssl -- only loaded upon demand
41

42
local WATCH_DOG_TIMEOUT = 120
122✔
43
local UDP_DATAGRAM_MAX = (socket or {})._DATAGRAMSIZE or 8192
122✔
44
local TIMEOUT_PRECISION = 0.1  -- 100ms
122✔
45
local fnil = function() end
129,389✔
46

47

48
local coroutine_create = coroutine.create
122✔
49
local coroutine_running = coroutine.running
122✔
50
local coroutine_yield = coroutine.yield
122✔
51
local coroutine_resume = coroutine.resume
122✔
52
local coroutine_status = coroutine.status
122✔
53

54

55
-- nil-safe versions for pack/unpack
56
local _unpack = unpack or table.unpack
122✔
57
local unpack = function(t, i, j) return _unpack(t, i or 1, j or t.n or #t) end
280✔
58
local pack = function(...) return { n = select("#", ...), ...} end
355✔
59

60

61
local pcall = pcall
122✔
62
if _VERSION=="Lua 5.1" and not jit then     -- obsolete: only for Lua 5.1 compatibility
122✔
UNCOV
63
  pcall = require("coxpcall").pcall
29✔
UNCOV
64
  coroutine_running = require("coxpcall").running
29✔
65
end
66

67

68
if socket then
122✔
69
  -- Redefines LuaSocket functions with coroutine safe versions (pure Lua)
70
  -- (this allows the use of socket.http from within copas)
71
  local err_mt = {
118✔
72
    __tostring = function (self)
73
      return "Copas 'try' error intermediate table: '"..tostring(self[1].."'")
×
74
    end,
75
  }
76

77
  local function statusHandler(status, ...)
78
    if status then return ... end
56✔
79
    local err = (...)
20✔
80
    if type(err) == "table" and getmetatable(err) == err_mt then
20✔
81
      return nil, err[1]
20✔
82
    else
83
      error(err)
×
84
    end
85
  end
86

87
  function socket.protect(func)
118✔
88
    return function (...)
89
            return statusHandler(pcall(func, ...))
56✔
90
          end
91
  end
92

93
  function socket.newtry(finalizer)
118✔
94
    return function (...)
95
            local status = (...)
876✔
96
            if not status then
876✔
97
              pcall(finalizer or fnil, select(2, ...))
20✔
98
              error(setmetatable({ (select(2, ...)) }, err_mt), 0)
20✔
99
            end
100
            return ...
856✔
101
          end
102
  end
103

104
  socket.try = socket.newtry()
118✔
105
end
106

107

108
-- Setup the Copas meta table to auto-load submodules and define a default method
109
local copas do
122✔
110
  local submodules = { "ftp", "http", "lock", "queue", "semaphore", "smtp", "timer" }
122✔
111
  for i, key in ipairs(submodules) do
976✔
112
    submodules[key] = true
854✔
113
    submodules[i] = nil
854✔
114
  end
115

116
  copas = setmetatable({},{
244✔
117
    __index = function(self, key)
118
      if submodules[key] then
158✔
119
        self[key] = require("copas."..key)
158✔
120
        submodules[key] = nil
158✔
121
        return rawget(self, key)
158✔
122
      end
123
    end,
124
    __call = function(self, ...)
125
      return self.loop(...)
4✔
126
    end,
127
  })
122✔
128
end
129

130

131
-- Meta information is public even if beginning with an "_"
132
copas._COPYRIGHT   = "Copyright (C) 2005-2013 Kepler Project, 2015-2024 Thijs Schreijer"
122✔
133
copas._DESCRIPTION = "Coroutine Oriented Portable Asynchronous Services"
122✔
134
copas._VERSION     = "Copas 4.7.1"
122✔
135

136
-- Close the socket associated with the current connection after the handler finishes
137
copas.autoclose = true
122✔
138

139
-- indicator for the loop running
140
copas.running = false
122✔
141

142
-- gettime method from either LuaSocket or LuaSystem: time in (fractional) seconds, since epoch.
143
copas.gettime = gettime
122✔
144

145
-------------------------------------------------------------------------------
146
-- Object names, to track names of thread/coroutines and sockets
147
-------------------------------------------------------------------------------
148
local object_names = setmetatable({}, {
244✔
149
  __mode = "k",
122✔
150
  __index = function(self, key)
151
    local name = tostring(key)
116✔
152
    if key ~= nil then
116✔
153
      rawset(self, key, name)
116✔
154
    end
155
    return name
116✔
156
  end
157
})
158

159
-------------------------------------------------------------------------------
160
-- Simple set implementation
161
-- adds a FIFO queue for each socket in the set
162
-------------------------------------------------------------------------------
163

164
local function newsocketset()
165
  local set = {}
366✔
166

167
  do  -- set implementation
168
    local reverse = {}
366✔
169

170
    -- Adds a socket to the set, does nothing if it exists
171
    -- @return skt if added, or nil if it existed
172
    function set:insert(skt)
366✔
173
      if not reverse[skt] then
712✔
174
        self[#self + 1] = skt
712✔
175
        reverse[skt] = #self
712✔
176
        return skt
712✔
177
      end
178
    end
179

180
    -- Removes socket from the set, does nothing if not found
181
    -- @return skt if removed, or nil if it wasn't in the set
182
    function set:remove(skt)
366✔
183
      local index = reverse[skt]
1,044✔
184
      if index then
1,044✔
185
        reverse[skt] = nil
708✔
186
        local top = self[#self]
708✔
187
        self[#self] = nil
708✔
188
        if top ~= skt then
708✔
189
          reverse[top] = index
62✔
190
          self[index] = top
62✔
191
        end
192
        return skt
708✔
193
      end
194
    end
195

196
  end
197

198
  do  -- queues implementation
199
    local fifo_queues = setmetatable({},{
732✔
200
      __mode = "k",                 -- auto collect queue if socket is gone
366✔
201
      __index = function(self, skt) -- auto create fifo queue if not found
202
        local newfifo = {}
264✔
203
        self[skt] = newfifo
264✔
204
        return newfifo
264✔
205
      end,
206
    })
207

208
    -- pushes an item in the fifo queue for the socket.
209
    function set:push(skt, itm)
366✔
210
      local queue = fifo_queues[skt]
652✔
211
      queue[#queue + 1] = itm
652✔
212
    end
213

214
    -- pops an item from the fifo queue for the socket
215
    function set:pop(skt)
366✔
216
      local queue = fifo_queues[skt]
604✔
217
      return table.remove(queue, 1)
604✔
218
    end
219

220
  end
221

222
  return set
366✔
223
end
224

225

226

227
-- Threads immediately resumable
228
local _resumable = {} do
122✔
229
  local resumelist = {}
122✔
230

231
  function _resumable:push(co)
122✔
232
    resumelist[#resumelist + 1] = co
129,173✔
233
  end
234

235
  function _resumable:clear_resumelist()
122✔
236
    local lst = resumelist
123,000✔
237
    resumelist = {}
123,000✔
238
    return lst
123,000✔
239
  end
240

241
  function _resumable:done()
122✔
242
    return resumelist[1] == nil
125,091✔
243
  end
244

245
  function _resumable:count()
122✔
246
    return #resumelist + #_resumable
×
247
  end
248

249
end
250

251

252

253
-- Similar to the socket set above, but tailored for the use of
254
-- sleeping threads
255
local _sleeping = {} do
122✔
256

257
  local heap = binaryheap.minUnique()
122✔
258
  local lethargy = setmetatable({}, { __mode = "k" }) -- list of coroutines sleeping without a wakeup time
122✔
259

260

261
  -- Required base implementation
262
  -----------------------------------------
263
  _sleeping.insert = fnil
122✔
264
  _sleeping.remove = fnil
122✔
265

266
  -- push a new timer on the heap
267
  function _sleeping:push(sleeptime, co)
122✔
268
    if sleeptime < 0 then
129,263✔
269
      lethargy[co] = true
2,092✔
270
    elseif sleeptime == 0 then
127,171✔
271
      _resumable:push(co)
122,438✔
272
    else
273
      heap:insert(gettime() + sleeptime, co)
4,733✔
274
    end
275
  end
276

277
  -- find the thread that should wake up to the time, if any
278
  function _sleeping:pop(time)
122✔
279
    if time < (heap:peekValue() or math.huge) then
127,599✔
280
      return
123,000✔
281
    end
282
    return heap:pop()
4,599✔
283
  end
284

285
  -- additional methods for time management
286
  -----------------------------------------
287
  function _sleeping:getnext()  -- returns delay until next sleep expires, or nil if there is none
122✔
288
    local t = heap:peekValue()
3,817✔
289
    if t then
3,817✔
290
      -- never report less than 0, because select() might block
291
      return math.max(t - gettime(), 0)
3,817✔
292
    end
293
  end
294

295
  function _sleeping:wakeup(co)
122✔
296
    if lethargy[co] then
2,100✔
297
      lethargy[co] = nil
2,080✔
298
      _resumable:push(co)
2,080✔
299
      return
2,080✔
300
    end
301
    if heap:remove(co) then
20✔
302
      _resumable:push(co)
8✔
303
    end
304
  end
305

306
  function _sleeping:cancel(co)
122✔
307
    lethargy[co] = nil
28✔
308
    heap:remove(co)
28✔
309
  end
310

311
  -- @param tos number of timeouts running
312
  function _sleeping:done(tos)
122✔
313
    -- return true if we have nothing more to do
314
    -- the timeout task doesn't qualify as work (fallbacks only),
315
    -- the lethargy also doesn't qualify as work ('dead' tasks),
316
    -- but the combination of a timeout + a lethargy can be work
317
    return heap:size() == 1       -- 1 means only the timeout-timer task is running
1,721✔
318
           and not (tos > 0 and next(lethargy))
1,721✔
319
  end
320

321
  -- gets number of threads in binaryheap and lethargy
322
  function _sleeping:status()
122✔
323
    local c = 0
×
324
    for _ in pairs(lethargy) do c = c + 1 end
×
325

326
    return heap:size(), c
×
327
  end
328

329
end   -- _sleeping
330

331

332

333
-------------------------------------------------------------------------------
334
-- Tracking coroutines and sockets
335
-------------------------------------------------------------------------------
336

337
local _servers = newsocketset() -- servers being handled
122✔
338
local _threads = setmetatable({}, {__mode = "k"})  -- registered threads added with addthread()
122✔
339
local _canceled = setmetatable({}, {__mode = "k"}) -- threads that are canceled and pending removal
122✔
340
local _autoclose = setmetatable({}, {__mode = "kv"}) -- sockets (value) to close when a thread (key) exits
122✔
341
local _autoclose_r = setmetatable({}, {__mode = "kv"}) -- reverse: sockets (key) to close when a thread (value) exits
122✔
342

343

344
-- for each socket we log the last read and last write times to enable the
345
-- watchdog to follow up if it takes too long.
346
-- tables contain the time, indexed by the socket
347
local _reading_log = {}
122✔
348
local _writing_log = {}
122✔
349

350
local _closed = {} -- track sockets that have been closed (list/array)
122✔
351

352
local _reading = newsocketset() -- sockets currently being read
122✔
353
local _writing = newsocketset() -- sockets currently being written
122✔
354
local _isSocketTimeout = { -- set of errors indicating a socket-timeout
122✔
355
  ["timeout"] = true,      -- default LuaSocket timeout
122✔
356
  ["wantread"] = true,     -- LuaSec specific timeout
122✔
357
  ["wantwrite"] = true,    -- LuaSec specific timeout
122✔
358
}
359

360
-------------------------------------------------------------------------------
361
-- Coroutine based socket timeouts.
362
-------------------------------------------------------------------------------
363
local user_timeouts_connect
364
local user_timeouts_send
365
local user_timeouts_receive
366
do
367
  local timeout_mt = {
122✔
368
    __mode = "k",
122✔
369
    __index = function(self, skt)
370
      -- if there is no timeout found, we insert one automatically, to block forever
371
      self[skt] = math.huge
276✔
372
      return self[skt]
276✔
373
    end,
374
  }
375

376
  user_timeouts_connect = setmetatable({}, timeout_mt)
122✔
377
  user_timeouts_send = setmetatable({}, timeout_mt)
122✔
378
  user_timeouts_receive = setmetatable({}, timeout_mt)
122✔
379
end
380

381
local useSocketTimeoutErrors = setmetatable({},{ __mode = "k" })
122✔
382

383

384
-- sto = socket-time-out
385
local sto_timeout, sto_timed_out, sto_change_queue, sto_error do
122✔
386

387
  local socket_register = setmetatable({}, { __mode = "k" })    -- socket by coroutine
122✔
388
  local operation_register = setmetatable({}, { __mode = "k" }) -- operation "read"/"write" by coroutine
122✔
389
  local timeout_flags = setmetatable({}, { __mode = "k" })      -- true if timedout, by coroutine
122✔
390

391

392
  local function socket_callback(co)
393
    local skt = socket_register[co]
48✔
394
    local queue = operation_register[co]
48✔
395

396
    -- flag the timeout and resume the coroutine
397
    timeout_flags[co] = true
48✔
398
    _resumable:push(co)
48✔
399

400
    -- clear the socket from the current queue
401
    if queue == "read" then
48✔
402
      _reading:remove(skt)
40✔
403
    elseif queue == "write" then
8✔
404
      _writing:remove(skt)
8✔
405
    else
406
      error("bad queue name; expected 'read'/'write', got: "..tostring(queue))
×
407
    end
408
  end
409

410

411
  -- Sets a socket timeout.
412
  -- Calling it as `sto_timeout()` will cancel the timeout.
413
  -- @param queue (string) the queue the socket is currently in, must be either "read" or "write"
414
  -- @param skt (socket) the socket on which to operate
415
  -- @param use_connect_to (bool) timeout to use is determined based on queue (read/write) or if this
416
  -- is truthy, it is the connect timeout.
417
  -- @return true
418
  function sto_timeout(skt, queue, use_connect_to)
93✔
419
    local co = coroutine_running()
2,186,086✔
420
    socket_register[co] = skt
2,186,086✔
421
    operation_register[co] = queue
2,186,086✔
422
    timeout_flags[co] = nil
2,186,086✔
423
    if skt then
2,186,086✔
424
      local to = (use_connect_to and user_timeouts_connect[skt]) or
1,093,067✔
425
                 (queue == "read" and user_timeouts_receive[skt]) or
1,092,863✔
426
                 user_timeouts_send[skt]
9,836✔
427
      copas.timeout(to, socket_callback)
1,093,067✔
428
    else
429
      copas.timeout(0)
1,093,019✔
430
    end
431
    return true
2,186,086✔
432
  end
433

434

435
  -- Changes the timeout to a different queue (read/write).
436
  -- Only usefull with ssl-handshakes and "wantread", "wantwrite" errors, when
437
  -- the queue has to be changed, so the timeout handler knows where to find the socket.
438
  -- @param queue (string) the new queue the socket is in, must be either "read" or "write"
439
  -- @return true
440
  function sto_change_queue(queue)
93✔
441
    operation_register[coroutine_running()] = queue
556✔
442
    return true
556✔
443
  end
444

445

446
  -- Responds with `true` if the operation timed-out.
447
  function sto_timed_out()
93✔
448
    return timeout_flags[coroutine_running()]
700✔
449
  end
450

451

452
  -- Returns the proper timeout error
453
  function sto_error(err)
93✔
454
    return useSocketTimeoutErrors[coroutine_running()] and err or "timeout"
48✔
455
  end
456
end
457

458

459

460
-------------------------------------------------------------------------------
461
-- Coroutine based socket I/O functions.
462
-------------------------------------------------------------------------------
463

464
-- Returns "tcp"" for plain TCP and "ssl" for ssl-wrapped sockets, so truthy
465
-- for tcp based, and falsy for udp based.
466
local isTCP do
122✔
467
  local lookup = {
122✔
468
    tcp = "tcp",
122✔
469
    SSL = "ssl",
122✔
470
  }
471

472
  function isTCP(socket)
93✔
473
    return lookup[tostring(socket):sub(1,3)]
412✔
474
  end
475
end
476

477
function copas.close(skt, ...)
122✔
478
  _closed[#_closed+1] = skt
140✔
479
  return skt:close(...)
140✔
480
end
481

482

483

484
-- nil or negative is indefinitly
485
function copas.settimeout(skt, timeout)
122✔
486
  timeout = timeout or -1
136✔
487
  if type(timeout) ~= "number" then
136✔
488
    return nil, "timeout must be 'nil' or a number"
12✔
489
  end
490

491
  return copas.settimeouts(skt, timeout, timeout, timeout)
124✔
492
end
493

494
-- negative is indefinitly, nil means do not change
495
function copas.settimeouts(skt, connect, send, read)
122✔
496

497
  if connect ~= nil and type(connect) ~= "number" then
272✔
498
    return nil, "connect timeout must be 'nil' or a number"
×
499
  end
500
  if connect then
272✔
501
    if connect < 0 then
272✔
502
      connect = nil
×
503
    end
504
    user_timeouts_connect[skt] = connect
272✔
505
  end
506

507

508
  if send ~= nil and type(send) ~= "number" then
272✔
509
    return nil, "send timeout must be 'nil' or a number"
×
510
  end
511
  if send then
272✔
512
    if send < 0 then
272✔
513
      send = nil
×
514
    end
515
    user_timeouts_send[skt] = send
272✔
516
  end
517

518

519
  if read ~= nil and type(read) ~= "number" then
272✔
520
    return nil, "read timeout must be 'nil' or a number"
×
521
  end
522
  if read then
272✔
523
    if read < 0 then
272✔
524
      read = nil
×
525
    end
526
    user_timeouts_receive[skt] = read
272✔
527
  end
528

529

530
  return true
272✔
531
end
532

533
-- reads a pattern from a client and yields to the reading set on timeouts
534
-- UDP: a UDP socket expects a second argument to be a number, so it MUST
535
-- be provided as the 'pattern' below defaults to a string. Will throw a
536
-- 'bad argument' error if omitted.
537
function copas.receive(client, pattern, part)
122✔
538
  local s, err
539
  pattern = pattern or "*l"
1,083,003✔
540
  local current_log = _reading_log
1,083,003✔
541
  sto_timeout(client, "read")
1,083,003✔
542

543
  repeat
544
    s, err, part = client:receive(pattern, part)
1,083,343✔
545

546
    -- guarantees that high throughput doesn't take other threads to starvation
547
    if (math.random(100) > 90) then
1,083,343✔
548
      copas.pause()
108,609✔
549
    end
550

551
    if s then
1,083,343✔
552
      current_log[client] = nil
1,082,935✔
553
      sto_timeout()
1,082,935✔
554
      return s, err, part
1,082,935✔
555

556
    elseif not _isSocketTimeout[err] then
408✔
557
      current_log[client] = nil
32✔
558
      sto_timeout()
32✔
559
      return s, err, part
32✔
560

561
    elseif sto_timed_out() then
376✔
562
      current_log[client] = nil
36✔
563
      return nil, sto_error(err), part
36✔
564
    end
565

566
    if err == "wantwrite" then -- wantwrite may be returned during SSL renegotiations
340✔
567
      current_log = _writing_log
×
568
      current_log[client] = gettime()
×
569
      sto_change_queue("write")
×
570
      coroutine_yield(client, _writing)
×
571
    else
572
      current_log = _reading_log
340✔
573
      current_log[client] = gettime()
340✔
574
      sto_change_queue("read")
340✔
575
      coroutine_yield(client, _reading)
340✔
576
    end
577
  until false
340✔
578
end
579

580
-- receives data from a client over UDP. Not available for TCP.
581
-- (this is a copy of receive() method, adapted for receivefrom() use)
582
function copas.receivefrom(client, size)
122✔
583
  local s, err, port
584
  size = size or UDP_DATAGRAM_MAX
16✔
585
  sto_timeout(client, "read")
16✔
586

587
  repeat
588
    s, err, port = client:receivefrom(size) -- upon success err holds ip address
32✔
589

590
    -- garantees that high throughput doesn't take other threads to starvation
591
    if (math.random(100) > 90) then
32✔
592
      copas.pause()
4✔
593
    end
594

595
    if s then
32✔
596
      _reading_log[client] = nil
12✔
597
      sto_timeout()
12✔
598
      return s, err, port
12✔
599

600
    elseif err ~= "timeout" then
20✔
601
      _reading_log[client] = nil
×
602
      sto_timeout()
×
603
      return s, err, port
×
604

605
    elseif sto_timed_out() then
20✔
606
      _reading_log[client] = nil
4✔
607
      return nil, sto_error(err), port
4✔
608
    end
609

610
    _reading_log[client] = gettime()
16✔
611
    coroutine_yield(client, _reading)
16✔
612
  until false
16✔
613
end
614

615
-- same as above but with special treatment when reading chunks,
616
-- unblocks on any data received.
617
function copas.receivepartial(client, pattern, part)
122✔
618
  local s, err
619
  pattern = pattern or "*l"
8✔
620
  local orig_size = #(part or "")
8✔
621
  local current_log = _reading_log
8✔
622
  sto_timeout(client, "read")
8✔
623

624
  repeat
625
    s, err, part = client:receive(pattern, part)
8✔
626

627
    -- guarantees that high throughput doesn't take other threads to starvation
628
    if (math.random(100) > 90) then
8✔
UNCOV
629
      copas.pause()
1✔
630
    end
631

632
    if s or (type(part) == "string" and #part > orig_size) then
8✔
633
      current_log[client] = nil
8✔
634
      sto_timeout()
8✔
635
      return s, err, part
8✔
636

637
    elseif not _isSocketTimeout[err] then
×
638
      current_log[client] = nil
×
639
      sto_timeout()
×
640
      return s, err, part
×
641

642
    elseif sto_timed_out() then
×
643
      current_log[client] = nil
×
644
      return nil, sto_error(err), part
×
645
    end
646

647
    if err == "wantwrite" then
×
648
      current_log = _writing_log
×
649
      current_log[client] = gettime()
×
650
      sto_change_queue("write")
×
651
      coroutine_yield(client, _writing)
×
652
    else
653
      current_log = _reading_log
×
654
      current_log[client] = gettime()
×
655
      sto_change_queue("read")
×
656
      coroutine_yield(client, _reading)
×
657
    end
658
  until false
×
659
end
660
copas.receivePartial = copas.receivepartial  -- compat: receivePartial is deprecated
122✔
661

662
-- sends data to a client. The operation is buffered and
663
-- yields to the writing set on timeouts
664
-- Note: from and to parameters will be ignored by/for UDP sockets
665
function copas.send(client, data, from, to)
122✔
666
  local s, err
667
  from = from or 1
9,836✔
668
  local lastIndex = from - 1
9,836✔
669
  local current_log = _writing_log
9,836✔
670
  sto_timeout(client, "write")
9,836✔
671

672
  repeat
673
    s, err, lastIndex = client:send(data, lastIndex + 1, to)
9,952✔
674

675
    -- guarantees that high throughput doesn't take other threads to starvation
676
    if (math.random(100) > 90) then
9,952✔
677
      copas.pause()
967✔
678
    end
679

680
    if s then
9,952✔
681
      current_log[client] = nil
9,820✔
682
      sto_timeout()
9,820✔
683
      return s, err, lastIndex
9,820✔
684

685
    elseif not _isSocketTimeout[err] then
132✔
686
      current_log[client] = nil
16✔
687
      sto_timeout()
16✔
688
      return s, err, lastIndex
16✔
689

690
    elseif sto_timed_out() then
116✔
691
      current_log[client] = nil
×
692
      return nil, sto_error(err), lastIndex
×
693
    end
694

695
    if err == "wantread" then
116✔
696
      current_log = _reading_log
×
697
      current_log[client] = gettime()
×
698
      sto_change_queue("read")
×
699
      coroutine_yield(client, _reading)
×
700
    else
701
      current_log = _writing_log
116✔
702
      current_log[client] = gettime()
116✔
703
      sto_change_queue("write")
116✔
704
      coroutine_yield(client, _writing)
116✔
705
    end
706
  until false
116✔
707
end
708

709
function copas.sendto(client, data, ip, port)
122✔
710
  -- deprecated; for backward compatibility only, since UDP doesn't block on sending
711
  return client:sendto(data, ip, port)
×
712
end
713

714
-- waits until connection is completed
715
function copas.connect(skt, host, port)
122✔
716
  skt:settimeout(0)
136✔
717
  local ret, err, tried_more_than_once
718
  sto_timeout(skt, "write", true)
136✔
719

720
  repeat
721
    ret, err = skt:connect(host, port)
216✔
722

723
    -- non-blocking connect on Windows results in error "Operation already
724
    -- in progress" to indicate that it is completing the request async. So essentially
725
    -- it is the same as "timeout"
726
    if ret or (err ~= "timeout" and err ~= "Operation already in progress") then
216✔
727
      _writing_log[skt] = nil
128✔
728
      sto_timeout()
128✔
729
      -- Once the async connect completes, Windows returns the error "already connected"
730
      -- to indicate it is done, so that error should be ignored. Except when it is the
731
      -- first call to connect, then it was already connected to something else and the
732
      -- error should be returned
733
      if (not ret) and (err == "already connected" and tried_more_than_once) then
128✔
734
        return 1
×
735
      end
736
      return ret, err
128✔
737

738
    elseif sto_timed_out() then
88✔
739
      _writing_log[skt] = nil
8✔
740
      return nil, sto_error(err)
8✔
741
    end
742

743
    tried_more_than_once = tried_more_than_once or true
80✔
744
    _writing_log[skt] = gettime()
80✔
745
    coroutine_yield(skt, _writing)
80✔
746
  until false
80✔
747
end
748

749

750
-- Wraps a tcp socket in an ssl socket and configures it. If the socket was
751
-- already wrapped, it does nothing and returns the socket.
752
-- @param wrap_params the parameters for the ssl-context
753
-- @return wrapped socket, or throws an error
754
local function ssl_wrap(skt, wrap_params)
755
  if isTCP(skt) == "ssl" then return skt end -- was already wrapped
120✔
756
  if not wrap_params then
68✔
757
    error("cannot wrap socket into a secure socket (using 'ssl.wrap()') without parameters/context")
×
758
  end
759

760
  ssl = ssl or require("ssl")
68✔
761
  local nskt = assert(ssl.wrap(skt, wrap_params)) -- assert, because we do not want to silently ignore this one!!
68✔
762

763
  nskt:settimeout(0)  -- non-blocking on the ssl-socket
68✔
764
  copas.settimeouts(nskt, user_timeouts_connect[skt],
136✔
765
    user_timeouts_send[skt], user_timeouts_receive[skt]) -- copy copas user-timeout to newly wrapped one
68✔
766

767
  local co = _autoclose_r[skt]
68✔
768
  if co then
68✔
769
    -- socket registered for autoclose, move registration to wrapped one
770
    _autoclose[co] = nskt
16✔
771
    _autoclose_r[skt] = nil
16✔
772
    _autoclose_r[nskt] = co
16✔
773
  end
774

775
  local sock_name = object_names[skt]
68✔
776
  if sock_name ~= tostring(skt) then
68✔
777
    -- socket had a custom name, so copy it over
778
    object_names[nskt] = sock_name
24✔
779
  end
780
  return nskt
68✔
781
end
782

783

784
-- For each luasec method we have a subtable, allows for future extension.
785
-- Required structure:
786
-- {
787
--   wrap = ... -- parameter to 'wrap()'; the ssl parameter table, or the context object
788
--   sni = {                  -- parameters to 'sni()'
789
--     names = string | table -- 1st parameter
790
--     strict = bool          -- 2nd parameter
791
--   }
792
-- }
793
local function normalize_sslt(sslt)
794
  local t = type(sslt)
216✔
795
  local r = setmetatable({}, {
432✔
796
    __index = function(self, key)
797
      -- a bug if this happens, here as a sanity check, just being careful since
798
      -- this is security stuff
799
      error("accessing unknown 'ssl_params' table key: "..tostring(key))
×
800
    end,
801
  })
802
  if t == "nil" then
216✔
803
    r.wrap = false
148✔
804
    r.sni = false
148✔
805

806
  elseif t == "table" then
68✔
807
    if sslt.mode or sslt.protocol then
68✔
808
      -- has the mandatory fields for the ssl-params table for handshake
809
      -- backward compatibility
810
      r.wrap = sslt
16✔
811
      r.sni = false
16✔
812
    else
813
      -- has the target definition, copy our known keys
814
      r.wrap = sslt.wrap or false -- 'or false' because we do not want nils
52✔
815
      r.sni = sslt.sni or false -- 'or false' because we do not want nils
52✔
816
    end
817

818
  elseif t == "userdata" then
×
819
    -- it's an ssl-context object for the handshake
820
    -- backward compatibility
821
    r.wrap = sslt
×
822
    r.sni = false
×
823

824
  else
825
    error("ssl parameters; did not expect type "..tostring(sslt))
×
826
  end
827

828
  return r
216✔
829
end
830

831

832
---
833
-- Peforms an (async) ssl handshake on a connected TCP client socket.
834
-- NOTE: if not ssl-wrapped already, then replace all previous socket references, with the returned new ssl wrapped socket
835
-- Throws error and does not return nil+error, as that might silently fail
836
-- in code like this;
837
--   copas.addserver(s1, function(skt)
838
--       skt = copas.wrap(skt, sparams)
839
--       skt:dohandshake()   --> without explicit error checking, this fails silently and
840
--       skt:send(body)      --> continues unencrypted
841
-- @param skt Regular LuaSocket CLIENT socket object
842
-- @param wrap_params Table with ssl parameters
843
-- @return wrapped ssl socket, or throws an error
844
function copas.dohandshake(skt, wrap_params)
122✔
845
  ssl = ssl or require("ssl")
68✔
846

847
  local nskt = ssl_wrap(skt, wrap_params)
68✔
848

849
  sto_timeout(nskt, "write", true)
68✔
850
  local queue
851

852
  repeat
853
    local success, err = nskt:dohandshake()
168✔
854

855
    if success then
168✔
856
      sto_timeout()
60✔
857
      return nskt
60✔
858

859
    elseif not _isSocketTimeout[err] then
108✔
860
      sto_timeout()
8✔
861
      error("TLS/SSL handshake failed: " .. tostring(err))
8✔
862

863
    elseif sto_timed_out() then
100✔
864
      return nil, sto_error(err)
×
865

866
    elseif err == "wantwrite" then
100✔
867
      sto_change_queue("write")
×
868
      queue = _writing
×
869

870
    elseif err == "wantread" then
100✔
871
      sto_change_queue("read")
100✔
872
      queue = _reading
100✔
873

874
    else
875
      error("TLS/SSL handshake failed: " .. tostring(err))
×
876
    end
877

878
    coroutine_yield(nskt, queue)
100✔
879
  until false
100✔
880
end
881

882
-- flushes a client write buffer (deprecated)
883
function copas.flush()
122✔
884
end
885

886
-- wraps a TCP socket to use Copas methods (send, receive, flush and settimeout)
887
local _skt_mt_tcp = {
122✔
888
      __tostring = function(self)
889
        return tostring(self.socket).." (copas wrapped)"
12✔
890
      end,
891

892
      __index = {
122✔
893
        send = function (self, data, from, to)
894
          return copas.send (self.socket, data, from, to)
9,832✔
895
        end,
896

897
        receive = function (self, pattern, prefix)
898
          if user_timeouts_receive[self.socket] == 0 then
1,082,999✔
899
            return copas.receivepartial(self.socket, pattern, prefix)
8✔
900
          end
901
          return copas.receive(self.socket, pattern, prefix)
1,082,991✔
902
        end,
903

904
        receivepartial = function (self, pattern, prefix)
905
          return copas.receivepartial(self.socket, pattern, prefix)
×
906
        end,
907

908
        flush = function (self)
909
          return copas.flush(self.socket)
×
910
        end,
911

912
        settimeout = function (self, time)
913
          return copas.settimeout(self.socket, time)
124✔
914
        end,
915

916
        settimeouts = function (self, connect, send, receive)
917
          return copas.settimeouts(self.socket, connect, send, receive)
×
918
        end,
919

920
        -- TODO: socket.connect is a shortcut, and must be provided with an alternative
921
        -- if ssl parameters are available, it will also include a handshake
922
        connect = function(self, ...)
923
          local res, err = copas.connect(self.socket, ...)
136✔
924
          if res then
136✔
925
            if self.ssl_params.sni then self:sni() end
124✔
926
            if self.ssl_params.wrap then res, err = self:dohandshake() end
124✔
927
          end
928
          return res, err
132✔
929
        end,
930

931
        close = function(self, ...)
932
          return copas.close(self.socket, ...)
140✔
933
        end,
934

935
        -- TODO: socket.bind is a shortcut, and must be provided with an alternative
936
        bind = function(self, ...) return self.socket:bind(...) end,
122✔
937

938
        -- TODO: is this DNS related? hence blocking?
939
        getsockname = function(self, ...)
940
          local ok, ip, port, family = pcall(self.socket.getsockname, self.socket, ...)
×
941
          if ok then
×
942
            return ip, port, family
×
943
          else
944
            return nil, "not implemented by LuaSec"
×
945
          end
946
        end,
947

948
        getstats = function(self, ...) return self.socket:getstats(...) end,
122✔
949

950
        setstats = function(self, ...) return self.socket:setstats(...) end,
122✔
951

952
        listen = function(self, ...) return self.socket:listen(...) end,
122✔
953

954
        accept = function(self, ...) return self.socket:accept(...) end,
122✔
955

956
        setoption = function(self, ...)
957
          local ok, res, err = pcall(self.socket.setoption, self.socket, ...)
×
958
          if ok then
×
959
            return res, err
×
960
          else
961
            return nil, "not implemented by LuaSec"
×
962
          end
963
        end,
964

965
        getoption = function(self, ...)
966
          local ok, val, err = pcall(self.socket.getoption, self.socket, ...)
×
967
          if ok then
×
968
            return val, err
×
969
          else
970
            return nil, "not implemented by LuaSec"
×
971
          end
972
        end,
973

974
        -- TODO: is this DNS related? hence blocking?
975
        getpeername = function(self, ...)
976
          local ok, ip, port, family = pcall(self.socket.getpeername, self.socket, ...)
×
977
          if ok then
×
978
            return ip, port, family
×
979
          else
980
            return nil, "not implemented by LuaSec"
×
981
          end
982
        end,
983

984
        shutdown = function(self, ...) return self.socket:shutdown(...) end,
122✔
985

986
        sni = function(self, names, strict)
987
          local sslp = self.ssl_params
52✔
988
          self.socket = ssl_wrap(self.socket, sslp.wrap)
52✔
989
          if names == nil then
52✔
990
            names = sslp.sni.names
44✔
991
            strict = sslp.sni.strict
44✔
992
          end
993
          return self.socket:sni(names, strict)
52✔
994
        end,
995

996
        dohandshake = function(self, wrap_params)
997
          local nskt, err = copas.dohandshake(self.socket, wrap_params or self.ssl_params.wrap)
68✔
998
          if not nskt then return nskt, err end
60✔
999
          self.socket = nskt  -- replace internal socket with the newly wrapped ssl one
60✔
1000
          return self
60✔
1001
        end,
1002

1003
        getalpn = function(self, ...)
1004
          local ok, proto, err = pcall(self.socket.getalpn, self.socket, ...)
×
1005
          if ok then
×
1006
            return proto, err
×
1007
          else
1008
            return nil, "not a tls socket"
×
1009
          end
1010
        end,
1011

1012
        getsniname = function(self, ...)
1013
          local ok, name, err = pcall(self.socket.getsniname, self.socket, ...)
×
1014
          if ok then
×
1015
            return name, err
×
1016
          else
1017
            return nil, "not a tls socket"
×
1018
          end
1019
        end,
1020
      }
122✔
1021
}
1022

1023
-- wraps a UDP socket, copy of TCP one adapted for UDP.
1024
local _skt_mt_udp = {__index = { }}
122✔
1025
for k,v in pairs(_skt_mt_tcp) do _skt_mt_udp[k] = _skt_mt_udp[k] or v end
366✔
1026
for k,v in pairs(_skt_mt_tcp.__index) do _skt_mt_udp.__index[k] = v end
2,806✔
1027

1028
_skt_mt_udp.__index.send        = function(self, ...) return self.socket:send(...) end
126✔
1029

1030
_skt_mt_udp.__index.sendto      = function(self, ...) return self.socket:sendto(...) end
134✔
1031

1032

1033
_skt_mt_udp.__index.receive =     function (self, size)
122✔
1034
                                    return copas.receive (self.socket, (size or UDP_DATAGRAM_MAX))
8✔
1035
                                  end
1036

1037
_skt_mt_udp.__index.receivefrom = function (self, size)
122✔
1038
                                    return copas.receivefrom (self.socket, (size or UDP_DATAGRAM_MAX))
16✔
1039
                                  end
1040

1041
                                  -- TODO: is this DNS related? hence blocking?
1042
_skt_mt_udp.__index.setpeername = function(self, ...) return self.socket:setpeername(...) end
126✔
1043

1044
_skt_mt_udp.__index.setsockname = function(self, ...) return self.socket:setsockname(...) end
122✔
1045

1046
                                    -- do not close client, as it is also the server for udp.
1047
_skt_mt_udp.__index.close       = function(self, ...) return true end
130✔
1048

1049
_skt_mt_udp.__index.settimeouts = function (self, connect, send, receive)
122✔
1050
                                    return copas.settimeouts(self.socket, connect, send, receive)
×
1051
                                  end
1052

1053

1054

1055
---
1056
-- Wraps a LuaSocket socket object in an async Copas based socket object.
1057
-- @param skt The socket to wrap
1058
-- @sslt (optional) Table with ssl parameters, use an empty table to use ssl with defaults
1059
-- @return wrapped socket object
1060
function copas.wrap (skt, sslt)
122✔
1061
  if (getmetatable(skt) == _skt_mt_tcp) or (getmetatable(skt) == _skt_mt_udp) then
232✔
1062
    return skt -- already wrapped
×
1063
  end
1064

1065
  skt:settimeout(0)
232✔
1066

1067
  if isTCP(skt) then
232✔
1068
    return setmetatable ({socket = skt, ssl_params = normalize_sslt(sslt)}, _skt_mt_tcp)
216✔
1069
  else
1070
    return setmetatable ({socket = skt}, _skt_mt_udp)
16✔
1071
  end
1072
end
1073

1074
--- Wraps a handler in a function that deals with wrapping the socket and doing the
1075
-- optional ssl handshake.
1076
function copas.handler(handler, sslparams)
122✔
1077
  -- TODO: pass a timeout value to set, and use during handshake
1078
  return function (skt, ...)
1079
    skt = copas.wrap(skt, sslparams) -- this call will normalize the sslparams table
64✔
1080
    local sslp = skt.ssl_params
64✔
1081
    if sslp.sni then skt:sni(sslp.sni.names, sslp.sni.strict) end
64✔
1082
    if sslp.wrap then skt:dohandshake(sslp.wrap) end
64✔
1083
    return handler(skt, ...)
60✔
1084
  end
1085
end
1086

1087

1088
--------------------------------------------------
1089
-- Error handling
1090
--------------------------------------------------
1091

1092
local _errhandlers = setmetatable({}, { __mode = "k" })   -- error handler per coroutine
122✔
1093

1094

1095
function copas.gettraceback(msg, co, skt)
122✔
1096
  local co_str = co == nil and "nil" or copas.getthreadname(co)
26✔
1097
  local skt_str = skt == nil and "nil" or copas.getsocketname(skt)
26✔
1098
  local msg_str = msg == nil and "" or tostring(msg)
26✔
1099
  if msg_str == "" then
26✔
1100
    msg_str = ("(coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
×
1101
  else
1102
    msg_str = ("%s (coroutine: %s, socket: %s)"):format(msg_str, co_str, skt_str)
26✔
1103
  end
1104

1105
  if type(co) == "thread" then
26✔
1106
    -- regular Copas coroutine
1107
    return debug.traceback(co, msg_str)
26✔
1108
  end
1109
  -- not a coroutine, but the main thread, this happens if a timeout callback
1110
  -- (see `copas.timeout` causes an error (those callbacks run on the main thread).
1111
  return debug.traceback(msg_str, 2)
×
1112
end
1113

1114

1115
local function _deferror(msg, co, skt)
1116
  print(copas.gettraceback(msg, co, skt))
18✔
1117
end
1118

1119

1120
function copas.seterrorhandler(err, default)
122✔
1121
  assert(err == nil or type(err) == "function", "Expected the handler to be a function, or nil")
40✔
1122
  if default then
40✔
1123
    assert(err ~= nil, "Expected the handler to be a function when setting the default")
28✔
1124
    _deferror = err
28✔
1125
  else
1126
    _errhandlers[coroutine_running()] = err
12✔
1127
  end
1128
end
1129
copas.setErrorHandler = copas.seterrorhandler  -- deprecated; old casing
122✔
1130

1131

1132
function copas.geterrorhandler(co)
122✔
1133
  co = co or coroutine_running()
8✔
1134
  return _errhandlers[co] or _deferror
8✔
1135
end
1136

1137

1138
-- if `bool` is truthy, then the original socket errors will be returned in case of timeouts;
1139
-- `timeout, wantread, wantwrite, Operation already in progress`. If falsy, it will always
1140
-- return `timeout`.
1141
function copas.useSocketTimeoutErrors(bool)
122✔
1142
  useSocketTimeoutErrors[coroutine_running()] = not not bool -- force to a boolean
4✔
1143
end
1144

1145
-------------------------------------------------------------------------------
1146
-- Thread handling
1147
-------------------------------------------------------------------------------
1148

1149
local function _doTick (co, skt, ...)
1150
  if not co then return end
133,333✔
1151

1152
  -- if a coroutine was canceled/removed, don't resume it
1153
  if _canceled[co] then
133,333✔
1154
    _canceled[co] = nil -- also clean up the registry
8✔
1155
    _threads[co] = nil
8✔
1156
    return
8✔
1157
  end
1158

1159
  -- res: the socket (being read/write on) or the time to sleep
1160
  -- new_q: either _writing, _reading, or _sleeping
1161
  -- local time_before = gettime()
1162
  local ok, res, new_q = coroutine_resume(co, skt, ...)
133,325✔
1163
  -- local duration = gettime() - time_before
1164
  -- if duration > 1 then
1165
  --   duration = math.floor(duration * 1000)
1166
  --   pcall(_errhandlers[co] or _deferror, "task ran for "..tostring(duration).." milliseconds.", co, skt)
1167
  -- end
1168

1169
  if new_q == _reading or new_q == _writing or new_q == _sleeping then
133,321✔
1170
    -- we're yielding to a new queue
1171
    new_q:insert (res)
129,915✔
1172
    new_q:push (res, co)
129,915✔
1173
    return
129,915✔
1174
  end
1175

1176
  -- coroutine is terminating
1177

1178
  if ok and coroutine_status(co) ~= "dead" then
3,406✔
1179
    -- it called coroutine.yield from a non-Copas function which is unexpected
1180
    ok = false
4✔
1181
    res = "coroutine.yield was called without a resume first, user-code cannot yield to Copas"
4✔
1182
  end
1183

1184
  if not ok then
3,406✔
1185
    local k, e = pcall(_errhandlers[co] or _deferror, res, co, skt)
31✔
1186
    if not k then
31✔
1187
      print("Failed executing error handler: " .. tostring(e))
×
1188
    end
1189
  end
1190

1191
  local skt_to_close = _autoclose[co]
3,406✔
1192
  if skt_to_close then
3,406✔
1193
    skt_to_close:close()
76✔
1194
    _autoclose[co] = nil
76✔
1195
    _autoclose_r[skt_to_close] = nil
76✔
1196
  end
1197

1198
  _errhandlers[co] = nil
3,406✔
1199
end
1200

1201

1202
local _accept do
122✔
1203
  local client_counters = setmetatable({}, { __mode = "k" })
122✔
1204

1205
  -- accepts a connection on socket input
1206
  function _accept(server_skt, handler)
93✔
1207
    local client_skt = server_skt:accept()
80✔
1208
    if client_skt then
80✔
1209
      local count = (client_counters[server_skt] or 0) + 1
80✔
1210
      client_counters[server_skt] = count
80✔
1211
      object_names[client_skt] = object_names[server_skt] .. ":client_" .. count
80✔
1212

1213
      client_skt:settimeout(0)
80✔
1214
      copas.settimeouts(client_skt, user_timeouts_connect[server_skt],  -- copy server socket timeout settings
160✔
1215
        user_timeouts_send[server_skt], user_timeouts_receive[server_skt])
80✔
1216

1217
      local co = coroutine_create(handler)
80✔
1218
      object_names[co] = object_names[server_skt] .. ":handler_" .. count
80✔
1219

1220
      if copas.autoclose then
80✔
1221
        _autoclose[co] = client_skt
80✔
1222
        _autoclose_r[client_skt] = co
80✔
1223
      end
1224

1225
      _doTick(co, client_skt)
80✔
1226
    end
1227
  end
1228
end
1229

1230
-------------------------------------------------------------------------------
1231
-- Adds a server/handler pair to Copas dispatcher
1232
-------------------------------------------------------------------------------
1233

1234
do
1235
  local function addTCPserver(server, handler, timeout, name)
1236
    server:settimeout(0)
60✔
1237
    if name then
60✔
1238
      object_names[server] = name
×
1239
    end
1240
    _servers[server] = handler
60✔
1241
    _reading:insert(server)
60✔
1242
    if timeout then
60✔
1243
      copas.settimeout(server, timeout)
12✔
1244
    end
1245
  end
1246

1247
  local function addUDPserver(server, handler, timeout, name)
1248
    server:settimeout(0)
×
1249
    local co = coroutine_create(handler)
×
1250
    if name then
×
1251
      object_names[server] = name
×
1252
    end
1253
    object_names[co] = object_names[server]..":handler"
×
1254
    _reading:insert(server)
×
1255
    if timeout then
×
1256
      copas.settimeout(server, timeout)
×
1257
    end
1258
    _doTick(co, server)
×
1259
  end
1260

1261

1262
  function copas.addserver(server, handler, timeout, name)
122✔
1263
    if isTCP(server) then
60✔
1264
      addTCPserver(server, handler, timeout, name)
60✔
1265
    else
1266
      addUDPserver(server, handler, timeout, name)
×
1267
    end
1268
  end
1269
end
1270

1271

1272
function copas.removeserver(server, keep_open)
122✔
1273
  local skt = server
56✔
1274
  local mt = getmetatable(server)
56✔
1275
  if mt == _skt_mt_tcp or mt == _skt_mt_udp then
56✔
1276
    skt = server.socket
×
1277
  end
1278

1279
  _servers:remove(skt)
56✔
1280
  _reading:remove(skt)
56✔
1281

1282
  if keep_open then
56✔
1283
    return true
12✔
1284
  end
1285
  return server:close()
44✔
1286
end
1287

1288

1289

1290
-------------------------------------------------------------------------------
1291
-- Adds an new coroutine thread to Copas dispatcher
1292
-------------------------------------------------------------------------------
1293
function copas.addnamedthread(name, handler, ...)
122✔
1294
  if type(name) == "function" and type(handler) == "string" then
3,484✔
1295
    -- old call, flip args for compatibility
1296
    name, handler = handler, name
×
1297
  end
1298

1299
  -- create a coroutine that skips the first argument, which is always the socket
1300
  -- passed by the scheduler, but `nil` in case of a task/thread
1301
  local thread = coroutine_create(function(_, ...)
6,968✔
1302
    copas.pause()
3,484✔
1303
    return handler(...)
3,480✔
1304
  end)
1305
  if name then
3,484✔
1306
    object_names[thread] = name
296✔
1307
  end
1308

1309
  _threads[thread] = true -- register this thread so it can be removed
3,484✔
1310
  _doTick (thread, nil, ...)
3,484✔
1311
  return thread
3,484✔
1312
end
1313

1314

1315
function copas.addthread(handler, ...)
122✔
1316
  return copas.addnamedthread(nil, handler, ...)
3,188✔
1317
end
1318

1319

1320
function copas.removethread(thread)
122✔
1321
  -- if the specified coroutine is registered, add it to the canceled table so
1322
  -- that next time it tries to resume it exits.
1323
  _canceled[thread] = _threads[thread or 0]
28✔
1324
  _sleeping:cancel(thread)
28✔
1325
end
1326

1327

1328

1329
-------------------------------------------------------------------------------
1330
-- Sleep/pause management functions
1331
-------------------------------------------------------------------------------
1332

1333
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1334
-- If sleeptime < 0 then it sleeps until explicitly woken up using 'wakeup'
1335
-- TODO: deprecated, remove in next major
1336
function copas.sleep(sleeptime)
122✔
1337
  coroutine_yield((sleeptime or 0), _sleeping)
×
1338
end
1339

1340

1341
-- yields the current coroutine and wakes it after 'sleeptime' seconds.
1342
-- if sleeptime < 0 then it sleeps 0 seconds.
1343
function copas.pause(sleeptime)
122✔
1344
  if sleeptime and sleeptime > 0 then
127,171✔
1345
    coroutine_yield(sleeptime, _sleeping)
4,733✔
1346
  else
1347
    coroutine_yield(0, _sleeping)
122,438✔
1348
  end
1349
end
1350

1351

1352
-- yields the current coroutine until explicitly woken up using 'wakeup'
1353
function copas.pauseforever()
122✔
1354
  coroutine_yield(-1, _sleeping)
2,092✔
1355
end
1356

1357

1358
-- Wakes up a sleeping coroutine 'co'.
1359
function copas.wakeup(co)
122✔
1360
  _sleeping:wakeup(co)
2,100✔
1361
end
1362

1363

1364

1365
-------------------------------------------------------------------------------
1366
-- Timeout management
1367
-------------------------------------------------------------------------------
1368

1369
do
1370
  local timeout_register = setmetatable({}, { __mode = "k" })
122✔
1371
  local time_out_thread
1372
  local timerwheel = require("timerwheel").new({
244✔
1373
      now = gettime,
122✔
1374
      precision = TIMEOUT_PRECISION,
122✔
1375
      ringsize = math.floor(60*60*24/TIMEOUT_PRECISION),  -- ring size 1 day
122✔
1376
      err_handler = function(err)
1377
        return _deferror(err, time_out_thread)
11✔
1378
      end,
1379
    })
1380

1381
  time_out_thread = copas.addnamedthread("copas_core_timer", function()
244✔
1382
    while true do
1383
      copas.pause(TIMEOUT_PRECISION)
4,152✔
1384
      timerwheel:step()
4,034✔
1385
    end
1386
  end)
1387

1388
  -- get the number of timeouts running
1389
  function copas.gettimeouts()
122✔
1390
    return timerwheel:count()
1,721✔
1391
  end
1392

1393
  --- Sets the timeout for the current coroutine.
1394
  -- @param delay delay (seconds), use 0 (or math.huge) to cancel the timerout
1395
  -- @param callback function with signature: `function(coroutine)` where coroutine is the routine that timed-out
1396
  -- @return true
1397
  function copas.timeout(delay, callback)
122✔
1398
    local co = coroutine_running()
2,189,241✔
1399
    local existing_timer = timeout_register[co]
2,189,241✔
1400

1401
    if existing_timer then
2,189,241✔
1402
      timerwheel:cancel(existing_timer)
2,784✔
1403
    end
1404

1405
    if delay > 0 and delay ~= math.huge then
2,189,241✔
1406
      timeout_register[co] = timerwheel:set(delay, callback, co)
3,835✔
1407
    elseif delay == 0 or delay == math.huge then
2,185,406✔
1408
      timeout_register[co] = nil
2,185,406✔
1409
    else
1410
      error("timout value must be greater than or equal to 0, got: "..tostring(delay))
×
1411
    end
1412

1413
    return true
2,189,241✔
1414
  end
1415

1416
end
1417

1418

1419
-------------------------------------------------------------------------------
1420
-- main tasks: manage readable and writable socket sets
1421
-------------------------------------------------------------------------------
1422
-- a task is an object with a required method `step()` that deals with a
1423
-- single step for that task.
1424

1425
local _tasks = {} do
122✔
1426
  function _tasks:add(tsk)
122✔
1427
    _tasks[#_tasks + 1] = tsk
488✔
1428
  end
1429
end
1430

1431

1432
-- a task to check ready to read events
1433
local _readable_task = {} do
122✔
1434

1435
  _readable_task._events = {}
122✔
1436

1437
  local function tick(skt)
1438
    local handler = _servers[skt]
496✔
1439
    if handler then
496✔
1440
      _accept(skt, handler)
80✔
1441
    else
1442
      _reading:remove(skt)
416✔
1443
      _doTick(_reading:pop(skt), skt)
416✔
1444
    end
1445
  end
1446

1447
  function _readable_task:step()
122✔
1448
    for _, skt in ipairs(self._events) do
123,496✔
1449
      tick(skt)
496✔
1450
    end
1451
  end
1452

1453
  _tasks:add(_readable_task)
122✔
1454
end
1455

1456

1457
-- a task to check ready to write events
1458
local _writable_task = {} do
122✔
1459

1460
  _writable_task._events = {}
122✔
1461

1462
  local function tick(skt)
1463
    _writing:remove(skt)
188✔
1464
    _doTick(_writing:pop(skt), skt)
188✔
1465
  end
1466

1467
  function _writable_task:step()
122✔
1468
    for _, skt in ipairs(self._events) do
123,188✔
1469
      tick(skt)
188✔
1470
    end
1471
  end
1472

1473
  _tasks:add(_writable_task)
122✔
1474
end
1475

1476

1477

1478
-- sleeping threads task
1479
local _sleeping_task = {} do
122✔
1480

1481
  function _sleeping_task:step()
122✔
1482
    local now = gettime()
123,000✔
1483

1484
    local co = _sleeping:pop(now)
123,000✔
1485
    while co do
127,599✔
1486
      -- we're pushing them to _resumable, since that list will be replaced before
1487
      -- executing. This prevents tasks running twice in a row with pause(0) for example.
1488
      -- So here we won't execute, but at _resumable step which is next
1489
      _resumable:push(co)
4,599✔
1490
      co = _sleeping:pop(now)
4,599✔
1491
    end
1492
  end
1493

1494
  _tasks:add(_sleeping_task)
122✔
1495
end
1496

1497

1498

1499
-- resumable threads task
1500
local _resumable_task = {} do
122✔
1501

1502
  function _resumable_task:step()
122✔
1503
    -- replace the resume list before iterating, so items placed in there
1504
    -- will indeed end up in the next copas step, not in this one, and not
1505
    -- create a loop
1506
    local resumelist = _resumable:clear_resumelist()
123,000✔
1507

1508
    for _, co in ipairs(resumelist) do
252,161✔
1509
      _doTick(co)
129,165✔
1510
    end
1511
  end
1512

1513
  _tasks:add(_resumable_task)
122✔
1514
end
1515

1516

1517
-------------------------------------------------------------------------------
1518
-- Checks for reads and writes on sockets
1519
-------------------------------------------------------------------------------
1520
local _select_plain do
122✔
1521

1522
  local last_cleansing = 0
122✔
1523
  local duration = function(t2, t1) return t2-t1 end
123,062✔
1524

1525
  if not socket then
122✔
1526
    -- socket module unavailable, switch to luasystem sleep
1527
    _select_plain = system.sleep
4✔
1528
  else
1529
    -- use socket.select to handle socket-io
1530
    _select_plain = function(timeout)
1531
      local err
1532
      local now = gettime()
122,940✔
1533

1534
      -- remove any closed sockets to prevent select from hanging on them
1535
      if _closed[1] then
122,940✔
1536
        for i, skt in ipairs(_closed) do
279✔
1537
          _closed[i] = { _reading:remove(skt), _writing:remove(skt) }
140✔
1538
        end
1539
      end
1540

1541
      _readable_task._events, _writable_task._events, err = socket.select(_reading, _writing, timeout)
122,940✔
1542
      local r_events, w_events = _readable_task._events, _writable_task._events
122,940✔
1543

1544
      -- inject closed sockets in readable/writeable task so they can error out properly
1545
      if _closed[1] then
122,940✔
1546
        for i, skts in ipairs(_closed) do
279✔
1547
          _closed[i] = nil
140✔
1548
          r_events[#r_events+1] = skts[1]
140✔
1549
          w_events[#w_events+1] = skts[2]
140✔
1550
        end
1551
      end
1552

1553
      if duration(now, last_cleansing) > WATCH_DOG_TIMEOUT then
122,940✔
1554
        last_cleansing = now
114✔
1555

1556
        -- Check all sockets selected for reading, and check how long they have been waiting
1557
        -- for data already, without select returning them as readable
1558
        for skt,time in pairs(_reading_log) do
114✔
1559
          if not r_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1560
            -- This one timedout while waiting to become readable, so move
1561
            -- it in the readable list and try and read anyway, despite not
1562
            -- having been returned by select
1563
            _reading_log[skt] = nil
×
1564
            r_events[#r_events + 1] = skt
×
1565
            r_events[skt] = #r_events
×
1566
          end
1567
        end
1568

1569
        -- Do the same for writing
1570
        for skt,time in pairs(_writing_log) do
114✔
1571
          if not w_events[skt] and duration(now, time) > WATCH_DOG_TIMEOUT then
×
1572
            _writing_log[skt] = nil
×
1573
            w_events[#w_events + 1] = skt
×
1574
            w_events[skt] = #w_events
×
1575
          end
1576
        end
1577
      end
1578

1579
      if err == "timeout" and #r_events + #w_events > 0 then
122,940✔
1580
        return nil
4✔
1581
      else
1582
        return err
122,936✔
1583
      end
1584
    end
1585
  end
1586
end
1587

1588

1589

1590
-------------------------------------------------------------------------------
1591
-- Dispatcher loop step.
1592
-- Listen to client requests and handles them
1593
-- Returns false if no socket-data was handled, or true if there was data
1594
-- handled (or nil + error message)
1595
-------------------------------------------------------------------------------
1596

1597
local copas_stats
1598
local min_ever, max_ever
1599

1600
local _select = _select_plain
122✔
1601

1602
-- instrumented version of _select() to collect stats
1603
local _select_instrumented = function(timeout)
1604
  if copas_stats then
×
1605
    local step_duration = gettime() - copas_stats.step_start
×
1606
    copas_stats.duration_max = math.max(copas_stats.duration_max, step_duration)
×
1607
    copas_stats.duration_min = math.min(copas_stats.duration_min, step_duration)
×
1608
    copas_stats.duration_tot = copas_stats.duration_tot + step_duration
×
1609
    copas_stats.steps = copas_stats.steps + 1
×
1610
  else
1611
    copas_stats = {
×
1612
      duration_max = -1,
1613
      duration_min = 999999,
1614
      duration_tot = 0,
1615
      steps = 0,
1616
    }
1617
  end
1618

1619
  local err = _select_plain(timeout)
×
1620

1621
  local now = gettime()
×
1622
  copas_stats.time_start = copas_stats.time_start or now
×
1623
  copas_stats.step_start = now
×
1624

1625
  return err
×
1626
end
1627

1628

1629
function copas.step(timeout)
122✔
1630
  -- Need to wake up the select call in time for the next sleeping event
1631
  if not _resumable:done() then
123,000✔
1632
    timeout = 0
119,183✔
1633
  else
1634
    timeout = math.min(_sleeping:getnext(), timeout or math.huge)
3,817✔
1635
  end
1636

1637
  local err = _select(timeout)
123,000✔
1638

1639
  for _, tsk in ipairs(_tasks) do
614,996✔
1640
    tsk:step()
492,000✔
1641
  end
1642

1643
  if err then
122,996✔
1644
    if err == "timeout" then
122,360✔
1645
      if timeout + 0.01 > TIMEOUT_PRECISION and math.random(100) > 90 then
122,300✔
1646
        -- we were idle, so occasionally do a GC sweep to ensure lingering
1647
        -- sockets are closed, and we don't accidentally block the loop from
1648
        -- exiting
1649
        collectgarbage()
291✔
1650
      end
1651
      return false
122,300✔
1652
    end
1653
    return nil, err
60✔
1654
  end
1655

1656
  return true
636✔
1657
end
1658

1659

1660
-------------------------------------------------------------------------------
1661
-- Check whether there is something to do.
1662
-- returns false if there are no sockets for read/write nor tasks scheduled
1663
-- (which means Copas is in an empty spin)
1664
-------------------------------------------------------------------------------
1665
function copas.finished()
122✔
1666
  return #_reading == 0 and #_writing == 0 and _resumable:done() and _sleeping:done(copas.gettimeouts())
122,996✔
1667
end
1668

1669

1670
local resetexit do
122✔
1671
  local exit_semaphore, exiting
1672

1673
  function resetexit()
93✔
1674
    exit_semaphore = copas.semaphore.new(1, 0, math.huge)
198✔
1675
    exiting = false
198✔
1676
  end
1677

1678
  -- Signals tasks to exit. But only if they check for it. By calling `copas.exiting`
1679
  -- they can check if they should exit. Or by calling `copas.waitforexit` they can
1680
  -- wait until the exit signal is given.
1681
  function copas.exit()
122✔
1682
    if exiting then return end
194✔
1683
    exiting = true
194✔
1684
    exit_semaphore:destroy()
194✔
1685
  end
1686

1687
  -- returns whether Copas is in the process of exiting. Exit can be started by
1688
  -- calling `copas.exit()`.
1689
  function copas.exiting()
122✔
1690
    return exiting
384✔
1691
  end
1692

1693
  -- Pauses the current coroutine until Copas is exiting. To be used as an exit
1694
  -- signal for tasks that need to clean up before exiting.
1695
  function copas.waitforexit()
122✔
1696
    exit_semaphore:take(1)
8✔
1697
  end
1698
end
1699

1700

1701
local _getstats do
122✔
1702
  local _getstats_instrumented, _getstats_plain
1703

1704

1705
  function _getstats_plain(enable)
93✔
1706
    -- this function gets hit if turned off, so turn on if true
1707
    if enable == true then
×
1708
      _select = _select_instrumented
×
1709
      _getstats = _getstats_instrumented
×
1710
      -- reset stats
1711
      min_ever = nil
×
1712
      max_ever = nil
×
1713
      copas_stats = nil
×
1714
    end
1715
    return {}
×
1716
  end
1717

1718

1719
  -- convert from seconds to millisecs, with microsec precision
1720
  local function useconds(t)
1721
    return math.floor((t * 1000000) + 0.5) / 1000
×
1722
  end
1723
  -- convert from seconds to seconds, with millisec precision
1724
  local function mseconds(t)
1725
    return math.floor((t * 1000) + 0.5) / 1000
×
1726
  end
1727

1728

1729
  function _getstats_instrumented(enable)
93✔
1730
    if enable == false then
×
1731
      _select = _select_plain
×
1732
      _getstats = _getstats_plain
×
1733
      -- instrumentation disabled, so switch to the plain implementation
1734
      return _getstats(enable)
×
1735
    end
1736
    if (not copas_stats) or (copas_stats.step == 0) then
×
1737
      return {}
×
1738
    end
1739
    local stats = copas_stats
×
1740
    copas_stats = nil
×
1741
    min_ever = math.min(min_ever or 9999999, stats.duration_min)
×
1742
    max_ever = math.max(max_ever or 0, stats.duration_max)
×
1743
    stats.duration_min_ever = min_ever
×
1744
    stats.duration_max_ever = max_ever
×
1745
    stats.duration_avg = stats.duration_tot / stats.steps
×
1746
    stats.step_start = nil
×
1747
    stats.time_end = gettime()
×
1748
    stats.time_tot = stats.time_end - stats.time_start
×
1749
    stats.time_avg = stats.time_tot / stats.steps
×
1750

1751
    stats.duration_avg = useconds(stats.duration_avg)
×
1752
    stats.duration_max = useconds(stats.duration_max)
×
1753
    stats.duration_max_ever = useconds(stats.duration_max_ever)
×
1754
    stats.duration_min = useconds(stats.duration_min)
×
1755
    stats.duration_min_ever = useconds(stats.duration_min_ever)
×
1756
    stats.duration_tot = useconds(stats.duration_tot)
×
1757
    stats.time_avg = useconds(stats.time_avg)
×
1758
    stats.time_start = mseconds(stats.time_start)
×
1759
    stats.time_end = mseconds(stats.time_end)
×
1760
    stats.time_tot = mseconds(stats.time_tot)
×
1761
    return stats
×
1762
  end
1763

1764
  _getstats = _getstats_plain
122✔
1765
end
1766

1767

1768
function copas.status(enable_stats)
122✔
1769
  local res = _getstats(enable_stats)
×
1770
  res.running = not not copas.running
×
1771
  res.timeout = copas.gettimeouts()
×
1772
  res.timer, res.inactive = _sleeping:status()
×
1773
  res.read = #_reading
×
1774
  res.write = #_writing
×
1775
  res.active = _resumable:count()
×
1776
  return res
×
1777
end
1778

1779

1780
-------------------------------------------------------------------------------
1781
-- Dispatcher endless loop.
1782
-- Listen to client requests and handles them forever
1783
-------------------------------------------------------------------------------
1784
function copas.loop(initializer, timeout)
122✔
1785
  if type(initializer) == "function" then
198✔
1786
    copas.addnamedthread("copas_initializer", initializer)
78✔
1787
  else
1788
    timeout = initializer or timeout
120✔
1789
  end
1790

1791
  resetexit()
198✔
1792
  copas.running = true
198✔
1793
  while true do
1794
    copas.step(timeout)
123,000✔
1795
    if copas.finished() then
122,996✔
1796
      if copas.exiting() then
384✔
1797
        break
47✔
1798
      end
1799
      copas.exit()
190✔
1800
    end
1801
  end
1802
  copas.running = false
194✔
1803
end
1804

1805

1806
-------------------------------------------------------------------------------
1807
-- Naming sockets and coroutines.
1808
-------------------------------------------------------------------------------
1809
do
1810
  local function realsocket(skt)
1811
    local mt = getmetatable(skt)
60✔
1812
    if mt == _skt_mt_tcp or mt == _skt_mt_udp then
60✔
1813
      return skt.socket
60✔
1814
    else
1815
      return skt
×
1816
    end
1817
  end
1818

1819

1820
  function copas.setsocketname(name, skt)
122✔
1821
    assert(type(name) == "string", "expected arg #1 to be a string")
60✔
1822
    skt = assert(realsocket(skt), "expected arg #2 to be a socket")
60✔
1823
    object_names[skt] = name
60✔
1824
  end
1825

1826

1827
  function copas.getsocketname(skt)
122✔
1828
    skt = assert(realsocket(skt), "expected arg #1 to be a socket")
×
1829
    return object_names[skt]
×
1830
  end
1831
end
1832

1833

1834
function copas.setthreadname(name, coro)
122✔
1835
  assert(type(name) == "string", "expected arg #1 to be a string")
40✔
1836
  coro = coro or coroutine_running()
40✔
1837
  assert(type(coro) == "thread", "expected arg #2 to be a coroutine or nil")
40✔
1838
  object_names[coro] = name
40✔
1839
end
1840

1841

1842
function copas.getthreadname(coro)
122✔
1843
  coro = coro or coroutine_running()
26✔
1844
  assert(type(coro) == "thread", "expected arg #1 to be a coroutine or nil")
26✔
1845
  return object_names[coro]
26✔
1846
end
1847

1848
-------------------------------------------------------------------------------
1849
-- Debug functionality.
1850
-------------------------------------------------------------------------------
1851
do
1852
  copas.debug = {}
122✔
1853

1854
  local log_core    -- if truthy, the core-timer will also be logged
1855
  local debug_log   -- function used as logger
1856

1857

1858
  local debug_yield = function(skt, queue)
1859
    local name = object_names[coroutine_running()]
2,227✔
1860

1861
    if log_core or name ~= "copas_core_timer" then
2,227✔
1862
      if queue == _sleeping then
2,215✔
1863
        debug_log("yielding '", name, "' to SLEEP for ", skt," seconds")
2,199✔
1864

1865
      elseif queue == _writing then
16✔
1866
        debug_log("yielding '", name, "' to WRITE on '", object_names[skt], "'")
4✔
1867

1868
      elseif queue == _reading then
12✔
1869
        debug_log("yielding '", name, "' to READ on '", object_names[skt], "'")
12✔
1870

1871
      else
1872
        debug_log("thread '", name, "' yielding to unexpected queue; ", tostring(queue), " (", type(queue), ")", debug.traceback())
×
1873
      end
1874
    end
1875

1876
    return coroutine.yield(skt, queue)
2,227✔
1877
  end
1878

1879

1880
  local debug_resume = function(coro, skt, ...)
1881
    local name = object_names[coro]
2,235✔
1882

1883
    if skt then
2,235✔
1884
      debug_log("resuming '", name, "' for socket '", object_names[skt], "'")
16✔
1885
    else
1886
      if log_core or name ~= "copas_core_timer" then
2,219✔
1887
        debug_log("resuming '", name, "'")
2,207✔
1888
      end
1889
    end
1890
    return coroutine.resume(coro, skt, ...)
2,235✔
1891
  end
1892

1893

1894
  local debug_create = function(f)
1895
    local f_wrapped = function(...)
1896
      local results = pack(f(...))
8✔
1897
      debug_log("exiting '", object_names[coroutine_running()], "'")
8✔
1898
      return unpack(results)
8✔
1899
    end
1900

1901
    return coroutine.create(f_wrapped)
8✔
1902
  end
1903

1904

1905
  debug_log = fnil
122✔
1906

1907

1908
  -- enables debug output for all coroutine operations.
1909
  function copas.debug.start(logger, core)
244✔
1910
    log_core = core
4✔
1911
    debug_log = logger or print
4✔
1912
    coroutine_yield = debug_yield
4✔
1913
    coroutine_resume = debug_resume
4✔
1914
    coroutine_create = debug_create
4✔
1915
  end
1916

1917

1918
  -- disables debug output for coroutine operations.
1919
  function copas.debug.stop()
244✔
1920
    debug_log = fnil
×
1921
    coroutine_yield = coroutine.yield
×
1922
    coroutine_resume = coroutine.resume
×
1923
    coroutine_create = coroutine.create
×
1924
  end
1925

1926
  do
1927
    local call_id = 0
122✔
1928

1929
    -- Description table of socket functions for debug output.
1930
    -- each socket function name has TWO entries;
1931
    -- 'name_in' and 'name_out', each being an array of names/descriptions of respectively
1932
    -- input parameters and return values.
1933
    -- If either table has a 'callback' key, then that is a function that will be called
1934
    -- with the parameters/return-values for further inspection.
1935
    local args = {
122✔
1936
      settimeout_in = {
122✔
1937
        "socket ",
122✔
1938
        "seconds",
122✔
1939
        "mode   ",
1940
      },
122✔
1941
      settimeout_out = {
122✔
1942
        "success",
122✔
1943
        "error  ",
1944
      },
122✔
1945
      connect_in = {
122✔
1946
        "socket ",
122✔
1947
        "address",
122✔
1948
        "port   ",
1949
      },
122✔
1950
      connect_out = {
122✔
1951
        "success",
122✔
1952
        "error  ",
1953
      },
122✔
1954
      getfd_in = {
122✔
1955
        "socket ",
1956
        -- callback = function(...)
1957
        --   print(debug.traceback("called from:", 4))
1958
        -- end,
1959
      },
122✔
1960
      getfd_out = {
122✔
1961
        "fd",
1962
      },
122✔
1963
      send_in = {
122✔
1964
        "socket   ",
122✔
1965
        "data     ",
122✔
1966
        "idx-start",
122✔
1967
        "idx-end  ",
1968
      },
122✔
1969
      send_out = {
122✔
1970
        "last-idx-send    ",
122✔
1971
        "error            ",
122✔
1972
        "err-last-idx-send",
1973
      },
122✔
1974
      receive_in = {
122✔
1975
        "socket ",
122✔
1976
        "pattern",
122✔
1977
        "prefix ",
1978
      },
122✔
1979
      receive_out = {
122✔
1980
        "received    ",
122✔
1981
        "error       ",
122✔
1982
        "partial data",
1983
      },
122✔
1984
      dirty_in = {
122✔
1985
        "socket",
1986
        -- callback = function(...)
1987
        --   print(debug.traceback("called from:", 4))
1988
        -- end,
1989
      },
122✔
1990
      dirty_out = {
122✔
1991
        "data in read-buffer",
1992
      },
122✔
1993
      close_in = {
122✔
1994
        "socket",
1995
        -- callback = function(...)
1996
        --   print(debug.traceback("called from:", 4))
1997
        -- end,
1998
      },
122✔
1999
      close_out = {
122✔
2000
        "success",
122✔
2001
        "error",
2002
      },
122✔
2003
    }
2004
    local function print_call(func, msg, ...)
2005
      print(msg)
150✔
2006
      local arg = pack(...)
150✔
2007
      local desc = args[func] or {}
150✔
2008
      for i = 1, math.max(arg.n, #desc) do
348✔
2009
        local value = arg[i]
198✔
2010
        if type(value) == "string" then
198✔
2011
          local xvalue = value:sub(1,30)
12✔
2012
          if xvalue ~= value then
12✔
2013
            xvalue = xvalue .."(...truncated)"
×
2014
          end
2015
          print("\t"..(desc[i] or i)..": '"..tostring(xvalue).."' ("..type(value).." #"..#value..")")
12✔
2016
        else
2017
          print("\t"..(desc[i] or i)..": '"..tostring(value).."' ("..type(value)..")")
186✔
2018
        end
2019
      end
2020
      if desc.callback then
150✔
2021
        desc.callback(...)
×
2022
      end
2023
    end
2024

2025
    local debug_mt = {
122✔
2026
      __index = function(self, key)
2027
        local value = self.__original_socket[key]
75✔
2028
        if type(value) ~= "function" then
75✔
2029
          return value
×
2030
        end
2031
        return function(self2, ...)
2032
            local my_id = call_id + 1
75✔
2033
            call_id = my_id
75✔
2034
            local results
2035

2036
            if self2 ~= self then
75✔
2037
              -- there is no self
2038
              print_call(tostring(key).."_in", my_id .. "-calling '"..tostring(key) .. "' with; ", self, ...)
×
2039
              results = pack(value(self, ...))
×
2040
            else
2041
              print_call(tostring(key).."_in", my_id .. "-calling '" .. tostring(key) .. "' with; ", self.__original_socket, ...)
75✔
2042
              results = pack(value(self.__original_socket, ...))
75✔
2043
            end
2044
            print_call(tostring(key).."_out", my_id .. "-results '"..tostring(key) .. "' returned; ", unpack(results))
75✔
2045
            return unpack(results)
75✔
2046
          end
2047
      end,
2048
      __tostring = function(self)
2049
        return tostring(self.__original_socket)
16✔
2050
      end
2051
    }
2052

2053

2054
    -- wraps a socket (copas or luasocket) in a debug version printing all calls
2055
    -- and their parameters/return values. Extremely noisy!
2056
    -- returns the wrapped socket.
2057
    -- NOTE: only for plain sockets, will not support TLS
2058
    function copas.debug.socket(original_skt)
244✔
2059
      if (getmetatable(original_skt) == _skt_mt_tcp) or (getmetatable(original_skt) == _skt_mt_udp) then
4✔
2060
        -- already wrapped as Copas socket, so recurse with the original luasocket one
2061
        original_skt.socket = copas.debug.socket(original_skt.socket)
×
2062
        return original_skt
×
2063
      end
2064

2065
      local proxy = setmetatable({
8✔
2066
        __original_socket = original_skt
4✔
2067
      }, debug_mt)
4✔
2068

2069
      return proxy
4✔
2070
    end
2071
  end
2072
end
2073

2074

2075
return copas
122✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc