• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 3777042206

25 Dec 2022 06:48PM UTC coverage: 75.471% (+0.3%) from 75.215%
3777042206

push

github

Georgy Moiseev
feature: support graceful shutdown

89 of 89 new or added lines in 1 file covered. (100.0%)

4123 of 5463 relevant lines covered (75.47%)

3432.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.32
/connection.go
1
// Package with implementation of methods and structures for work with
2
// Tarantool instance.
3
package tarantool
4

5
import (
6
        "bufio"
7
        "bytes"
8
        "context"
9
        "encoding/binary"
10
        "errors"
11
        "fmt"
12
        "io"
13
        "log"
14
        "math"
15
        "net"
16
        "runtime"
17
        "strings"
18
        "sync"
19
        "sync/atomic"
20
        "time"
21
)
22

23
const requestsMap = 128
24
const ignoreStreamId = 0
25
const (
26
        connDisconnected = 0
27
        connConnected    = 1
28
        connShutdown     = 2
29
        connClosed       = 3
30
)
31

32
const (
33
        connTransportNone = ""
34
        connTransportSsl  = "ssl"
35
)
36

37
const shutdownEventKey = "box.shutdown"
38

39
type ConnEventKind int
40
type ConnLogKind int
41

42
const (
43
        // Connected signals that connection is established or reestablished.
44
        Connected ConnEventKind = iota + 1
45
        // Disconnected signals that connection is broken.
46
        Disconnected
47
        // ReconnectFailed signals that attempt to reconnect has failed.
48
        ReconnectFailed
49
        // Either reconnect attempts exhausted, or explicit Close is called.
50
        Closed
51
        // Shutdown signals that shutdown callback is processing.
52
        Shutdown
53

54
        // LogReconnectFailed is logged when reconnect attempt failed.
55
        LogReconnectFailed ConnLogKind = iota + 1
56
        // LogLastReconnectFailed is logged when last reconnect attempt failed,
57
        // connection will be closed after that.
58
        LogLastReconnectFailed
59
        // LogUnexpectedResultId is logged when response with unknown id was received.
60
        // Most probably it is due to request timeout.
61
        LogUnexpectedResultId
62
        // LogWatchEventReadFailed is logged when failed to read a watch event.
63
        LogWatchEventReadFailed
64
)
65

66
// ConnEvent is sent throw Notify channel specified in Opts.
67
type ConnEvent struct {
68
        Conn *Connection
69
        Kind ConnEventKind
70
        When time.Time
71
}
72

73
// A raw watch event.
74
type connWatchEvent struct {
75
        key   string
76
        value interface{}
77
}
78

79
var epoch = time.Now()
80

81
// Logger is logger type expected to be passed in options.
82
type Logger interface {
83
        Report(event ConnLogKind, conn *Connection, v ...interface{})
84
}
85

86
type defaultLogger struct{}
87

88
func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
103✔
89
        switch event {
103✔
90
        case LogReconnectFailed:
103✔
91
                reconnects := v[0].(uint)
103✔
92
                err := v[1].(error)
103✔
93
                log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s", reconnects, conn.opts.MaxReconnects, conn.addr, err)
103✔
94
        case LogLastReconnectFailed:
×
95
                err := v[0].(error)
×
96
                log.Printf("tarantool: last reconnect to %s failed: %s, giving it up", conn.addr, err)
×
97
        case LogUnexpectedResultId:
×
98
                resp := v[0].(*Response)
×
99
                log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
×
100
        case LogWatchEventReadFailed:
×
101
                err := v[0].(error)
×
102
                log.Printf("tarantool: unable to parse watch event: %s", err)
×
103
        default:
×
104
                args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
×
105
                log.Print(args...)
×
106
        }
107
}
108

109
// Connection is a handle with a single connection to a Tarantool instance.
110
//
111
// It is created and configured with Connect function, and could not be
112
// reconfigured later.
113
//
114
// Connection could be in three possible states:
115
//
116
// - In "Connected" state it sends queries to Tarantool.
117
//
118
// - In "Disconnected" state it rejects queries with ClientError{Code:
119
// ErrConnectionNotReady}
120
//
121
// - In "Closed" state it rejects queries with ClientError{Code:
122
// ErrConnectionClosed}. Connection could become "Closed" when
123
// Connection.Close() method called, or when Tarantool disconnected and
124
// Reconnect pause is not specified or MaxReconnects is specified and
125
// MaxReconnect reconnect attempts already performed.
126
//
127
// You may perform data manipulation operation by calling its methods:
128
// Call*, Insert*, Replace*, Update*, Upsert*, Call*, Eval*.
129
//
130
// In any method that accepts space you my pass either space number or space
131
// name (in this case it will be looked up in schema). Same is true for index.
132
//
133
// ATTENTION: tuple, key, ops and args arguments for any method should be
134
// and array or should serialize to msgpack array.
135
//
136
// ATTENTION: result argument for *Typed methods should deserialize from
137
// msgpack array, cause Tarantool always returns result as an array.
138
// For all space related methods and Call16* (but not Call17*) methods Tarantool
139
// always returns array of array (array of tuples for space related methods).
140
// For Eval* and Call* Tarantool always returns array, but does not forces
141
// array of arrays.
142
//
143
// If connected to Tarantool 2.10 or newer, connection supports server graceful
144
// shutdown. In this case, server will wait until all client requests will be
145
// finished and client disconnects before going down (server also may go down
146
// by timeout). Client reconnect will happen if connection options enable
147
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
148
//
149
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
150
type Connection struct {
151
        addr  string
152
        c     net.Conn
153
        mutex sync.Mutex
154
        cond  *sync.Cond
155
        // Schema contains schema loaded on connection.
156
        Schema *Schema
157
        // requestId contains the last request ID for requests with nil context.
158
        requestId uint32
159
        // contextRequestId contains the last request ID for requests with context.
160
        contextRequestId uint32
161
        // Greeting contains first message sent by Tarantool.
162
        Greeting *Greeting
163

164
        shard      []connShard
165
        dirtyShard chan uint32
166

167
        control chan struct{}
168
        rlimit  chan struct{}
169
        opts    Opts
170
        state   uint32
171
        dec     *decoder
172
        lenbuf  [PacketLengthBytes]byte
173

174
        lastStreamId uint64
175

176
        serverProtocolInfo ProtocolInfo
177
        // watchMap is a map of key -> chan watchState.
178
        watchMap sync.Map
179

180
        // shutdownWatcher is the "box.shutdown" event watcher.
181
        shutdownWatcher Watcher
182
        // requestCnt is a counter of active requests.
183
        requestCnt int64
184
}
185

186
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
187

188
type futureList struct {
189
        first *Future
190
        last  **Future
191
}
192

193
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
1,290✔
194
        root := &list.first
1,290✔
195
        for {
2,580✔
196
                fut := *root
1,290✔
197
                if fut == nil {
1,290✔
198
                        return nil
×
199
                }
×
200
                if fut.requestId == reqid {
2,580✔
201
                        if fetch {
2,580✔
202
                                *root = fut.next
1,290✔
203
                                if fut.next == nil {
2,580✔
204
                                        list.last = root
1,290✔
205
                                } else {
1,290✔
206
                                        fut.next = nil
×
207
                                }
×
208
                        }
209
                        return fut
1,290✔
210
                }
211
                root = &fut.next
×
212
        }
213
}
214

215
func (list *futureList) addFuture(fut *Future) {
1,279✔
216
        *list.last = fut
1,279✔
217
        list.last = &fut.next
1,279✔
218
}
1,279✔
219

220
func (list *futureList) clear(err error, conn *Connection) {
1,052,672✔
221
        fut := list.first
1,052,672✔
222
        list.first = nil
1,052,672✔
223
        list.last = &list.first
1,052,672✔
224
        for fut != nil {
1,052,674✔
225
                fut.SetError(err)
2✔
226
                conn.markDone(fut)
2✔
227
                fut, fut.next = fut.next, nil
2✔
228
        }
2✔
229
}
230

231
type connShard struct {
232
        rmut            sync.Mutex
233
        requests        [requestsMap]futureList
234
        requestsWithCtx [requestsMap]futureList
235
        bufmut          sync.Mutex
236
        buf             smallWBuf
237
        enc             *encoder
238
}
239

240
// Greeting is a message sent by Tarantool on connect.
241
type Greeting struct {
242
        Version string
243
        auth    string
244
}
245

