
tarantool / go-tarantool / 13930913017

18 Mar 2025 06:33PM UTC coverage: 75.939% (+0.08%) from 75.863%

Pull Request #435 (github): maksim.konovalov
pool: Pooler interface supports GetInfo method in TopologyEditor

2992 of 3940 relevant lines covered (75.94%)

9863.87 hits per line

Source File: /connection.go (77.98% covered)

// Package tarantool implements methods and structures for working with
// a Tarantool instance.
package tarantool

import (
        "context"
        "encoding/binary"
        "errors"
        "fmt"
        "io"
        "log"
        "math"
        "net"
        "runtime"
        "sync"
        "sync/atomic"
        "time"

        "github.com/tarantool/go-iproto"
        "github.com/vmihailenco/msgpack/v5"
)

const requestsMap = 128
const ignoreStreamId = 0
const (
        connDisconnected = 0
        connConnected    = 1
        connShutdown     = 2
        connClosed       = 3
)

const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

var (
        errUnknownRequest = errors.New("the passed connected request doesn't belong " +
                "to the current connection or connection pool")
)

const (
        // Connected signals that the connection is established or reestablished.
        Connected ConnEventKind = iota + 1
        // Disconnected signals that the connection is broken.
        Disconnected
        // ReconnectFailed signals that an attempt to reconnect has failed.
        ReconnectFailed
        // Shutdown signals that the shutdown callback is processing.
        Shutdown
        // Closed is sent when either reconnect attempts are exhausted or an
        // explicit Close is called.
        Closed

        // LogReconnectFailed is logged when a reconnect attempt has failed.
        LogReconnectFailed ConnLogKind = iota + 1
        // LogLastReconnectFailed is logged when the last reconnect attempt has
        // failed; the connection will be closed after that.
        LogLastReconnectFailed
        // LogUnexpectedResultId is logged when a response with an unknown id
        // was received, most probably due to a request timeout.
        LogUnexpectedResultId
        // LogWatchEventReadFailed is logged when a watch event failed to be read.
        LogWatchEventReadFailed
        // LogAppendPushFailed is logged when a push response failed to be appended.
        LogAppendPushFailed
)

// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
        Conn *Connection
        Kind ConnEventKind
        When time.Time
}

// connWatchEvent is a raw watch event.
type connWatchEvent struct {
        key   string
        value interface{}
}

var epoch = time.Now()

// Logger is a logger type expected to be passed in options.
type Logger interface {
        Report(event ConnLogKind, conn *Connection, v ...interface{})
}

type defaultLogger struct{}

func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
        switch event {
        case LogReconnectFailed:
                reconnects := v[0].(uint)
                err := v[1].(error)
                log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
                        reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
        case LogLastReconnectFailed:
                err := v[0].(error)
                log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
                        conn.Addr(), err)
        case LogUnexpectedResultId:
                header := v[0].(Header)
                log.Printf("tarantool: connection %s got unexpected request ID (%d) in response "+
                        "(probably cancelled request)",
                        conn.Addr(), header.RequestId)
        case LogWatchEventReadFailed:
                err := v[0].(error)
                log.Printf("tarantool: unable to parse watch event: %s", err)
        case LogAppendPushFailed:
                err := v[0].(error)
                log.Printf("tarantool: unable to append a push response: %s", err)
        default:
                args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
                log.Print(args...)
        }
}
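
// A hedged, illustrative sketch (not from this file): a custom Logger that
// can be plugged in via Opts.Logger instead of defaultLogger above. The
// prefixLogger name is hypothetical; the layout of v depends on the event,
// as shown in defaultLogger.Report.
//
//        type prefixLogger struct{ prefix string }
//
//        func (l prefixLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
//                // Forward everything to the standard library logger with a prefix.
//                log.Printf("%s: event %d on %s: %v", l.prefix, event, conn.Addr(), v)
//        }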

// Connection is a handle with a single connection to a Tarantool instance.
//
// It is created and configured with the Connect function and cannot be
// reconfigured later.
//
// Connection can be in one of four possible states:
//
// - In "Connected" state it sends queries to Tarantool.
//
// - In "Disconnected" state it rejects queries with ClientError{Code:
// ErrConnectionNotReady}.
//
// - In "Shutdown" state it rejects queries with ClientError{Code:
// ErrConnectionShutdown}. The state indicates that a graceful shutdown is
// in progress. The connection waits for all active requests to complete.
//
// - In "Closed" state it rejects queries with ClientError{Code:
// ErrConnectionClosed}. A Connection becomes "Closed" when the
// Connection.Close() method is called, or when Tarantool is disconnected
// and either the Reconnect pause is not specified or MaxReconnects
// reconnect attempts have already been performed.
//
// You may perform data manipulation operations by calling its methods:
// Call*, Insert*, Replace*, Update*, Upsert*, Eval*.
//
// In any method that accepts a space you may pass either the space number
// or the space name (in the latter case it will be looked up in the
// schema). The same is true for an index.
//
// ATTENTION: the tuple, key, ops and args arguments for any method should
// be an array or should serialize to a msgpack array.
//
// ATTENTION: the result argument for *Typed methods should deserialize from
// a msgpack array, because Tarantool always returns the result as an array.
// For all space-related methods and Call16* (but not Call17*) methods
// Tarantool always returns an array of arrays (an array of tuples for
// space-related methods). For Eval* and Call* Tarantool always returns an
// array, but does not force an array of arrays.
//
// If connected to Tarantool 2.10 or newer, the connection supports server
// graceful shutdown. In this case, the server waits until all client
// requests are finished and the client disconnects before going down (the
// server may also go down by timeout). A client reconnect will happen if
// the connection options enable reconnects. Beware that graceful shutdown
// event initialization is asynchronous.
//
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
        addr   net.Addr
        dialer Dialer
        c      Conn
        mutex  sync.Mutex
        cond   *sync.Cond
        // schemaResolver contains a SchemaResolver implementation.
        schemaResolver SchemaResolver
        // requestId contains the last request ID for requests with nil context.
        requestId uint32
        // contextRequestId contains the last request ID for requests with context.
        contextRequestId uint32
        // Greeting contains first message sent by Tarantool.
        Greeting *Greeting

        shard      []connShard
        dirtyShard chan uint32

        control chan struct{}
        rlimit  chan struct{}
        opts    Opts
        state   uint32
        dec     *msgpack.Decoder
        lenbuf  [packetLengthBytes]byte

        lastStreamId uint64

        serverProtocolInfo ProtocolInfo
        // watchMap is a map of key -> chan watchState.
        watchMap sync.Map

        // shutdownWatcher is the "box.shutdown" event watcher.
        shutdownWatcher Watcher
        // requestCnt is a counter of active requests.
        requestCnt int64
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
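
// A hedged, illustrative sketch (not from this file) of reacting to the
// state-specific ClientError codes documented above; req is a hypothetical
// request, and errors.As works here because request errors produced in this
// file are ClientError values.
//
//        if _, err := conn.Do(req).Get(); err != nil {
//                var cerr ClientError
//                if errors.As(err, &cerr) {
//                        switch cerr.Code {
//                        case ErrConnectionNotReady: // "Disconnected": a reconnect may follow.
//                        case ErrConnectionShutdown: // "Shutdown": graceful shutdown in progress.
//                        case ErrConnectionClosed: // "Closed": the connection cannot be reopened.
//                        }
//                }
//        }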

type futureList struct {
        first *Future
        last  **Future
}

func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
        root := &list.first
        for {
                fut := *root
                if fut == nil {
                        return nil
                }
                if fut.requestId == reqid {
                        if fetch {
                                *root = fut.next
                                if fut.next == nil {
                                        list.last = root
                                } else {
                                        fut.next = nil
                                }
                        }
                        return fut
                }
                root = &fut.next
        }
}

func (list *futureList) addFuture(fut *Future) {
        *list.last = fut
        list.last = &fut.next
}

func (list *futureList) clear(err error, conn *Connection) {
        fut := list.first
        list.first = nil
        list.last = &list.first
        for fut != nil {
                fut.SetError(err)
                conn.markDone(fut)
                fut, fut.next = fut.next, nil
        }
}

type connShard struct {
        rmut            sync.Mutex
        requests        [requestsMap]futureList
        requestsWithCtx [requestsMap]futureList
        bufmut          sync.Mutex
        buf             smallWBuf
        enc             *msgpack.Encoder
}

// RLimitAction is an enumeration type for an action to do when a rate limit
// is reached.
type RLimitAction int

const (
        // RLimitDrop immediately aborts the request.
        RLimitDrop RLimitAction = iota
        // RLimitWait waits during the timeout period for some request to be
        // answered. If no request is answered during the timeout period, this
        // request is aborted. If no timeout period is set, it will wait forever.
        RLimitWait
)

// Opts is a way to configure Connection.
type Opts struct {
        // Timeout is a timeout for the response to a particular request. The
        // timeout is reset when push messages are received. If Timeout is
        // zero, any request can be blocked infinitely.
        // Also used to set up net.TCPConn.Set(Read|Write)Deadline.
        //
        // Pay attention, when using contexts with request objects,
        // the timeout option for Connection does not affect the lifetime
        // of the request. For those purposes use context.WithTimeout() as
        // the root context.
        Timeout time.Duration
        // Reconnect is a timeout between reconnect attempts. If Reconnect is
        // zero, no reconnect attempts will be made.
        // If specified, then when Tarantool is not reachable or disconnected,
        // a new connect attempt is performed after the pause.
        // By default, no reconnection attempts are performed,
        // so once disconnected, the connection becomes Closed.
        Reconnect time.Duration
        // MaxReconnects is the maximum number of reconnect failures; after
        // that we give up. If MaxReconnects is zero, the client will try to
        // reconnect endlessly.
        // After MaxReconnects attempts the Connection becomes closed.
        MaxReconnects uint
        // RateLimit limits the number of 'in-flight' requests, i.e. requests
        // already put into the requests queue, but not yet answered by the
        // server or timed out.
        // It is disabled by default.
        // See RLimitAction for possible actions when RateLimit is reached.
        RateLimit uint
        // RLimitAction tells what to do when RateLimit is reached.
        // It is required if RateLimit is specified.
        RLimitAction RLimitAction
        // Concurrency is the amount of separate mutexes for request
        // queues and buffers inside of a connection.
        // It is rounded up to the nearest power of 2.
        // By default it is runtime.GOMAXPROCS(-1) * 4.
        Concurrency uint32
        // SkipSchema disables schema loading. Without disabling schema
        // loading, there is no way to create a Connection for a currently
        // inaccessible Tarantool.
        SkipSchema bool
        // Notify is a channel which receives notifications about Connection
        // status changes.
        Notify chan<- ConnEvent
        // Handle is a user-specified value that can be retrieved with the
        // Handle() method.
        Handle interface{}
        // Logger is a user-specified logger used for error messages.
        Logger Logger
}
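
// A hedged, illustrative sketch (not from this file) of an Opts value wired
// to the fields above; the channel size and durations are assumptions.
//
//        events := make(chan ConnEvent, 16)
//        go func() {
//                for ev := range events {
//                        log.Printf("connection event %d at %s", ev.Kind, ev.When)
//                }
//        }()
//        opts := Opts{
//                Timeout:       time.Second,
//                Reconnect:     500 * time.Millisecond,
//                MaxReconnects: 5,
//                RateLimit:     128,
//                RLimitAction:  RLimitWait, // Required once RateLimit is set.
//                Notify:        events,
//        }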

// Connect creates and configures a new Connection.
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
        conn = &Connection{
                dialer:           dialer,
                requestId:        0,
                contextRequestId: 1,
                Greeting:         &Greeting{},
                control:          make(chan struct{}),
                opts:             opts,
                dec:              msgpack.NewDecoder(&smallBuf{}),
        }
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
                conn.opts.Concurrency = maxprocs * 4
        }
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
                for i := uint(1); i < 32; i *= 2 {
                        c |= c >> i
                }
                conn.opts.Concurrency = c + 1
        }
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
        conn.shard = make([]connShard, conn.opts.Concurrency)
        for i := range conn.shard {
                shard := &conn.shard[i]
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
                for _, requests := range requestsLists {
                        for j := range requests {
                                requests[j].last = &requests[j].first
                        }
                }
        }

        if conn.opts.RateLimit > 0 {
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
                        return nil, errors.New("RLimitAction should be set to either RLimitDrop or RLimitWait")
                }
        }

        if conn.opts.Logger == nil {
                conn.opts.Logger = defaultLogger{}
        }

        conn.cond = sync.NewCond(&conn.mutex)

        if err = conn.createConnection(ctx); err != nil {
                return nil, err
        }

        go conn.pinger()
        if conn.opts.Timeout > 0 {
                go conn.timeouts()
        }

        // TODO: reload schema after reconnect.
        if !conn.opts.SkipSchema {
                schema, err := GetSchema(conn)
                if err != nil {
                        conn.mutex.Lock()
                        defer conn.mutex.Unlock()
                        conn.closeConnection(err, true)
                        return nil, err
                }
                conn.SetSchema(schema)
        }

        return conn, err
}
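
// A hedged, illustrative sketch (not from this file) of Connect usage with
// the NetDialer shipped in this package; the address and credentials are
// assumptions.
//
//        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//        defer cancel()
//        dialer := NetDialer{Address: "127.0.0.1:3301", User: "guest"}
//        conn, err := Connect(ctx, dialer, Opts{Timeout: time.Second})
//        if err != nil {
//                log.Fatal(err)
//        }
//        defer conn.Close()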

// ConnectedNow reports if the connection is established at the moment.
func (conn *Connection) ConnectedNow() bool {
        return atomic.LoadUint32(&conn.state) == connConnected
}

// ClosedNow reports if the connection is closed by the user or after a
// reconnect.
func (conn *Connection) ClosedNow() bool {
        return atomic.LoadUint32(&conn.state) == connClosed
}

// Close closes Connection.
// After this method is called, there is no way to reopen this Connection.
func (conn *Connection) Close() error {
        err := ClientError{ErrConnectionClosed, "connection closed by client"}
        conn.mutex.Lock()
        defer conn.mutex.Unlock()
        return conn.closeConnection(err, true)
}

// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method is called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
        return conn.shutdown(true)
}

// Addr returns a configured address of the Tarantool socket.
func (conn *Connection) Addr() net.Addr {
        return conn.addr
}

// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
        return conn.opts.Handle
}

func (conn *Connection) cancelFuture(fut *Future, err error) {
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
                fut.SetError(err)
                conn.markDone(fut)
        }
}

func (conn *Connection) dial(ctx context.Context) error {
        opts := conn.opts

        var c Conn
        c, err := conn.dialer.Dial(ctx, DialOpts{
                IoTimeout: opts.Timeout,
        })
        if err != nil {
                return err
        }

        conn.addr = c.Addr()
        connGreeting := c.Greeting()
        conn.Greeting.Version = connGreeting.Version
        conn.Greeting.Salt = connGreeting.Salt
        conn.serverProtocolInfo = c.ProtocolInfo()

        if conn.schemaResolver == nil {
                namesSupported := isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
                        conn.serverProtocolInfo.Features)

                conn.schemaResolver = &noSchemaResolver{
                        SpaceAndIndexNamesSupported: namesSupported,
                }
        }

        // Watchers.
        conn.watchMap.Range(func(key, value interface{}) bool {
                st := value.(chan watchState)
                state := <-st
                if state.unready != nil {
                        st <- state
                        return true
                }

                req := newWatchRequest(key.(string))
                if err = writeRequest(c, req); err != nil {
                        st <- state
                        return false
                }
                state.ack = true

                st <- state
                return true
        })

        if err != nil {
                c.Close()
                return fmt.Errorf("unable to register watch: %w", err)
        }

        // Only if connected and fully initialized.
        conn.lockShards()
        conn.c = c
        atomic.StoreUint32(&conn.state, connConnected)
        conn.cond.Broadcast()
        conn.unlockShards()
        go conn.writer(c, c)
        go conn.reader(c, c)

        // Subscribe to the shutdown event to process graceful shutdown.
        if conn.shutdownWatcher == nil &&
                isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
                        conn.serverProtocolInfo.Features) {
                watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
                if werr != nil {
                        return werr
                }
                conn.shutdownWatcher = watcher
        }

        return nil
}

func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
        req Request, streamId uint64, res SchemaResolver) (err error) {
        const uint32Code = 0xce
        const uint64Code = 0xcf
        const streamBytesLenUint64 = 10
        const streamBytesLenUint32 = 6

        hl := h.Len()

        var streamBytesLen = 0
        var streamBytes [streamBytesLenUint64]byte
        hMapLen := byte(0x82) // 2 element map.
        if streamId != ignoreStreamId {
                hMapLen = byte(0x83) // 3 element map.
                streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
                if streamId > math.MaxUint32 {
                        streamBytesLen = streamBytesLenUint64
                        streamBytes[1] = uint64Code
                        binary.BigEndian.PutUint64(streamBytes[2:], streamId)
                } else {
                        streamBytesLen = streamBytesLenUint32
                        streamBytes[1] = uint32Code
                        binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
                }
        }

        hBytes := append([]byte{
                uint32Code, 0, 0, 0, 0, // Length.
                hMapLen,
                byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
                byte(iproto.IPROTO_SYNC), uint32Code,
                byte(reqid >> 24), byte(reqid >> 16),
                byte(reqid >> 8), byte(reqid),
        }, streamBytes[:streamBytesLen]...)

        h.Write(hBytes)

        if err = req.Body(res, enc); err != nil {
                return
        }

        l := uint32(h.Len() - 5 - hl)
        h.b[hl+1] = byte(l >> 24)
        h.b[hl+2] = byte(l >> 16)
        h.b[hl+3] = byte(l >> 8)
        h.b[hl+4] = byte(l)

        return
}
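
// For reference, the bytes emitted by pack for a request without a stream id
// look like this (the length is patched in after req.Body is encoded):
//
//        0xce L3 L2 L1 L0                -- body length as uint32
//        0x82                            -- 2-element msgpack map
//        IPROTO_REQUEST_TYPE, req.Type() -- request type
//        IPROTO_SYNC, 0xce, s3 s2 s1 s0  -- request id as uint32
//
// With a stream id, the map header becomes 0x83 and IPROTO_STREAM_ID is
// appended, encoded in 6 bytes (uint32) or 10 bytes (uint64) depending on
// the value, as computed above.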

func (conn *Connection) createConnection(ctx context.Context) error {
        var err error
        if conn.c == nil && conn.state == connDisconnected {
                if err = conn.dial(ctx); err == nil {
                        conn.notify(Connected)
                        return nil
                }
        }
        if conn.state == connClosed {
                err = ClientError{ErrConnectionClosed, "using closed connection"}
        }
        return err
}

func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
        conn.lockShards()
        defer conn.unlockShards()
        if forever {
                if conn.state != connClosed {
                        close(conn.control)
                        atomic.StoreUint32(&conn.state, connClosed)
                        conn.cond.Broadcast()
                        // Free the resources.
                        if conn.shutdownWatcher != nil {
                                go conn.shutdownWatcher.Unregister()
                                conn.shutdownWatcher = nil
                        }
                        conn.notify(Closed)
                }
        } else {
                atomic.StoreUint32(&conn.state, connDisconnected)
                conn.cond.Broadcast()
                conn.notify(Disconnected)
        }
        if conn.c != nil {
                err = conn.c.Close()
                conn.c = nil
        }
        for i := range conn.shard {
                conn.shard[i].buf.Reset()
                requestsLists := []*[requestsMap]futureList{
                        &conn.shard[i].requests,
                        &conn.shard[i].requestsWithCtx,
                }
                for _, requests := range requestsLists {
                        for pos := range requests {
                                requests[pos].clear(neterr, conn)
                        }
                }
        }
        return
}

func (conn *Connection) getDialTimeout() time.Duration {
        dialTimeout := conn.opts.Reconnect / 2
        if dialTimeout == 0 {
                dialTimeout = 500 * time.Millisecond
        } else if dialTimeout > 5*time.Second {
                dialTimeout = 5 * time.Second
        }
        return dialTimeout
}

func (conn *Connection) runReconnects() error {
        dialTimeout := conn.getDialTimeout()
        var reconnects uint
        var err error

        for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
                now := time.Now()

                ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
                err = conn.createConnection(ctx)
                cancel()

                if err != nil {
                        if clientErr, ok := err.(ClientError); ok &&
                                clientErr.Code == ErrConnectionClosed {
                                return err
                        }
                } else {
                        return nil
                }

                conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
                conn.notify(ReconnectFailed)
                reconnects++
                conn.mutex.Unlock()

                time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

                conn.mutex.Lock()
        }

        conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
        // mark connection as closed to avoid reopening by another goroutine
        return ClientError{ErrConnectionClosed, "last reconnect failed"}
}

func (conn *Connection) reconnectImpl(neterr error, c Conn) {
        if conn.opts.Reconnect > 0 {
                if c == conn.c {
                        conn.closeConnection(neterr, false)
                        if err := conn.runReconnects(); err != nil {
                                conn.closeConnection(err, true)
                        }
                }
        } else {
                conn.closeConnection(neterr, true)
        }
}

func (conn *Connection) reconnect(neterr error, c Conn) {
        conn.mutex.Lock()
        defer conn.mutex.Unlock()
        conn.reconnectImpl(neterr, c)
        conn.cond.Broadcast()
}

func (conn *Connection) lockShards() {
        for i := range conn.shard {
                conn.shard[i].rmut.Lock()
                conn.shard[i].bufmut.Lock()
        }
}

func (conn *Connection) unlockShards() {
        for i := range conn.shard {
                conn.shard[i].rmut.Unlock()
                conn.shard[i].bufmut.Unlock()
        }
}

func (conn *Connection) pinger() {
        to := conn.opts.Timeout
        if to == 0 {
                to = 3 * time.Second
        }
        t := time.NewTicker(to / 3)
        defer t.Stop()
        for {
                select {
                case <-conn.control:
                        return
                case <-t.C:
                }
                conn.Ping()
        }
}

func (conn *Connection) notify(kind ConnEventKind) {
        if conn.opts.Notify != nil {
                select {
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
                default:
                }
        }
}

func (conn *Connection) writer(w writeFlusher, c Conn) {
        var shardn uint32
        var packet smallWBuf
        for atomic.LoadUint32(&conn.state) != connClosed {
                select {
                case shardn = <-conn.dirtyShard:
                default:
                        runtime.Gosched()
                        if len(conn.dirtyShard) == 0 {
                                if err := w.Flush(); err != nil {
                                        err = ClientError{
                                                ErrIoError,
                                                fmt.Sprintf("failed to flush data to the connection: %s", err),
                                        }
                                        conn.reconnect(err, c)
                                        return
                                }
                        }
                        select {
                        case shardn = <-conn.dirtyShard:
                        case <-conn.control:
                                return
                        }
                }
                shard := &conn.shard[shardn]
                shard.bufmut.Lock()
                if conn.c != c {
                        conn.dirtyShard <- shardn
                        shard.bufmut.Unlock()
                        return
                }
                packet, shard.buf = shard.buf, packet
                shard.bufmut.Unlock()
                if packet.Len() == 0 {
                        continue
                }
                if _, err := w.Write(packet.b); err != nil {
                        err = ClientError{
                                ErrIoError,
                                fmt.Sprintf("failed to write data to the connection: %s", err),
                        }
                        conn.reconnect(err, c)
                        return
                }
                packet.Reset()
        }
}

func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
        keyExist := false
        event := connWatchEvent{}
        d := msgpack.NewDecoder(reader)

        l, err := d.DecodeMapLen()
        if err != nil {
                return event, err
        }

        for ; l > 0; l-- {
                cd, err := d.DecodeInt()
                if err != nil {
                        return event, err
                }

                switch iproto.Key(cd) {
                case iproto.IPROTO_EVENT_KEY:
                        if event.key, err = d.DecodeString(); err != nil {
                                return event, err
                        }
                        keyExist = true
                case iproto.IPROTO_EVENT_DATA:
                        if event.value, err = d.DecodeInterface(); err != nil {
                                return event, err
                        }
                default:
                        if err = d.Skip(); err != nil {
                                return event, err
                        }
                }
        }

        if !keyExist {
                return event, errors.New("watch event does not have a key")
        }

        return event, nil
}

func (conn *Connection) reader(r io.Reader, c Conn) {
        events := make(chan connWatchEvent, 1024)
        defer close(events)

        go conn.eventer(events)

        for atomic.LoadUint32(&conn.state) != connClosed {
                respBytes, err := read(r, conn.lenbuf[:])
                if err != nil {
                        err = ClientError{
                                ErrIoError,
                                fmt.Sprintf("failed to read data from the connection: %s", err),
                        }
                        conn.reconnect(err, c)
                        return
                }
                buf := smallBuf{b: respBytes}
                header, code, err := decodeHeader(conn.dec, &buf)
                if err != nil {
                        err = ClientError{
                                ErrProtocolError,
                                fmt.Sprintf("failed to decode IPROTO header: %s", err),
                        }
                        conn.reconnect(err, c)
                        return
                }

                var fut *Future = nil
                if code == iproto.IPROTO_EVENT {
                        if event, err := readWatchEvent(&buf); err == nil {
                                events <- event
                        } else {
                                err = ClientError{
                                        ErrProtocolError,
                                        fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
                                }
                                conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
                        }
                        continue
                } else if code == iproto.IPROTO_CHUNK {
                        if fut = conn.peekFuture(header.RequestId); fut != nil {
                                if err := fut.AppendPush(header, &buf); err != nil {
                                        err = ClientError{
                                                ErrProtocolError,
                                                fmt.Sprintf("failed to append push response: %s", err),
                                        }
                                        conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
                                }
                        }
                } else {
                        if fut = conn.fetchFuture(header.RequestId); fut != nil {
                                if err := fut.SetResponse(header, &buf); err != nil {
                                        fut.SetError(fmt.Errorf("failed to set response: %w", err))
                                }
                                conn.markDone(fut)
                        }
                }

                if fut == nil {
                        conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
                }
        }
}

// The eventer goroutine gets watch events and updates values for watchers.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
        for event := range events {
                if value, ok := conn.watchMap.Load(event.key); ok {
                        st := value.(chan watchState)
                        state := <-st
                        state.value = event.value
                        if state.version == math.MaxUint {
                                state.version = initWatchEventVersion + 1
                        } else {
                                state.version += 1
                        }
                        state.ack = false
                        if state.changed != nil {
                                close(state.changed)
                                state.changed = nil
                        }
                        st <- state
                }
                // It is possible to get an IPROTO_EVENT after we have already
                // sent IPROTO_UNWATCH, due to processing on the Tarantool side
                // or a slow read from the network, so this is expected behavior.
        }
}

func (conn *Connection) newFuture(req Request) (fut *Future) {
        ctx := req.Ctx()
        fut = NewFuture(req)
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
                select {
                case conn.rlimit <- struct{}{}:
                default:
                        fut.err = ClientError{
                                ErrRateLimited,
                                "Request is rate limited on client",
                        }
                        fut.ready = nil
                        fut.done = nil
                        return
                }
        }
        fut.requestId = conn.nextRequestId(ctx != nil)
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
        shard := &conn.shard[shardn]
        shard.rmut.Lock()
        switch atomic.LoadUint32(&conn.state) {
        case connClosed:
                fut.err = ClientError{
                        ErrConnectionClosed,
                        "using closed connection",
                }
                fut.ready = nil
                fut.done = nil
                shard.rmut.Unlock()
                return
        case connDisconnected:
                fut.err = ClientError{
                        ErrConnectionNotReady,
                        "client connection is not ready",
                }
                fut.ready = nil
                fut.done = nil
                shard.rmut.Unlock()
                return
        case connShutdown:
                fut.err = ClientError{
                        ErrConnectionShutdown,
                        "server shutdown in progress",
                }
                fut.ready = nil
                fut.done = nil
                shard.rmut.Unlock()
                return
        }
        pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
        if ctx != nil {
                select {
                case <-ctx.Done():
                        fut.SetError(fmt.Errorf("context is done (request ID %d)", fut.requestId))
                        shard.rmut.Unlock()
                        return
                default:
                }
                shard.requestsWithCtx[pos].addFuture(fut)
        } else {
                shard.requests[pos].addFuture(fut)
                if conn.opts.Timeout > 0 {
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
                }
        }
        shard.rmut.Unlock()
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
                select {
                case conn.rlimit <- struct{}{}:
                default:
                        runtime.Gosched()
                        select {
                        case conn.rlimit <- struct{}{}:
                        case <-fut.done:
                                if fut.err == nil {
                                        panic("fut.done is closed, but err is nil")
                                }
                        }
                }
        }
        return
}

// contextWatchdog removes a future from the internal queue if the context
// is done before the response comes.
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
        select {
        case <-fut.done:
        case <-ctx.Done():
        }

        select {
        case <-fut.done:
                return
        default:
                conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
        }
}

func (conn *Connection) incrementRequestCnt() {
        atomic.AddInt64(&conn.requestCnt, int64(1))
}

func (conn *Connection) decrementRequestCnt() {
        if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
                conn.cond.Broadcast()
        }
}

func (conn *Connection) send(req Request, streamId uint64) *Future {
        conn.incrementRequestCnt()

        fut := conn.newFuture(req)
        if fut.ready == nil {
                conn.decrementRequestCnt()
                return fut
        }

        if req.Ctx() != nil {
                select {
                case <-req.Ctx().Done():
                        conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
                        return fut
                default:
                }
                go conn.contextWatchdog(fut, req.Ctx())
        }
        conn.putFuture(fut, req, streamId)

        return fut
}

func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
        shard := &conn.shard[shardn]
        shard.bufmut.Lock()
        select {
        case <-fut.done:
                shard.bufmut.Unlock()
                return
        default:
        }
        firstWritten := shard.buf.Len() == 0
        if shard.buf.Cap() == 0 {
                shard.buf.b = make([]byte, 0, 128)
                shard.enc = msgpack.NewEncoder(&shard.buf)
        }
        blen := shard.buf.Len()
        reqid := fut.requestId
        if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
                shard.buf.Trunc(blen)
                shard.bufmut.Unlock()
                if f := conn.fetchFuture(reqid); f == fut {
                        fut.SetError(err)
                        conn.markDone(fut)
                } else if f != nil {
                        /* In theory, this is possible. In practice, you would
                         * need a race condition that lasts hours. */
                        panic("Unknown future")
                } else {
                        fut.wait()
                        if fut.err == nil {
                                panic("Future removed from queue without error")
                        }
                        if _, ok := fut.err.(ClientError); ok {
                                // A packing error is more important than a
                                // connection error, because it is an indication
                                // of a programmer's mistake.
                                fut.SetError(err)
                        }
                }
                return
        }
        shard.bufmut.Unlock()

        if firstWritten {
                conn.dirtyShard <- shardn
        }

        if req.Async() {
                if fut = conn.fetchFuture(reqid); fut != nil {
                        header := Header{
                                RequestId: reqid,
                                Error:     ErrorNo,
                        }
                        fut.SetResponse(header, nil)
                        conn.markDone(fut)
                }
        }
}

func (conn *Connection) markDone(fut *Future) {
        if conn.rlimit != nil {
                <-conn.rlimit
        }
        conn.decrementRequestCnt()
}

func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
        shard.rmut.Lock()
        defer shard.rmut.Unlock()

        if conn.opts.Timeout > 0 {
                if fut = conn.getFutureImp(reqid, true); fut != nil {
                        pair := &shard.requests[pos]
                        *pair.last = fut
                        pair.last = &fut.next
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
                }
        } else {
                fut = conn.getFutureImp(reqid, false)
        }

        return fut
}

func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
        shard.rmut.Lock()
        fut = conn.getFutureImp(reqid, true)
        shard.rmut.Unlock()
        return fut
}

func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
        // Futures with even request ids belong to the requests list with nil
        // context.
        if reqid%2 == 0 {
                return shard.requests[pos].findFuture(reqid, fetch)
        } else {
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
        }
}

func (conn *Connection) timeouts() {
        timeout := conn.opts.Timeout
        t := time.NewTimer(timeout)
        for {
                var nowepoch time.Duration
                select {
                case <-conn.control:
                        t.Stop()
                        return
                case <-t.C:
                }
                minNext := time.Since(epoch) + timeout
                for i := range conn.shard {
                        nowepoch = time.Since(epoch)
                        shard := &conn.shard[i]
                        for pos := range shard.requests {
                                shard.rmut.Lock()
                                pair := &shard.requests[pos]
                                for pair.first != nil && pair.first.timeout < nowepoch {
                                        shard.bufmut.Lock()
                                        fut := pair.first
                                        pair.first = fut.next
                                        if fut.next == nil {
                                                pair.last = &pair.first
                                        } else {
                                                fut.next = nil
                                        }
                                        fut.SetError(ClientError{
                                                Code: ErrTimeouted,
                                                Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
                                        })
                                        conn.markDone(fut)
                                        shard.bufmut.Unlock()
                                }
                                if pair.first != nil && pair.first.timeout < minNext {
                                        minNext = pair.first.timeout
                                }
                                shard.rmut.Unlock()
                        }
                }
                nowepoch = time.Since(epoch)
                if nowepoch+time.Microsecond < minNext {
                        t.Reset(minNext - nowepoch)
                } else {
                        t.Reset(time.Microsecond)
                }
        }
}

func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
        var length uint64

        if _, err = io.ReadFull(r, lenbuf); err != nil {
                return
        }
        if lenbuf[0] != 0xce {
                err = errors.New("wrong response header")
                return
        }
        length = (uint64(lenbuf[1]) << 24) +
                (uint64(lenbuf[2]) << 16) +
                (uint64(lenbuf[3]) << 8) +
                uint64(lenbuf[4])

        switch {
        case length == 0:
                err = errors.New("response should not be 0 length")
                return
        case length > math.MaxUint32:
                err = errors.New("response is too big")
                return
        }

        response = make([]byte, length)
        _, err = io.ReadFull(r, response)

        return
}

func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
        if context {
                return atomic.AddUint32(&conn.contextRequestId, 2)
        } else {
                return atomic.AddUint32(&conn.requestId, 2)
        }
}
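
// Note: requestId starts at 0 and contextRequestId starts at 1 (see Connect),
// and both are incremented by 2, so ids for requests without a context stay
// even while ids for requests with a context stay odd; getFutureImp relies on
// this parity to pick the matching futureList.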

// Do performs a request asynchronously on the connection.
//
// An error is returned if the request was formed incorrectly, or failed to
// create the future.
func (conn *Connection) Do(req Request) *Future {
        if connectedReq, ok := req.(ConnectedRequest); ok {
                if connectedReq.Conn() != conn {
                        fut := NewFuture(req)
                        fut.SetError(errUnknownRequest)
                        return fut
                }
        }
        return conn.send(req, ignoreStreamId)
}
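
// A hedged, illustrative sketch (not from this file) of Do usage with request
// constructors from this package; the space name "examples" is an assumption.
//
//        if _, err := conn.Do(NewPingRequest()).Get(); err != nil {
//                log.Printf("ping failed: %s", err)
//        }
//
//        data, err := conn.Do(
//                NewSelectRequest("examples").Limit(10).Iterator(IterAll),
//        ).Get()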

// ConfiguredTimeout returns a timeout from connection config.
func (conn *Connection) ConfiguredTimeout() time.Duration {
        return conn.opts.Timeout
}

// SetSchema sets Schema for the connection.
func (conn *Connection) SetSchema(s Schema) {
        sCopy := s.copy()
        spaceAndIndexNamesSupported :=
                isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
                        conn.serverProtocolInfo.Features)

        conn.mutex.Lock()
        defer conn.mutex.Unlock()
        conn.lockShards()
        defer conn.unlockShards()

        conn.schemaResolver = &loadedSchemaResolver{
                Schema:                      sCopy,
                SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
        }
}

// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1255
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
1✔
1256
        req := NewPrepareRequest(expr)
1✔
1257
        resp, err := conn.Do(req).GetResponse()
1✔
1258
        if err != nil {
1✔
1259
                return nil, err
×
1260
        }
×
1261
        return NewPreparedFromResponse(conn, resp)
1✔
1262
}

// NewStream creates a new Stream object for the connection.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions
// over them. To use interactive transactions, the memtx_use_mvcc_engine box
// option should be set to true.
// Since 1.7.0
func (conn *Connection) NewStream() (*Stream, error) {
        next := atomic.AddUint64(&conn.lastStreamId, 1)
        return &Stream{
                Id:   next,
                Conn: conn,
        }, nil
}

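// A hedged, illustrative sketch (not from this file) of an interactive
// transaction over a stream, assuming the Begin/Commit request constructors
// from this package and a server with memtx_use_mvcc_engine enabled.
//
//        stream, _ := conn.NewStream()
//        if _, err := stream.Do(NewBeginRequest()).Get(); err != nil {
//                log.Fatal(err)
//        }
//        // Data requests go through stream.Do(...) here.
//        if _, err := stream.Do(NewCommitRequest()).Get(); err != nil {
//                log.Fatal(err)
//        }
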
// watchState is the current state of the watcher. See the idea at p. 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
type watchState struct {
        // value is the current value.
        value interface{}
        // version is the current version of the value.
        version uint
        // ack is true if the acknowledgement has already been sent.
        ack bool
        // cnt is a count of active watchers for the key.
        cnt int
        // changed is a channel to broadcast the value changes.
        changed chan struct{}
        // unready channel exists if the state is not ready to work
        // (subscription or unsubscription in progress).
        unready chan struct{}
}
1294

1295
// initWatchEventVersion is an initial version until no events from Tarantool.
1296
const initWatchEventVersion uint = 0
1297

// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
	unregister sync.Once
	// done is closed when the watcher is unregistered, but the watcher
	// goroutine is not yet finished.
	done chan struct{}
	// finished is closed when the watcher is unregistered and the watcher
	// goroutine is finished.
	finished chan struct{}
}

// Unregister unregisters the connection watcher.
func (w *connWatcher) Unregister() {
	w.unregister.Do(func() {
		close(w.done)
	})
	<-w.finished
}

// subscribeWatchChannel returns an existing or a new watch state channel
// for the key. It also increases the counter of active watchers for the channel.
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
	var st chan watchState

	for st == nil {
		if val, ok := conn.watchMap.Load(key); !ok {
			st = make(chan watchState, 1)
			state := watchState{
				value:   nil,
				version: initWatchEventVersion,
				ack:     false,
				cnt:     0,
				changed: nil,
				unready: make(chan struct{}),
			}
			st <- state

			if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
				if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
					conn.watchMap.Delete(key)
					close(state.unready)
					return nil, err
				}
				// It is a successful subscription to watch events by itself.
				state = <-st
				state.cnt = 1
				close(state.unready)
				state.unready = nil
				st <- state
				continue
			} else {
				close(state.unready)
				close(st)
				st = val.(chan watchState)
			}
		} else {
			st = val.(chan watchState)
		}

		// It is an existing channel created outside. It may be in the
		// unready state.
		state := <-st
		if state.unready == nil {
			state.cnt += 1
		}
		st <- state

		if state.unready != nil {
			// Wait for an update and retry.
			<-state.unready
			st = nil
		}
	}

	return st, nil
}

func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
	for _, actual := range actualSlice {
		if expected == actual {
			return true
		}
	}
	return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
// been broadcast. All subsequent invocations are triggered with
// box.broadcast() called on the remote host. If a watcher is subscribed for a
// key that has not been broadcast yet, the callback is triggered only once,
// after the registration of the watcher.
//
// The watcher callbacks are always invoked in a separate goroutine. A watcher
// callback is never executed in parallel with itself, but they can be executed
// in parallel to other watchers.
//
// If the key is updated while the watcher callback is running, the callback
// will be invoked again with the latest value as soon as it returns.
//
// Watchers survive reconnection. All registered watchers are automatically
// resubscribed when the connection is reestablished.
//
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
// watcher’s destruction. In this case, the watcher remains registered. You
// need to call Unregister() directly.
//
// Unregister() guarantees that there will be no watcher callback calls after
// it returns, but calling Unregister() from the callback leads to a deadlock.
//
// See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
//
// Since 1.10.0
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
	// We need to check the feature because the IPROTO_WATCH request is
	// asynchronous: we do not expect any response from a Tarantool instance.
	// That's why we can't just check the Tarantool response for an unsupported
	// request error.
	if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
		conn.c.ProtocolInfo().Features) {
		err := fmt.Errorf("the feature %s must be supported by connection "+
			"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
		return nil, err
	}

	return conn.newWatcherImpl(key, callback)
}

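// A usage sketch for NewWatcher: the callback fires once on registration
// and then on every box.broadcast() for the key (the key name is
// illustrative):
//
//	watcher, err := conn.NewWatcher("app.cfg", func(event tarantool.WatchEvent) {
//		log.Printf("key %q changed: %v", event.Key, event.Value)
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer watcher.Unregister()
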
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
	st, err := subscribeWatchChannel(conn, key)
	if err != nil {
		return nil, err
	}

	// Start the watcher goroutine.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		version := initWatchEventVersion
		for {
			state := <-st
			if state.changed == nil {
				state.changed = make(chan struct{})
			}
			st <- state

			if state.version != version {
				callback(WatchEvent{
					Conn:  conn,
					Key:   key,
					Value: state.value,
				})
				version = state.version

				// Do we need to acknowledge the notification?
				state = <-st
				sendAck := !state.ack && version == state.version
				if sendAck {
					state.ack = true
				}
				st <- state

				if sendAck {
					// We expect a reconnect and re-subscribe if it fails to
					// send the watch request. So it looks ok not to check the
					// result. But we need to make sure that the re-watch
					// request will not be finished by a small per-request
					// timeout.
					req := newWatchRequest(key).Context(context.Background())
					conn.Do(req).Get()
				}
			}

			select {
			case <-done:
				state := <-st
				state.cnt -= 1
				if state.cnt == 0 {
					state.unready = make(chan struct{})
				}
				st <- state

				if state.cnt == 0 {
					// The last one sends IPROTO_UNWATCH.
					if !conn.ClosedNow() {
						// conn.ClosedNow() check is a workaround for calling
						// Unregister from connectionClose().
						//
						// We need to make sure that the unwatch request will
						// not be finished by a small per-request timeout to
						// avoid losing the request.
						req := newUnwatchRequest(key).Context(context.Background())
						conn.Do(req).Get()
					}
					conn.watchMap.Delete(key)
					close(state.unready)
				}

				close(finished)
				return
			case <-state.changed:
			}
		}
	}()

	return &connWatcher{
		done:     done,
		finished: finished,
	}, nil
}

// ProtocolInfo returns the protocol version and protocol features
// supported by the connected Tarantool server. Beware that values might be
// outdated if the connection is in a disconnected state.
// Since 2.0.0
func (conn *Connection) ProtocolInfo() ProtocolInfo {
	return conn.serverProtocolInfo.Clone()
}

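// A sketch of a client-side feature check built on ProtocolInfo; the
// iproto constant is one of the features already used in this package:
//
//	supported := false
//	for _, f := range conn.ProtocolInfo().Features {
//		if f == iproto.IPROTO_FEATURE_WATCHERS {
//			supported = true
//			break
//		}
//	}
//	if !supported {
//		log.Println("server does not support watchers")
//	}
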
func shutdownEventCallback(event WatchEvent) {
	// Receives "true" on server shutdown.
	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
	// step 2.
	val, ok := event.Value.(bool)
	if ok && val {
		go event.Conn.shutdown(false)
	}
}

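// Client code can observe the shutdown sequence through the Notify channel
// from Opts; a minimal sketch, assuming the connection was created with
// Opts{Notify: events}:
//
//	events := make(chan tarantool.ConnEvent, 16)
//	// ... connect with Opts{Notify: events} ...
//	go func() {
//		for ev := range events {
//			if ev.Kind == tarantool.Shutdown {
//				log.Println("server is shutting down gracefully")
//			}
//		}
//	}()
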
func (conn *Connection) shutdown(forever bool) error {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
		if forever {
			err := ClientError{ErrConnectionClosed, "connection closed by client"}
			return conn.closeConnection(err, true)
		}
		return nil
	}

	if forever {
		// We don't want to reconnect any more.
		conn.opts.Reconnect = 0
		conn.opts.MaxReconnects = 0
	}

	conn.cond.Broadcast()
	conn.notify(Shutdown)

	c := conn.c
	for {
		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
			return nil
		}
		if atomic.LoadInt64(&conn.requestCnt) == 0 {
			break
		}
		// Use a cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if the state changes as part of
		// a reconnect since the Tarantool server won't allow reconnection
		// in the middle of shutting down.
		conn.cond.Wait()
	}

	if forever {
		err := ClientError{ErrConnectionClosed, "connection closed by client"}
		return conn.closeConnection(err, true)
	} else {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: the server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(ClientError{
			ErrConnectionClosed,
			"connection closed after server shutdown",
		}, conn.c)
		return nil
	}
}