246
// Opts is a way to configure Connection
247
type Opts struct {
248
        // Timeout for response to a particular request. The timeout is reset when
249
        // push messages are received. If Timeout is zero, any request can be
250
        // blocked infinitely.
251
        // Also used to setup net.TCPConn.Set(Read|Write)Deadline.
252
        //
253
        // Pay attention, when using contexts with request objects,
254
        // the timeout option for Connection does not affect the lifetime
255
        // of the request. For those purposes use context.WithTimeout() as
256
        // the root context.
257
        Timeout time.Duration
258
        // Timeout between reconnect attempts. If Reconnect is zero, no
259
        // reconnect attempts will be made.
260
        // If specified, then when Tarantool is not reachable or disconnected,
261
        // new connect attempt is performed after pause.
262
        // By default, no reconnection attempts are performed,
263
        // so once disconnected, connection becomes Closed.
264
        Reconnect time.Duration
265
        // Maximum number of reconnect failures; after that we give it up to
266
        // on. If MaxReconnects is zero, the client will try to reconnect
267
        // endlessly.
268
        // After MaxReconnects attempts Connection becomes closed.
269
        MaxReconnects uint
270
        // Username for logging in to Tarantool.
271
        User string
272
        // User password for logging in to Tarantool.
273
        Pass string
274
        // RateLimit limits number of 'in-fly' request, i.e. already put into
275
        // requests queue, but not yet answered by server or timeouted.
276
        // It is disabled by default.
277
        // See RLimitAction for possible actions when RateLimit.reached.
278
        RateLimit uint
279
        // RLimitAction tells what to do when RateLimit reached:
280
        //   RLimitDrop - immediately abort request,
281
        //   RLimitWait - wait during timeout period for some request to be answered.
282
        //                If no request answered during timeout period, this request
283
        //                is aborted.
284
        //                If no timeout period is set, it will wait forever.
285
        // It is required if RateLimit is specified.
286
        RLimitAction uint
287
        // Concurrency is amount of separate mutexes for request
288
        // queues and buffers inside of connection.
289
        // It is rounded up to nearest power of 2.
290
        // By default it is runtime.GOMAXPROCS(-1) * 4
291
        Concurrency uint32
292
        // SkipSchema disables schema loading. Without disabling schema loading,
293
        // there is no way to create Connection for currently not accessible Tarantool.
294
        SkipSchema bool
295
        // Notify is a channel which receives notifications about Connection status
296
        // changes.
297
        Notify chan<- ConnEvent
298
        // Handle is user specified value, that could be retrivied with
299
        // Handle() method.
300
        Handle interface{}
301
        // Logger is user specified logger used for error messages.
302
        Logger Logger
303
        // Transport is the connection type, by default the connection is unencrypted.
304
        Transport string
305
        // SslOpts is used only if the Transport == 'ssl' is set.
306
        Ssl SslOpts
307
        // RequiredProtocolInfo contains minimal protocol version and
308
        // list of protocol features that should be supported by
309
        // Tarantool server. By default there are no restrictions.
310
        RequiredProtocolInfo ProtocolInfo
311
}
312

313
// SslOpts is a way to configure ssl transport.
314
type SslOpts struct {
315
        // KeyFile is a path to a private SSL key file.
316
        KeyFile string
317
        // CertFile is a path to an SSL certificate file.
318
        CertFile string
319
        // CaFile is a path to a trusted certificate authorities (CA) file.
320
        CaFile string
321
        // Ciphers is a colon-separated (:) list of SSL cipher suites the connection
322
        // can use.
323
        //
324
        // We don't provide a list of supported ciphers. This is what OpenSSL
325
        // does. The only limitation is usage of TLSv1.2 (because other protocol
326
        // versions don't seem to support the GOST cipher). To add additional
327
        // ciphers (GOST cipher), you must configure OpenSSL.
328
        //
329
        // See also
330
        //
331
        // * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
332
        Ciphers string
333
}
334

335
// Clone returns a copy of the Opts object.
336
// Any changes in copy RequiredProtocolInfo will not affect the original
337
// RequiredProtocolInfo value.
338
func (opts Opts) Clone() Opts {
255✔
339
        optsCopy := opts
255✔
340
        optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()
255✔
341

255✔
342
        return optsCopy
255✔
343
}
255✔
344

345
// Connect creates and configures a new Connection.
346
//
347
// Address could be specified in following ways:
348
//
349
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
350
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
351
//
352
// - Unix socket, first '/' or '.' indicates Unix socket
353
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
354
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
355
//
356
// Notes:
357
//
358
// - If opts.Reconnect is zero (default), then connection either already connected
359
// or error is returned.
360
//
361
// - If opts.Reconnect is non-zero, then error will be returned only if authorization
362
// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later
363
// and will not finish to make attempts on authorization failures.
364
func Connect(addr string, opts Opts) (conn *Connection, err error) {
242✔
365
        conn = &Connection{
242✔
366
                addr:             addr,
242✔
367
                requestId:        0,
242✔
368
                contextRequestId: 1,
242✔
369
                Greeting:         &Greeting{},
242✔
370
                control:          make(chan struct{}),
242✔
371
                opts:             opts.Clone(),
242✔
372
                dec:              newDecoder(&smallBuf{}),
242✔
373
        }
242✔
374
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
242✔
375
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
483✔
376
                conn.opts.Concurrency = maxprocs * 4
241✔
377
        }
241✔
378
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
242✔
379
                for i := uint(1); i < 32; i *= 2 {
×
380
                        c |= c >> i
×
381
                }
×
382
                conn.opts.Concurrency = c + 1
×
383
        }
384
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
242✔
385
        conn.shard = make([]connShard, conn.opts.Concurrency)
242✔
386
        for i := range conn.shard {
2,202✔
387
                shard := &conn.shard[i]
1,960✔
388
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
1,960✔
389
                for _, requests := range requestsLists {
5,880✔
390
                        for j := range requests {
505,680✔
391
                                requests[j].last = &requests[j].first
501,760✔
392
                        }
501,760✔
393
                }
394
        }
395

396
        if conn.opts.RateLimit > 0 {
242✔
397
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
×
398
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
×
399
                        return nil, errors.New("RLimitAction should be specified to RLimitDone nor RLimitWait")
×
400
                }
×
401
        }
402

403
        if conn.opts.Logger == nil {
484✔
404
                conn.opts.Logger = defaultLogger{}
242✔
405
        }
242✔
406

407
        conn.cond = sync.NewCond(&conn.mutex)
242✔
408

242✔
409
        if err = conn.createConnection(false); err != nil {
282✔
410
                ter, ok := err.(Error)
40✔
411
                if conn.opts.Reconnect <= 0 {
80✔
412
                        return nil, err
40✔
413
                } else if ok && (ter.Code == ErrNoSuchUser ||
40✔
414
                        ter.Code == ErrPasswordMismatch) {
×
415
                        // Reported auth errors immediately.
×
416
                        return nil, err
×
417
                } else {
×
418
                        // Without SkipSchema it is useless.
×
419
                        go func(conn *Connection) {
×
420
                                conn.mutex.Lock()
×
421
                                defer conn.mutex.Unlock()
×
422
                                if err := conn.createConnection(true); err != nil {
×
423
                                        conn.closeConnection(err, true)
×
424
                                }
×
425
                        }(conn)
426
                        err = nil
×
427
                }
428
        }
429

430
        go conn.pinger()
202✔
431
        if conn.opts.Timeout > 0 {
404✔
432
                go conn.timeouts()
202✔
433
        }
202✔
434

435
        // TODO: reload schema after reconnect.
436
        if !conn.opts.SkipSchema {
385✔
437
                if err = conn.loadSchema(); err != nil {
183✔
438
                        conn.mutex.Lock()
×
439
                        defer conn.mutex.Unlock()
×
440
                        conn.closeConnection(err, true)
×
441
                        return nil, err
×
442
                }
×
443
        }
444

445
        return conn, err
202✔
446
}
447

448
// ConnectedNow reports if connection is established at the moment.
449
func (conn *Connection) ConnectedNow() bool {
6✔
450
        return atomic.LoadUint32(&conn.state) == connConnected
6✔
451
}
6✔
452

453
// ClosedNow reports if connection is closed by user or after reconnect.
454
func (conn *Connection) ClosedNow() bool {
211✔
455
        return atomic.LoadUint32(&conn.state) == connClosed
211✔
456
}
211✔
457

458
// Close closes Connection.
459
// After this method called, there is no way to reopen this Connection.
460
func (conn *Connection) Close() error {
204✔
461
        err := ClientError{ErrConnectionClosed, "connection closed by client"}
204✔
462
        conn.mutex.Lock()
204✔
463
        defer conn.mutex.Unlock()
204✔
464
        return conn.closeConnection(err, true)
204✔
465
}
204✔
466

467
// Addr returns a configured address of Tarantool socket.
468
func (conn *Connection) Addr() string {
×
469
        return conn.addr
×
470
}
×
471

472
// RemoteAddr returns an address of Tarantool socket.
473
func (conn *Connection) RemoteAddr() string {
×
474
        conn.mutex.Lock()
×
475
        defer conn.mutex.Unlock()
×
476
        if conn.c == nil {
×
477
                return ""
×
478
        }
×
479
        return conn.c.RemoteAddr().String()
×
480
}
481

482
// LocalAddr returns an address of outgoing socket.
483
func (conn *Connection) LocalAddr() string {
×
484
        conn.mutex.Lock()
×
485
        defer conn.mutex.Unlock()
×
486
        if conn.c == nil {
×
487
                return ""
×
488
        }
×
489
        return conn.c.LocalAddr().String()
×
490
}
491

492
// Handle returns a user-specified handle from Opts.
493
func (conn *Connection) Handle() interface{} {
×
494
        return conn.opts.Handle
×
495
}
×
496

497
func (conn *Connection) cancelFuture(fut *Future, err error) {
2✔
498
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
4✔
499
                fut.SetError(err)
2✔
500
                conn.markDone(fut)
2✔
501
        }
2✔
502
}
503

504
func (conn *Connection) dial() (err error) {
347✔
505
        var connection net.Conn
347✔
506
        network := "tcp"
347✔
507
        opts := conn.opts
347✔
508
        address := conn.addr
347✔
509
        timeout := opts.Reconnect / 2
347✔
510
        transport := opts.Transport
347✔
511
        if timeout == 0 {
480✔
512
                timeout = 500 * time.Millisecond
133✔
513
        } else if timeout > 5*time.Second {
347✔
514
                timeout = 5 * time.Second
×
515
        }
×
516
        // Unix socket connection
517
        addrLen := len(address)
347✔
518
        if addrLen > 0 && (address[0] == '.' || address[0] == '/') {
347✔
519
                network = "unix"
×
520
        } else if addrLen >= 7 && address[0:7] == "unix://" {
347✔
521
                network = "unix"
×
522
                address = address[7:]
×
523
        } else if addrLen >= 5 && address[0:5] == "unix:" {
347✔
524
                network = "unix"
×
525
                address = address[5:]
×
526
        } else if addrLen >= 6 && address[0:6] == "unix/:" {
347✔
527
                network = "unix"
×
528
                address = address[6:]
×
529
        } else if addrLen >= 6 && address[0:6] == "tcp://" {
347✔
530
                address = address[6:]
×
531
        } else if addrLen >= 4 && address[0:4] == "tcp:" {
347✔
532
                address = address[4:]
×
533
        }
×
534
        if transport == connTransportNone {
647✔
535
                connection, err = net.DialTimeout(network, address, timeout)
300✔
536
        } else if transport == connTransportSsl {
394✔
537
                connection, err = sslDialTimeout(network, address, timeout, opts.Ssl)
47✔
538
        } else {
47✔
539
                err = errors.New("An unsupported transport type: " + transport)
×
540
        }
×
541
        if err != nil {
490✔
542
                return
143✔
543
        }
143✔
544
        dc := &DeadlineIO{to: opts.Timeout, c: connection}
204✔
545
        r := bufio.NewReaderSize(dc, 128*1024)
204✔
546
        w := bufio.NewWriterSize(dc, 128*1024)
204✔
547
        greeting := make([]byte, 128)
204✔
548
        _, err = io.ReadFull(r, greeting)
204✔
549
        if err != nil {
204✔
550
                connection.Close()
×
551
                return
×
552
        }
×
553
        conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
204✔
554
        conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()
204✔
555

204✔
556
        // IPROTO_ID requests can be processed without authentication.
204✔
557
        // https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/requests/#iproto-id
204✔
558
        if err = conn.identify(w, r); err != nil {
204✔
559
                connection.Close()
×
560
                return err
×
561
        }
×
562

563
        if err = checkProtocolInfo(opts.RequiredProtocolInfo, conn.serverProtocolInfo); err != nil {
204✔
564
                connection.Close()
×
565
                return fmt.Errorf("identify: %w", err)
×
566
        }
×
567

568
        // Auth.
569
        if opts.User != "" {
408✔
570
                scr, err := scramble(conn.Greeting.auth, opts.Pass)
204✔
571
                if err != nil {
204✔
572
                        err = errors.New("auth: scrambling failure " + err.Error())
×
573
                        connection.Close()
×
574
                        return err
×
575
                }
×
576
                if err = conn.writeAuthRequest(w, scr); err != nil {
204✔
577
                        connection.Close()
×
578
                        return err
×
579
                }
×
580
                if err = conn.readAuthResponse(r); err != nil {
204✔
581
                        connection.Close()
×
582
                        return err
×
583
                }
×
584
        }
585

586
        // Watchers.
587
        conn.watchMap.Range(func(key, value interface{}) bool {
207✔
588
                st := value.(chan watchState)
3✔
589
                state := <-st
3✔
590
                if state.unready != nil {
3✔
591
                        return true
×
592
                }
×
593

594
                req := newWatchRequest(key.(string))
3✔
595
                if err = conn.writeRequest(w, req); err != nil {
3✔
596
                        st <- state
×
597
                        return false
×
598
                }
×
599
                state.ack = true
3✔
600

3✔
601
                st <- state
3✔
602
                return true
3✔
603
        })
604

605
        if err != nil {
204✔
606
                return fmt.Errorf("unable to register watch: %w", err)
×
607
        }
×
608

609
        // Only if connected and fully initialized.
610
        conn.lockShards()
204✔
611
        conn.c = connection
204✔
612
        atomic.StoreUint32(&conn.state, connConnected)
204✔
613
        conn.cond.Broadcast()
204✔
614
        conn.unlockShards()
204✔
615
        go conn.writer(w, connection)
204✔
616
        go conn.reader(r, connection)
204✔
617

204✔
618
        // Subscribe shutdown event to process graceful shutdown.
204✔
619
        if conn.shutdownWatcher == nil && conn.isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) {
406✔
620
                watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
202✔
621
                if werr != nil {
202✔
622
                        return werr
×
623
                }
×
624
                conn.shutdownWatcher = watcher
202✔
625
        }
626

627
        return
204✔
628
}
629

630
func pack(h *smallWBuf, enc *encoder, reqid uint32,
631
        req Request, streamId uint64, res SchemaResolver) (err error) {
1,690✔
632
        const uint32Code = 0xce
1,690✔
633
        const uint64Code = 0xcf
1,690✔
634
        const streamBytesLenUint64 = 10
1,690✔
635
        const streamBytesLenUint32 = 6
1,690✔
636

1,690✔
637
        hl := h.Len()
1,690✔
638

1,690✔
639
        var streamBytesLen = 0
1,690✔
640
        var streamBytes [streamBytesLenUint64]byte
1,690✔
641
        hMapLen := byte(0x82) // 2 element map.
1,690✔
642
        if streamId != ignoreStreamId {
1,728✔
643
                hMapLen = byte(0x83) // 3 element map.
38✔
644
                streamBytes[0] = KeyStreamId
38✔
645
                if streamId > math.MaxUint32 {
40✔
646
                        streamBytesLen = streamBytesLenUint64
2✔
647
                        streamBytes[1] = uint64Code
2✔
648
                        binary.BigEndian.PutUint64(streamBytes[2:], streamId)
2✔
649
                } else {
38✔
650
                        streamBytesLen = streamBytesLenUint32
36✔
651
                        streamBytes[1] = uint32Code
36✔
652
                        binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
36✔
653
                }
36✔
654
        }
655

656
        hBytes := append([]byte{
1,690✔
657
                uint32Code, 0, 0, 0, 0, // Length.
1,690✔
658
                hMapLen,
1,690✔
659
                KeyCode, byte(req.Code()), // Request code.
1,690✔
660
                KeySync, uint32Code,
1,690✔
661
                byte(reqid >> 24), byte(reqid >> 16),
1,690✔
662
                byte(reqid >> 8), byte(reqid),
1,690✔
663
        }, streamBytes[:streamBytesLen]...)
1,690✔
664

1,690✔
665
        h.Write(hBytes)
1,690✔
666

1,690✔
667
        if err = req.Body(res, enc); err != nil {
1,690✔
668
                return
×
669
        }
×
670

671
        l := uint32(h.Len() - 5 - hl)
1,690✔
672
        h.b[hl+1] = byte(l >> 24)
1,690✔
673
        h.b[hl+2] = byte(l >> 16)
1,690✔
674
        h.b[hl+3] = byte(l >> 8)
1,690✔
675
        h.b[hl+4] = byte(l)
1,690✔
676

1,690✔
677
        return
1,690✔
678
}
679

680
func (conn *Connection) writeRequest(w *bufio.Writer, req Request) error {
411✔
681
        var packet smallWBuf
411✔
682
        err := pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
411✔
683

411✔
684
        if err != nil {
411✔
685
                return fmt.Errorf("pack error: %w", err)
×
686
        }
×
687
        if err = write(w, packet.b); err != nil {
411✔
688
                return fmt.Errorf("write error: %w", err)
×
689
        }
×
690
        if err = w.Flush(); err != nil {
411✔
691
                return fmt.Errorf("flush error: %w", err)
×
692
        }
×
693
        return err
411✔
694
}
695

696
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) error {
204✔
697
        req := newAuthRequest(conn.opts.User, string(scramble))
204✔
698

204✔
699
        err := conn.writeRequest(w, req)
204✔
700
        if err != nil {
204✔
701
                return fmt.Errorf("auth: %w", err)
×
702
        }
×
703

704
        return nil
204✔
705
}
706

707
func (conn *Connection) writeIdRequest(w *bufio.Writer, protocolInfo ProtocolInfo) error {
204✔
708
        req := NewIdRequest(protocolInfo)
204✔
709

204✔
710
        err := conn.writeRequest(w, req)
204✔
711
        if err != nil {
204✔
712
                return fmt.Errorf("identify: %w", err)
×
713
        }
×
714

715
        return nil
204✔
716
}
717

718
func (conn *Connection) readResponse(r io.Reader) (Response, error) {
408✔
719
        respBytes, err := conn.read(r)
408✔
720
        if err != nil {
408✔
721
                return Response{}, fmt.Errorf("read error: %w", err)
×
722
        }
×
723

724
        resp := Response{buf: smallBuf{b: respBytes}}
408✔
725
        err = resp.decodeHeader(conn.dec)
408✔
726
        if err != nil {
408✔
727
                return resp, fmt.Errorf("decode response header error: %w", err)
×
728
        }
×
729
        err = resp.decodeBody()
408✔
730
        if err != nil {
408✔
731
                switch err.(type) {
×
732
                case Error:
×
733
                        return resp, err
×
734
                default:
×
735
                        return resp, fmt.Errorf("decode response body error: %w", err)
×
736
                }
737
        }
738
        return resp, nil
408✔
739
}
740

741
func (conn *Connection) readAuthResponse(r io.Reader) error {
204✔
742
        _, err := conn.readResponse(r)
204✔
743
        if err != nil {
204✔
744
                return fmt.Errorf("auth: %w", err)
×
745
        }
×
746

747
        return nil
204✔
748
}
749

750
func (conn *Connection) readIdResponse(r io.Reader) (Response, error) {
204✔
751
        resp, err := conn.readResponse(r)
204✔
752
        if err != nil {
204✔
753
                return resp, fmt.Errorf("identify: %w", err)
×
754
        }
×
755

756
        return resp, nil
204✔
757
}
758

759
func (conn *Connection) createConnection(reconnect bool) (err error) {
345✔
760
        var reconnects uint
345✔
761
        for conn.c == nil && conn.state == connDisconnected {
692✔
762
                now := time.Now()
347✔
763
                err = conn.dial()
347✔
764
                if err == nil || !reconnect {
591✔
765
                        if err == nil {
448✔
766
                                conn.notify(Connected)
204✔
767
                        }
204✔
768
                        return
244✔
769
                }
770
                if conn.opts.MaxReconnects > 0 && reconnects > conn.opts.MaxReconnects {
103✔
771
                        conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
×
772
                        err = ClientError{ErrConnectionClosed, "last reconnect failed"}
×
773
                        // mark connection as closed to avoid reopening by another goroutine
×
774
                        return
×
775
                }
×
776
                conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
103✔
777
                conn.notify(ReconnectFailed)
103✔
778
                reconnects++
103✔
779
                conn.mutex.Unlock()
103✔
780
                time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
103✔
781
                conn.mutex.Lock()
103✔
782
        }
783
        if conn.state == connClosed {
202✔
784
                err = ClientError{ErrConnectionClosed, "using closed connection"}
101✔
785
        }
101✔
786
        return
101✔
787
}
788

789
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
508✔
790
        conn.lockShards()
508✔
791
        defer conn.unlockShards()
508✔
792
        if forever {
913✔
793
                if conn.state != connClosed {
606✔
794
                        close(conn.control)
201✔
795
                        atomic.StoreUint32(&conn.state, connClosed)
201✔
796
                        conn.cond.Broadcast()
201✔
797
                        // Free the resources.
201✔
798
                        if conn.shutdownWatcher != nil {
402✔
799
                                go conn.shutdownWatcher.Unregister()
201✔
800
                                conn.shutdownWatcher = nil
201✔
801
                        }
201✔
802
                        conn.notify(Closed)
201✔
803
                }
804
        } else {
103✔
805
                atomic.StoreUint32(&conn.state, connDisconnected)
103✔
806
                conn.cond.Broadcast()
103✔
807
                conn.notify(Disconnected)
103✔
808
        }
103✔
809
        if conn.c != nil {
711✔
810
                err = conn.c.Close()
203✔
811
                conn.c = nil
203✔
812
        }
203✔
813
        for i := range conn.shard {
4,620✔
814
                conn.shard[i].buf.Reset()
4,112✔
815
                requestsLists := []*[requestsMap]futureList{&conn.shard[i].requests, &conn.shard[i].requestsWithCtx}
4,112✔
816
                for _, requests := range requestsLists {
12,336✔
817
                        for pos := range requests {
1,060,896✔
818
                                requests[pos].clear(neterr, conn)
1,052,672✔
819
                        }
1,052,672✔
820
                }
821
        }
822
        return
508✔
823
}
824

825
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
360✔
826
        if conn.opts.Reconnect > 0 {
620✔
827
                if c == conn.c {
363✔
828
                        conn.closeConnection(neterr, false)
103✔
829
                        if err := conn.createConnection(true); err != nil {
204✔
830
                                conn.closeConnection(err, true)
101✔
831
                        }
101✔
832
                }
833
        } else {
100✔
834
                conn.closeConnection(neterr, true)
100✔
835
        }
100✔
836
}
837

838
func (conn *Connection) reconnect(neterr error, c net.Conn) {
259✔
839
        conn.mutex.Lock()
259✔
840
        defer conn.mutex.Unlock()
259✔
841
        conn.reconnectImpl(neterr, c)
259✔
842
        conn.cond.Broadcast()
259✔
843
}
259✔
844

845
func (conn *Connection) lockShards() {
712✔
846
        for i := range conn.shard {
6,480✔
847
                conn.shard[i].rmut.Lock()
5,768✔
848
                conn.shard[i].bufmut.Lock()
5,768✔
849
        }
5,768✔
850
}
851

852
func (conn *Connection) unlockShards() {
712✔
853
        for i := range conn.shard {
6,480✔
854
                conn.shard[i].rmut.Unlock()
5,768✔
855
                conn.shard[i].bufmut.Unlock()
5,768✔
856
        }
5,768✔
857
}
858

859
func (conn *Connection) pinger() {
202✔
860
        to := conn.opts.Timeout
202✔
861
        if to == 0 {
202✔
862
                to = 3 * time.Second
×
863
        }
×
864
        t := time.NewTicker(to / 3)
202✔
865
        defer t.Stop()
202✔
866
        for {
410✔
867
                select {
208✔
868
                case <-conn.control:
201✔
869
                        return
201✔
870
                case <-t.C:
6✔
871
                }
872
                conn.Ping()
6✔
873
        }
874
}
875

876
func (conn *Connection) notify(kind ConnEventKind) {
714✔
877
        if conn.opts.Notify != nil {
714✔
878
                select {
×
879
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
880
                default:
×
881
                }
882
        }
883
}
884

885
func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
204✔
886
        var shardn uint32
204✔
887
        var packet smallWBuf
204✔
888
        for atomic.LoadUint32(&conn.state) != connClosed {
1,677✔
889
                select {
1,473✔
890
                case shardn = <-conn.dirtyShard:
348✔
891
                default:
1,125✔
892
                        runtime.Gosched()
1,125✔
893
                        if len(conn.dirtyShard) == 0 {
2,120✔
894
                                if err := w.Flush(); err != nil {
1,051✔
895
                                        conn.reconnect(err, c)
56✔
896
                                        return
56✔
897
                                }
56✔
898
                        }
899
                        select {
1,069✔
900
                        case shardn = <-conn.dirtyShard:
931✔
901
                        case <-conn.control:
137✔
902
                                return
137✔
903
                        }
904
                }
905
                shard := &conn.shard[shardn]
1,279✔
906
                shard.bufmut.Lock()
1,279✔
907
                if conn.c != c {
1,289✔
908
                        conn.dirtyShard <- shardn
10✔
909
                        shard.bufmut.Unlock()
10✔
910
                        return
10✔
911
                }
10✔
912
                packet, shard.buf = shard.buf, packet
1,269✔
913
                shard.bufmut.Unlock()
1,269✔
914
                if packet.Len() == 0 {
1,269✔
915
                        continue
×
916
                }
917
                if err := write(w, packet.b); err != nil {
1,269✔
918
                        conn.reconnect(err, c)
×
919
                        return
×
920
                }
×
921
                packet.Reset()
1,269✔
922
        }
923
}
924

925
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
325✔
926
        keyExist := false
325✔
927
        event := connWatchEvent{}
325✔
928
        d := newDecoder(reader)
325✔
929

325✔
930
        l, err := d.DecodeMapLen()
325✔
931
        if err != nil {
325✔
932
                return event, err
×
933
        }
×
934

935
        for ; l > 0; l-- {
765✔
936
                cd, err := d.DecodeInt()
440✔
937
                if err != nil {
440✔
938
                        return event, err
×
939
                }
×
940

941
                switch cd {
440✔
942
                case KeyEvent:
325✔
943
                        if event.key, err = d.DecodeString(); err != nil {
325✔
944
                                return event, err
×
945
                        }
×
946
                        keyExist = true
325✔
947
                case KeyEventData:
115✔
948
                        if event.value, err = d.DecodeInterface(); err != nil {
115✔
949
                                return event, err
×
950
                        }
×
951
                default:
×
952
                        if err = d.Skip(); err != nil {
×
953
                                return event, err
×
954
                        }
×
955
                }
956
        }
957

958
        if !keyExist {
325✔
959
                return event, errors.New("watch event does not have a key")
×
960
        }
×
961

962
        return event, nil
325✔
963
}
964

965
func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
204✔
966
        events := make(chan connWatchEvent, 1024)
204✔
967
        defer close(events)
204✔
968

204✔
969
        go conn.eventer(events)
204✔
970

204✔
971
        for atomic.LoadUint32(&conn.state) != connClosed {
1,488✔
972
                respBytes, err := conn.read(r)
1,284✔
973
                if err != nil {
1,487✔
974
                        conn.reconnect(err, c)
203✔
975
                        return
203✔
976
                }
203✔
977
                resp := &Response{buf: smallBuf{b: respBytes}}
1,080✔
978
                err = resp.decodeHeader(conn.dec)
1,080✔
979
                if err != nil {
1,080✔
980
                        conn.reconnect(err, c)
×
981
                        return
×
982
                }
×
983

984
                var fut *Future = nil
1,080✔
985
                if resp.Code == EventCode {
1,405✔
986
                        if event, err := readWatchEvent(&resp.buf); err == nil {
650✔
987
                                events <- event
325✔
988
                        } else {
325✔
989
                                conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
×
990
                        }
×
991
                        continue
325✔
992
                } else if resp.Code == PushCode {
768✔
993
                        if fut = conn.peekFuture(resp.RequestId); fut != nil {
26✔
994
                                fut.AppendPush(resp)
13✔
995
                        }
13✔
996
                } else {
742✔
997
                        if fut = conn.fetchFuture(resp.RequestId); fut != nil {
1,484✔
998
                                fut.SetResponse(resp)
742✔
999
                                conn.markDone(fut)
742✔
1000
                        }
742✔
1001
                }
1002

1003
                if fut == nil {
755✔
1004
                        conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
×
1005
                }
×
1006
        }
1007
}
1008

1009
// eventer goroutine gets watch events and updates values for watchers.
1010
func (conn *Connection) eventer(events <-chan connWatchEvent) {
204✔
1011
        for event := range events {
529✔
1012
                if value, ok := conn.watchMap.Load(event.key); ok {
649✔
1013
                        st := value.(chan watchState)
324✔
1014
                        state := <-st
324✔
1015
                        state.value = event.value
324✔
1016
                        if state.version == math.MaxUint64 {
324✔
1017
                                state.version = initWatchEventVersion + 1
×
1018
                        } else {
324✔
1019
                                state.version += 1
324✔
1020
                        }
324✔
1021
                        state.ack = false
324✔
1022
                        if state.changed != nil {
648✔
1023
                                close(state.changed)
324✔
1024
                                state.changed = nil
324✔
1025
                        }
324✔
1026
                        st <- state
324✔
1027
                }
1028
                // It is possible to get IPROTO_EVENT after we already send
1029
                // IPROTO_UNWATCH due to processing on a Tarantool side or slow
1030
                // read from the network, so it looks like an expected behavior.
1031
        }
1032
}
1033

1034
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
1,292✔
1035
        fut = NewFuture()
1,292✔
1036
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
1,292✔
1037
                select {
×
1038
                case conn.rlimit <- struct{}{}:
×
1039
                default:
×
1040
                        fut.err = ClientError{
×
1041
                                ErrRateLimited,
×
1042
                                "Request is rate limited on client",
×
1043
                        }
×
1044
                        fut.ready = nil
×
1045
                        fut.done = nil
×
1046
                        return
×
1047
                }
1048
        }
1049
        fut.requestId = conn.nextRequestId(ctx != nil)
1,292✔
1050
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
1,292✔
1051
        shard := &conn.shard[shardn]
1,292✔
1052
        shard.rmut.Lock()
1,292✔
1053
        switch conn.state {
1,292✔
1054
        case connClosed:
10✔
1055
                fut.err = ClientError{
10✔
1056
                        ErrConnectionClosed,
10✔
1057
                        "using closed connection",
10✔
1058
                }
10✔
1059
                fut.ready = nil
10✔
1060
                fut.done = nil
10✔
1061
                shard.rmut.Unlock()
10✔
1062
                return
10✔
1063
        case connDisconnected:
×
1064
                fut.err = ClientError{
×
1065
                        ErrConnectionNotReady,
×
1066
                        "client connection is not ready",
×
1067
                }
×
1068
                fut.ready = nil
×
1069
                fut.done = nil
×
1070
                shard.rmut.Unlock()
×
1071
                return
×
1072
        case connShutdown:
3✔
1073
                fut.err = ClientError{
3✔
1074
                        ErrConnectionShutdown,
3✔
1075
                        "server shutdown in progress",
3✔
1076
                }
3✔
1077
                fut.ready = nil
3✔
1078
                fut.done = nil
3✔
1079
                shard.rmut.Unlock()
3✔
1080
                return
3✔
1081
        }
1082
        pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
1,279✔
1083
        if ctx != nil {
1,281✔
1084
                select {
2✔
1085
                case <-ctx.Done():
×
1086
                        fut.SetError(fmt.Errorf("context is done"))
×
1087
                        shard.rmut.Unlock()
×
1088
                        return
×
1089
                default:
2✔
1090
                }
1091
                shard.requestsWithCtx[pos].addFuture(fut)
2✔
1092
        } else {
1,277✔
1093
                shard.requests[pos].addFuture(fut)
1,277✔
1094
                if conn.opts.Timeout > 0 {
2,554✔
1095
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
1,277✔
1096
                }
1,277✔
1097
        }
1098
        shard.rmut.Unlock()
1,279✔
1099
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
1,279✔
1100
                select {
×
1101
                case conn.rlimit <- struct{}{}:
×
1102
                default:
×
1103
                        runtime.Gosched()
×
1104
                        select {
×
1105
                        case conn.rlimit <- struct{}{}:
×
1106
                        case <-fut.done:
×
1107
                                if fut.err == nil {
×
1108
                                        panic("fut.done is closed, but err is nil")
×
1109
                                }
1110
                        }
1111
                }
1112
        }
1113
        return
1,279✔
1114
}
1115

1116
// This method removes a future from the internal queue if the context
1117
// is "done" before the response is come. Such select logic is inspired
1118
// from this thread: https://groups.google.com/g/golang-dev/c/jX4oQEls3uk
1119
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
2✔
1120
        select {
2✔
1121
        case <-fut.done:
×
1122
        default:
2✔
1123
                select {
2✔
1124
                case <-ctx.Done():
2✔
1125
                        conn.cancelFuture(fut, fmt.Errorf("context is done"))
2✔
1126
                default:
×
1127
                        select {
×
1128
                        case <-fut.done:
×
1129
                        case <-ctx.Done():
×
1130
                                conn.cancelFuture(fut, fmt.Errorf("context is done"))
×
1131
                        }
1132
                }
1133
        }
1134
}
1135

1136
func (conn *Connection) send(req Request, streamId uint64) *Future {
1,292✔
1137
        atomic.AddInt64(&conn.requestCnt, int64(1))
1,292✔
1138

1,292✔
1139
        fut := conn.newFuture(req.Ctx())
1,292✔
1140
        if fut.ready == nil {
1,305✔
1141
                atomic.AddInt64(&conn.requestCnt, int64(-1))
13✔
1142
                return fut
13✔
1143
        }
13✔
1144

1145
        if req.Ctx() != nil {
1,281✔
1146
                select {
2✔
1147
                case <-req.Ctx().Done():
×
1148
                        conn.cancelFuture(fut, fmt.Errorf("context is done"))
×
1149
                        // future here does not belong to any shard yet,
×
1150
                        // so cancelFuture don't call markDone.
×
1151
                        atomic.AddInt64(&conn.requestCnt, int64(-1))
×
1152
                        return fut
×
1153
                default:
2✔
1154
                }
1155
        }
1156

1157
        conn.putFuture(fut, req, streamId)
1,279✔
1158

1,279✔
1159
        if req.Ctx() != nil {
1,281✔
1160
                go conn.contextWatchdog(fut, req.Ctx())
2✔
1161
        }
2✔
1162

1163
        return fut
1,279✔
1164
}
1165

1166
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
1,279✔
1167
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
1,279✔
1168
        shard := &conn.shard[shardn]
1,279✔
1169
        shard.bufmut.Lock()
1,279✔
1170
        select {
1,279✔
1171
        case <-fut.done:
×
1172
                shard.bufmut.Unlock()
×
1173
                return
×
1174
        default:
1,279✔
1175
        }
1176
        firstWritten := shard.buf.Len() == 0
1,279✔
1177
        if shard.buf.Cap() == 0 {
2,224✔
1178
                shard.buf.b = make([]byte, 0, 128)
945✔
1179
                shard.enc = newEncoder(&shard.buf)
945✔
1180
        }
945✔
1181
        blen := shard.buf.Len()
1,279✔
1182
        reqid := fut.requestId
1,279✔
1183
        if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
1,279✔
1184
                shard.buf.Trunc(blen)
×
1185
                shard.bufmut.Unlock()
×
1186
                if f := conn.fetchFuture(reqid); f == fut {
×
1187
                        fut.SetError(err)
×
1188
                        conn.markDone(fut)
×
1189
                } else if f != nil {
×
1190
                        /* in theory, it is possible. In practice, you have
×
1191
                         * to have race condition that lasts hours */
×
1192
                        panic("Unknown future")
×
1193
                } else {
×
1194
                        fut.wait()
×
1195
                        if fut.err == nil {
×
1196
                                panic("Future removed from queue without error")
×
1197
                        }
1198
                        if _, ok := fut.err.(ClientError); ok {
×
1199
                                // packing error is more important than connection
×
1200
                                // error, because it is indication of programmer's
×
1201
                                // mistake.
×
1202
                                fut.SetError(err)
×
1203
                        }
×
1204
                }
1205
                return
×
1206
        }
1207
        shard.bufmut.Unlock()
1,279✔
1208

1,279✔
1209
        if req.Async() {
1,812✔
1210
                if fut = conn.fetchFuture(reqid); fut != nil {
1,066✔
1211
                        resp := &Response{
533✔
1212
                                RequestId: reqid,
533✔
1213
                                Code:      OkCode,
533✔
1214
                        }
533✔
1215
                        fut.SetResponse(resp)
533✔
1216
                        conn.markDone(fut)
533✔
1217
                }
533✔
1218
        }
1219

1220
        if firstWritten {
2,558✔
1221
                conn.dirtyShard <- shardn
1,279✔
1222
        }
1,279✔
1223
}
1224

1225
func (conn *Connection) markDone(fut *Future) {
1,279✔
1226
        if conn.rlimit != nil {
1,279✔
1227
                <-conn.rlimit
×
1228
        }
×
1229

1230
        if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
2,442✔
1231
                conn.cond.Broadcast()
1,163✔
1232
        }
1,163✔
1233
}
1234

1235
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
13✔
1236
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
13✔
1237
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
13✔
1238
        shard.rmut.Lock()
13✔
1239
        defer shard.rmut.Unlock()
13✔
1240

13✔
1241
        if conn.opts.Timeout > 0 {
26✔
1242
                if fut = conn.getFutureImp(reqid, true); fut != nil {
26✔
1243
                        pair := &shard.requests[pos]
13✔
1244
                        *pair.last = fut
13✔
1245
                        pair.last = &fut.next
13✔
1246
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
13✔
1247
                }
13✔
1248
        } else {
×
1249
                fut = conn.getFutureImp(reqid, false)
×
1250
        }
×
1251

1252
        return fut
13✔
1253
}
1254

1255
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
1,277✔
1256
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,277✔
1257
        shard.rmut.Lock()
1,277✔
1258
        fut = conn.getFutureImp(reqid, true)
1,277✔
1259
        shard.rmut.Unlock()
1,277✔
1260
        return fut
1,277✔
1261
}
1,277✔
1262

1263
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
1,290✔
1264
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,290✔
1265
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
1,290✔
1266
        // futures with even requests id belong to requests list with nil context
1,290✔
1267
        if reqid%2 == 0 {
2,578✔
1268
                return shard.requests[pos].findFuture(reqid, fetch)
1,288✔
1269
        } else {
1,290✔
1270
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
2✔
1271
        }
2✔
1272
}
1273

1274
func (conn *Connection) timeouts() {
202✔
1275
        timeout := conn.opts.Timeout
202✔
1276
        t := time.NewTimer(timeout)
202✔
1277
        for {
406✔
1278
                var nowepoch time.Duration
204✔
1279
                select {
204✔
1280
                case <-conn.control:
201✔
1281
                        t.Stop()
201✔
1282
                        return
201✔
1283
                case <-t.C:
2✔
1284
                }
1285
                minNext := time.Since(epoch) + timeout
2✔
1286
                for i := range conn.shard {
18✔
1287
                        nowepoch = time.Since(epoch)
16✔
1288
                        shard := &conn.shard[i]
16✔
1289
                        for pos := range shard.requests {
2,064✔
1290
                                shard.rmut.Lock()
2,048✔
1291
                                pair := &shard.requests[pos]
2,048✔
1292
                                for pair.first != nil && pair.first.timeout < nowepoch {
2,048✔
1293
                                        shard.bufmut.Lock()
×
1294
                                        fut := pair.first
×
1295
                                        pair.first = fut.next
×
1296
                                        if fut.next == nil {
×
1297
                                                pair.last = &pair.first
×
1298
                                        } else {
×
1299
                                                fut.next = nil
×
1300
                                        }
×
1301
                                        fut.SetError(ClientError{
×
1302
                                                Code: ErrTimeouted,
×
1303
                                                Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
×
1304
                                        })
×
1305
                                        conn.markDone(fut)
×
1306
                                        shard.bufmut.Unlock()
×
1307
                                }
1308
                                if pair.first != nil && pair.first.timeout < minNext {
2,050✔
1309
                                        minNext = pair.first.timeout
2✔
1310
                                }
2✔
1311
                                shard.rmut.Unlock()
2,048✔
1312
                        }
1313
                }
1314
                nowepoch = time.Since(epoch)
2✔
1315
                if nowepoch+time.Microsecond < minNext {
4✔
1316
                        t.Reset(minNext - nowepoch)
2✔
1317
                } else {
2✔
1318
                        t.Reset(time.Microsecond)
×
1319
                }
×
1320
        }
1321
}
1322

1323
func write(w io.Writer, data []byte) (err error) {
1,680✔
1324
        l, err := w.Write(data)
1,680✔
1325
        if err != nil {
1,680✔
1326
                return
×
1327
        }
×
1328
        if l != len(data) {
1,680✔
1329
                panic("Wrong length writed")
×
1330
        }
1331
        return
1,680✔
1332
}
1333

1334
func (conn *Connection) read(r io.Reader) (response []byte, err error) {
1,692✔
1335
        var length int
1,692✔
1336

1,692✔
1337
        if _, err = io.ReadFull(r, conn.lenbuf[:]); err != nil {
1,895✔
1338
                return
203✔
1339
        }
203✔
1340
        if conn.lenbuf[0] != 0xce {
1,488✔
1341
                err = errors.New("Wrong response header")
×
1342
                return
×
1343
        }
×
1344
        length = (int(conn.lenbuf[1]) << 24) +
1,488✔
1345
                (int(conn.lenbuf[2]) << 16) +
1,488✔
1346
                (int(conn.lenbuf[3]) << 8) +
1,488✔
1347
                int(conn.lenbuf[4])
1,488✔
1348

1,488✔
1349
        if length == 0 {
1,488✔
1350
                err = errors.New("Response should not be 0 length")
×
1351
                return
×
1352
        }
×
1353
        response = make([]byte, length)
1,488✔
1354
        _, err = io.ReadFull(r, response)
1,488✔
1355

1,488✔
1356
        return
1,488✔
1357
}
1358

1359
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
1,292✔
1360
        if context {
1,294✔
1361
                return atomic.AddUint32(&conn.contextRequestId, 2)
2✔
1362
        } else {
1,292✔
1363
                return atomic.AddUint32(&conn.requestId, 2)
1,290✔
1364
        }
1,290✔
1365
}
1366

1367
// Do performs a request asynchronously on the connection.
1368
//
1369
// An error is returned if the request was formed incorrectly, or failed to
1370
// create the future.
1371
func (conn *Connection) Do(req Request) *Future {
1,257✔
1372
        if connectedReq, ok := req.(ConnectedRequest); ok {
1,262✔
1373
                if connectedReq.Conn() != conn {
6✔
1374
                        fut := NewFuture()
1✔
1375
                        fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
1✔
1376
                        return fut
1✔
1377
                }
1✔
1378
        }
1379
        if req.Ctx() != nil {
1,261✔
1380
                select {
5✔
1381
                case <-req.Ctx().Done():
3✔
1382
                        fut := NewFuture()
3✔
1383
                        fut.SetError(fmt.Errorf("context is done"))
3✔
1384
                        return fut
3✔
1385
                default:
2✔
1386
                }
1387
        }
1388
        return conn.send(req, ignoreStreamId)
1,253✔
1389
}
1390

1391
// ConfiguredTimeout returns a timeout from connection config.
1392
func (conn *Connection) ConfiguredTimeout() time.Duration {
×
1393
        return conn.opts.Timeout
×
1394
}
×
1395

1396
// OverrideSchema sets Schema for the connection.
1397
func (conn *Connection) OverrideSchema(s *Schema) {
×
1398
        if s != nil {
×
1399
                conn.mutex.Lock()
×
1400
                defer conn.mutex.Unlock()
×
1401
                conn.Schema = s
×
1402
        }
×
1403
}
1404

1405
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1406
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
1✔
1407
        req := NewPrepareRequest(expr)
1✔
1408
        resp, err := conn.Do(req).Get()
1✔
1409
        if err != nil {
1✔
1410
                return nil, err
×
1411
        }
×
1412
        return NewPreparedFromResponse(conn, resp)
1✔
1413
}
1414

1415
// NewStream creates new Stream object for connection.
1416
//
1417
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1418
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1419
// Since 1.7.0
1420
func (conn *Connection) NewStream() (*Stream, error) {
6✔
1421
        next := atomic.AddUint64(&conn.lastStreamId, 1)
6✔
1422
        return &Stream{
6✔
1423
                Id:   next,
6✔
1424
                Conn: conn,
6✔
1425
        }, nil
6✔
1426
}
6✔
1427

1428
// watchState is the current state of the watcher. See the idea at p. 70, 105:
1429
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1430
type watchState struct {
1431
        // value is a current value.
1432
        value interface{}
1433
        // version is a current version of the value. The only reason for uint64:
1434
        // go 1.13 has no math.Uint.
1435
        version uint64
1436
        // ack true if the acknowledge is already sent.
1437
        ack bool
1438
        // cnt is a count of active watchers for the key.
1439
        cnt int
1440
        // changed is a channel for broadcast the value changes.
1441
        changed chan struct{}
1442
        // unready channel exists if a state is not ready to work (subscription
1443
        // or unsubscription in progress).
1444
        unready chan struct{}
1445
}
1446

1447
// initWatchEventVersion is an initial version until no events from Tarantool.
1448
const initWatchEventVersion uint64 = 0
1449

1450
// connWatcher is an internal implementation of the Watcher interface.
1451
type connWatcher struct {
1452
        unregister sync.Once
1453
        // done is closed when the watcher is unregistered, but the watcher
1454
        // goroutine is not yet finished.
1455
        done chan struct{}
1456
        // finished is closed when the watcher is unregistered and the watcher
1457
        // goroutine is finished.
1458
        finished chan struct{}
1459
}
1460

1461
// Unregister unregisters the connection watcher.
1462
func (w *connWatcher) Unregister() {
2,264✔
1463
        w.unregister.Do(func() {
3,529✔
1464
                close(w.done)
1,265✔
1465
        })
1,265✔
1466
        <-w.finished
2,264✔
1467
}
1468

1469
// subscribeWatchChannel returns an existing one or a new watch state channel
1470
// for the key. It also increases a counter of active watchers for the channel.
1471
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
1,266✔
1472
        var st chan watchState
1,266✔
1473

1,266✔
1474
        for st == nil {
2,532✔
1475
                if val, ok := conn.watchMap.Load(key); !ok {
1,476✔
1476
                        st = make(chan watchState, 1)
210✔
1477
                        state := watchState{
210✔
1478
                                value:   nil,
210✔
1479
                                version: initWatchEventVersion,
210✔
1480
                                ack:     false,
210✔
1481
                                cnt:     0,
210✔
1482
                                changed: nil,
210✔
1483
                                unready: make(chan struct{}),
210✔
1484
                        }
210✔
1485
                        st <- state
210✔
1486

210✔
1487
                        if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
420✔
1488
                                if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
210✔
1489
                                        conn.watchMap.Delete(key)
×
1490
                                        close(state.unready)
×
1491
                                        return nil, err
×
1492
                                }
×
1493
                                // It is a successful subsctiption to a watch events by itself.
1494
                                state = <-st
210✔
1495
                                state.cnt = 1
210✔
1496
                                close(state.unready)
210✔
1497
                                state.unready = nil
210✔
1498
                                st <- state
210✔
1499
                                continue
210✔
1500
                        } else {
×
1501
                                close(state.unready)
×
1502
                                close(st)
×
1503
                                st = val.(chan watchState)
×
1504
                        }
×
1505
                } else {
1,056✔
1506
                        st = val.(chan watchState)
1,056✔
1507
                }
1,056✔
1508

1509
                // It is an existing channel created outside. It may be in the
1510
                // unready state.
1511
                state := <-st
1,056✔
1512
                if state.unready == nil {
2,112✔
1513
                        state.cnt += 1
1,056✔
1514
                }
1,056✔
1515
                st <- state
1,056✔
1516

1,056✔
1517
                if state.unready != nil {
1,056✔
1518
                        // Wait for an update and retry.
×
1519
                        <-state.unready
×
1520
                        st = nil
×
1521
                }
×
1522
        }
1523

1524
        return st, nil
1,266✔
1525
}
1526

1527
func (conn *Connection) isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool {
1,267✔
1528
        for _, actual := range actualSlice {
3,139✔
1529
                if expected == actual {
3,138✔
1530
                        return true
1,266✔
1531
                }
1,266✔
1532
        }
1533
        return false
1✔
1534
}
1535

1536
// NewWatcher creates a new Watcher object for the connection.
1537
//
1538
// You need to require WatchersFeature to use watchers, see examples for the
1539
// function.
1540
//
1541
// After watcher creation, the watcher callback is invoked for the first time.
1542
// In this case, the callback is triggered whether or not the key has already
1543
// been broadcast. All subsequent invocations are triggered with
1544
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1545
// key that has not been broadcast yet, the callback is triggered only once,
1546
// after the registration of the watcher.
1547
//
1548
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1549
// callback is never executed in parallel with itself, but they can be executed
1550
// in parallel to other watchers.
1551
//
1552
// If the key is updated while the watcher callback is running, the callback
1553
// will be invoked again with the latest value as soon as it returns.
1554
//
1555
// Watchers survive reconnection. All registered watchers are automatically
1556
// resubscribed when the connection is reestablished.
1557
//
1558
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1559
// watcher’s destruction. In this case, the watcher remains registered. You
1560
// need to call Unregister() directly.
1561
//
1562
// Unregister() guarantees that there will be no the watcher's callback calls
1563
// after it, but Unregister() call from the callback leads to a deadlock.
1564
//
1565
// See:
1566
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1567
//
1568
// Since 1.10.0
1569
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1,065✔
1570
        // We need to check the feature because the IPROTO_WATCH request is
1,065✔
1571
        // asynchronous. We do not expect any response from a Tarantool instance
1,065✔
1572
        // That's why we can't just check the Tarantool response for an unsupported
1,065✔
1573
        // request error.
1,065✔
1574
        if !conn.isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) {
1,066✔
1575
                err := fmt.Errorf("the feature %s must be required by connection "+
1✔
1576
                        "options to create a watcher", WatchersFeature)
1✔
1577
                return nil, err
1✔
1578
        }
1✔
1579

1580
        return conn.newWatcherImpl(key, callback)
1,064✔
1581
}
1582

1583
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
1,266✔
1584
        st, err := subscribeWatchChannel(conn, key)
1,266✔
1585
        if err != nil {
1,266✔
1586
                return nil, err
×
1587
        }
×
1588

1589
        // Start the watcher goroutine.
1590
        done := make(chan struct{})
1,266✔
1591
        finished := make(chan struct{})
1,266✔
1592

1,266✔
1593
        go func() {
2,532✔
1594
                version := initWatchEventVersion
1,266✔
1595
                for {
3,383✔
1596
                        state := <-st
2,117✔
1597
                        if state.changed == nil {
2,651✔
1598
                                state.changed = make(chan struct{})
534✔
1599
                        }
534✔
1600
                        st <- state
2,117✔
1601

2,117✔
1602
                        if state.version != version {
3,552✔
1603
                                callback(WatchEvent{
1,435✔
1604
                                        Conn:  conn,
1,435✔
1605
                                        Key:   key,
1,435✔
1606
                                        Value: state.value,
1,435✔
1607
                                })
1,435✔
1608
                                version = state.version
1,435✔
1609

1,435✔
1610
                                // Do we need to acknowledge the notification?
1,435✔
1611
                                state = <-st
1,435✔
1612
                                sendAck := !state.ack && version == state.version
1,435✔
1613
                                if sendAck {
1,759✔
1614
                                        state.ack = true
324✔
1615
                                }
324✔
1616
                                st <- state
1,435✔
1617

1,435✔
1618
                                if sendAck {
1,759✔
1619
                                        conn.Do(newWatchRequest(key)).Get()
324✔
1620
                                        // We expect a reconnect and re-subscribe if it fails to
324✔
1621
                                        // send the watch request. So it looks ok do not check a
324✔
1622
                                        // result.
324✔
1623
                                }
324✔
1624
                        }
1625

1626
                        select {
2,117✔
1627
                        case <-done:
1,265✔
1628
                                state := <-st
1,265✔
1629
                                state.cnt -= 1
1,265✔
1630
                                if state.cnt == 0 {
1,474✔
1631
                                        state.unready = make(chan struct{})
209✔
1632
                                }
209✔
1633
                                st <- state
1,265✔
1634

1,265✔
1635
                                if state.cnt == 0 {
1,474✔
1636
                                        // The last one sends IPROTO_UNWATCH.
209✔
1637
                                        if !conn.ClosedNow() {
217✔
1638
                                                // conn.ClosedNow() check is a workaround for calling
8✔
1639
                                                // Unregister from connectionClose().
8✔
1640
                                                conn.Do(newUnwatchRequest(key)).Get()
8✔
1641
                                        }
8✔
1642
                                        conn.watchMap.Delete(key)
209✔
1643
                                        close(state.unready)
209✔
1644
                                }
1645

1646
                                close(finished)
1,265✔
1647
                                return
1,265✔
1648
                        case <-state.changed:
851✔
1649
                        }
1650
                }
1651
        }()
1652

1653
        return &connWatcher{
1,266✔
1654
                done:     done,
1,266✔
1655
                finished: finished,
1,266✔
1656
        }, nil
1,266✔
1657
}
1658

1659
// checkProtocolInfo checks that expected protocol version is
1660
// and protocol features are supported.
1661
func checkProtocolInfo(expected ProtocolInfo, actual ProtocolInfo) error {
204✔
1662
        var found bool
204✔
1663
        var missingFeatures []ProtocolFeature
204✔
1664

204✔
1665
        if expected.Version > actual.Version {
204✔
1666
                return fmt.Errorf("protocol version %d is not supported", expected.Version)
×
1667
        }
×
1668

1669
        // It seems that iterating over a small list is way faster
1670
        // than building a map: https://stackoverflow.com/a/52710077/11646599
1671
        for _, expectedFeature := range expected.Features {
322✔
1672
                found = false
118✔
1673
                for _, actualFeature := range actual.Features {
590✔
1674
                        if expectedFeature == actualFeature {
590✔
1675
                                found = true
118✔
1676
                        }
118✔
1677
                }
1678
                if !found {
118✔
1679
                        missingFeatures = append(missingFeatures, expectedFeature)
×
1680
                }
×
1681
        }
1682

1683
        if len(missingFeatures) == 1 {
204✔
1684
                return fmt.Errorf("protocol feature %s is not supported", missingFeatures[0])
×
1685
        }
×
1686

1687
        if len(missingFeatures) > 1 {
204✔
1688
                var sarr []string
×
1689
                for _, missingFeature := range missingFeatures {
×
1690
                        sarr = append(sarr, missingFeature.String())
×
1691
                }
×
1692
                return fmt.Errorf("protocol features %s are not supported", strings.Join(sarr, ", "))
×
1693
        }
1694

1695
        return nil
204✔
1696
}
1697

1698
// identify sends info about client protocol, receives info
1699
// about server protocol in response and stores it in the connection.
1700
func (conn *Connection) identify(w *bufio.Writer, r *bufio.Reader) error {
204✔
1701
        var ok bool
204✔
1702

204✔
1703
        werr := conn.writeIdRequest(w, clientProtocolInfo)
204✔
1704
        if werr != nil {
204✔
1705
                return werr
×
1706
        }
×
1707

1708
        resp, rerr := conn.readIdResponse(r)
204✔
1709
        if rerr != nil {
204✔
1710
                if resp.Code == ErrUnknownRequestType {
×
1711
                        // IPROTO_ID requests are not supported by server.
×
1712
                        return nil
×
1713
                }
×
1714

1715
                return rerr
×
1716
        }
1717

1718
        if len(resp.Data) == 0 {
204✔
1719
                return fmt.Errorf("identify: unexpected response: no data")
×
1720
        }
×
1721

1722
        conn.serverProtocolInfo, ok = resp.Data[0].(ProtocolInfo)
204✔
1723
        if !ok {
204✔
1724
                return fmt.Errorf("identify: unexpected response: wrong data")
×
1725
        }
×
1726

1727
        return nil
204✔
1728
}
1729

1730
// ServerProtocolVersion returns protocol version and protocol features
1731
// supported by connected Tarantool server. Beware that values might be
1732
// outdated if connection is in a disconnected state.
1733
// Since 1.10.0
1734
func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
4✔
1735
        return conn.serverProtocolInfo.Clone()
4✔
1736
}
4✔
1737

1738
// ClientProtocolVersion returns protocol version and protocol features
1739
// supported by Go connection client.
1740
// Since 1.10.0
1741
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
5✔
1742
        return clientProtocolInfo.Clone()
5✔
1743
}
5✔
1744

1745
func shutdownEventCallback(event WatchEvent) {
305✔
1746
        // Receives "true" on server shutdown.
305✔
1747
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
305✔
1748
        // step 2.
305✔
1749
        val, ok := event.Value.(bool)
305✔
1750
        if ok && val {
408✔
1751
                go event.Conn.shutdown()
103✔
1752
        }
103✔
1753
}
1754

1755
func (conn *Connection) shutdown() {
103✔
1756
        // Forbid state changes.
103✔
1757
        conn.mutex.Lock()
103✔
1758
        defer conn.mutex.Unlock()
103✔
1759

103✔
1760
        if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
103✔
1761
                return
×
1762
        }
×
1763
        conn.cond.Broadcast()
103✔
1764
        conn.notify(Shutdown)
103✔
1765

103✔
1766
        c := conn.c
103✔
1767
        for {
260✔
1768
                if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
159✔
1769
                        return
2✔
1770
                }
2✔
1771
                if atomic.LoadInt64(&conn.requestCnt) == 0 {
256✔
1772
                        break
101✔
1773
                }
1774
                // Use cond var on conn.mutex since request execution may
1775
                // call reconnect(). It is ok if state changes as part of
1776
                // reconnect since Tarantool server won't allow to reconnect
1777
                // in the middle of shutting down.
1778
                conn.cond.Wait()
54✔
1779
        }
1780

1781
        // Start to reconnect based on common rules, same as in net.box.
1782
        // Reconnect also closes the connection: server waits until all
1783
        // subscribed connections are terminated.
1784
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1785
        // step 3.
1786
        conn.reconnectImpl(
101✔
1787
                ClientError{
101✔
1788
                        ErrConnectionClosed,
101✔
1789
                        "connection closed after server shutdown",
101✔
1790
                }, conn.c)
101✔
1791
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc