• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 22105359978

17 Feb 2026 03:51PM UTC coverage: 73.777% (-0.4%) from 74.209%
22105359978

Pull #518

github

babyTsakhes
test correct with message and return deleted code
Pull Request #518: conn: change design of tarantool.Logger

185 of 272 new or added lines in 3 files covered. (68.01%)

28 existing lines in 2 files now uncovered.

3123 of 4233 relevant lines covered (73.78%)

9475.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.56
/connection.go
1
// Package with implementation of methods and structures for work with
2
// Tarantool instance.
3
package tarantool
4

5
import (
6
        "context"
7
        "encoding/binary"
8
        "errors"
9
        "fmt"
10
        "io"
11
        "log/slog"
12
        "math"
13
        "net"
14
        "runtime"
15
        "sync"
16
        "sync/atomic"
17
        "time"
18

19
        "github.com/tarantool/go-iproto"
20
        "github.com/vmihailenco/msgpack/v5"
21
)
22

23
// requestsMap is the size of the per-shard request bucket arrays
// (see connShard.requests / requestsWithCtx).
const requestsMap = 128

// ignoreStreamId marks a request that does not belong to any stream.
const ignoreStreamId = 0

// Connection lifecycle states, stored atomically in Connection.state.
const (
	connDisconnected = 0
	connConnected    = 1
	connShutdown     = 2
	connClosed       = 3
)

// shutdownEventKey is the event key watched to detect a server-initiated
// graceful shutdown.
const shutdownEventKey = "box.shutdown"
33

34
// ConnEventKind is the kind of a connection state-change event delivered
// to the Opts.Notify channel.
type ConnEventKind int

var (
	// errUnknownRequest is returned when a request handed to the connection
	// was not issued by it (or by its connection pool).
	errUnknownRequest = errors.New("the passed connected request doesn't belong " +
		"to the current connection or connection pool")
)

const (
	// Connected signals that connection is established or reestablished.
	Connected ConnEventKind = iota + 1
	// Disconnected signals that connection is broken.
	Disconnected
	// ReconnectFailed signals that attempt to reconnect has failed.
	ReconnectFailed
	// Shutdown signals that shutdown callback is processing.
	Shutdown
	// Closed signals that either reconnect attempts are exhausted, or an
	// explicit Close is called.
	Closed
)
53

54
// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
	// Conn is the connection the event happened on.
	Conn *Connection
	// Kind is the kind of the event.
	Kind ConnEventKind
	// When is the time at which the event occurred.
	When time.Time
}
60

61
// connWatchEvent is a raw watch event as received from the server:
// the watched key and its (arbitrary msgpack-decoded) value.
type connWatchEvent struct {
	key   string
	value interface{}
}

// epoch is captured at package initialization; presumably a base instant
// for relative time measurements — usage is outside this chunk, confirm.
var epoch = time.Now()
68

69
// Connection is a handle with a single connection to a Tarantool instance.
//
// It is created and configured with Connect function, and could not be
// reconfigured later.
//
// Connection could be in four possible states:
//
// - In "Connected" state it sends queries to Tarantool.
//
// - In "Disconnected" state it rejects queries with ClientError{Code:
// ErrConnectionNotReady}
//
// - In "Shutdown" state it rejects queries with ClientError{Code:
// ErrConnectionShutdown}. The state indicates that a graceful shutdown
// in progress. The connection waits for all active requests to
// complete.
//
// - In "Closed" state it rejects queries with ClientError{Code:
// ErrConnectionClosed}. Connection could become "Closed" when
// Connection.Close() method called, or when Tarantool disconnected and
// Reconnect pause is not specified or MaxReconnects is specified and
// MaxReconnect reconnect attempts already performed.
//
// You may perform data manipulation operations by calling its methods:
// Call*, Insert*, Replace*, Update*, Upsert*, Call*, Eval*.
//
// In any method that accepts space you may pass either space number or space
// name (in this case it will be looked up in schema). Same is true for index.
//
// ATTENTION: tuple, key, ops and args arguments for any method should be
// an array or should serialize to msgpack array.
//
// ATTENTION: result argument for *Typed methods should deserialize from
// msgpack array, cause Tarantool always returns result as an array.
// For all space related methods and Call16* (but not Call17*) methods Tarantool
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not force
// array of arrays.
//
// If connected to Tarantool 2.10 or newer, connection supports server graceful
// shutdown. In this case, server will wait until all client requests will be
// finished and client disconnects before going down (server also may go down
// by timeout). Client reconnect will happen if connection options enable
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
//
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
	// addr is the address of the current (or last) established connection.
	addr net.Addr
	// dialer establishes (and re-establishes) the underlying connection.
	dialer Dialer
	// c is the current underlying connection; nil while disconnected.
	c Conn
	// mutex guards connection state transitions; cond is bound to it.
	mutex sync.Mutex
	cond  *sync.Cond
	// schemaResolver contains a SchemaResolver implementation.
	schemaResolver SchemaResolver
	// requestId contains the last request ID for requests with nil context.
	requestId uint32
	// contextRequestId contains the last request ID for requests with context.
	contextRequestId uint32
	// Greeting contains first message sent by Tarantool.
	Greeting *Greeting

	// shard holds per-shard request registries and write buffers;
	// dirtyShard carries indexes of shards with buffered data to flush.
	shard      []connShard
	dirtyShard chan uint32

	// control is closed on final close to stop background goroutines.
	control chan struct{}
	// rlimit is the semaphore implementing Opts.RateLimit (nil if disabled).
	rlimit chan struct{}
	opts   Opts
	// state is one of the conn* state constants, accessed atomically.
	state  uint32
	dec    *msgpack.Decoder
	lenbuf [packetLengthBytes]byte

	// lastStreamId is the most recently issued stream ID.
	lastStreamId uint64

	serverProtocolInfo ProtocolInfo
	// watchMap is a map of key -> chan watchState.
	watchMap sync.Map

	// shutdownWatcher is the "box.shutdown" event watcher.
	shutdownWatcher Watcher
	// requestCnt is a counter of active requests.
	requestCnt     int64
	reconnectCount uint32 // count of reconnect
}
153

154
// Compile-time assertion that *Connection satisfies the Connector interface.
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
155

156
// futureList is an intrusive singly-linked list of Futures awaiting a
// response. last caches the address of the tail's next-pointer (the address
// of first while the list is empty) so appends are O(1).
type futureList struct {
	first *Future
	last  **Future
}
160

161
// findFuture searches the list for the future with the given request ID and
// returns it, or nil if absent. If fetch is true, the found future is also
// unlinked from the list.
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
	// root walks the chain of next-pointers, starting at &list.first, so
	// that unlinking needs no separate "previous" node.
	root := &list.first
	for {
		fut := *root
		if fut == nil {
			return nil
		}
		if fut.requestId == reqid {
			if fetch {
				// Unlink fut by pointing the previous link past it.
				*root = fut.next
				if fut.next == nil {
					// fut was the tail: restore the cached
					// tail pointer to the previous link.
					list.last = root
				} else {
					fut.next = nil
				}
			}
			return fut
		}
		root = &fut.next
	}
}
182

183
// addFuture appends fut to the tail of the list in O(1) using the cached
// tail next-pointer.
func (list *futureList) addFuture(fut *Future) {
	*list.last = fut
	list.last = &fut.next
}
187

188
// clear fails every pending future in the list with err, marks each as done
// on conn, and resets the list to the empty state.
func (list *futureList) clear(err error, conn *Connection) {
	fut := list.first
	list.first = nil
	list.last = &list.first
	for fut != nil {
		fut.SetError(err)
		conn.markDone(fut)
		// Advance to the next node while detaching the current one.
		fut, fut.next = fut.next, nil
	}
}
198

199
// connShard is one lock shard of a Connection: a slice of the request
// registries plus a private write buffer, each under its own mutex.
type connShard struct {
	// rmut guards requests and requestsWithCtx.
	rmut            sync.Mutex
	requests        [requestsMap]futureList
	requestsWithCtx [requestsMap]futureList
	// bufmut guards buf and enc.
	bufmut sync.Mutex
	buf    smallWBuf
	enc    *msgpack.Encoder
}
207

208
// RLimitAction is an enumeration type for an action to do when a rate limit
// is reached.
type RLimitAction int

const (
	// RLimitDrop immediately aborts the request.
	RLimitDrop RLimitAction = iota
	// RLimitWait waits during timeout period for some request to be answered.
	// If no request answered during timeout period, this request is aborted.
	// If no timeout period is set, it will wait forever.
	RLimitWait
)
220

221
// Opts is a way to configure Connection
type Opts struct {
	// Timeout for response to a particular request. The timeout is reset when
	// push messages are received. If Timeout is zero, any request can be
	// blocked infinitely.
	// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
	//
	// Pay attention, when using contexts with request objects,
	// the timeout option for Connection does not affect the lifetime
	// of the request. For those purposes use context.WithTimeout() as
	// the root context.
	Timeout time.Duration
	// Timeout between reconnect attempts. If Reconnect is zero, no
	// reconnect attempts will be made.
	// If specified, then when Tarantool is not reachable or disconnected,
	// new connect attempt is performed after pause.
	// By default, no reconnection attempts are performed,
	// so once disconnected, connection becomes Closed.
	Reconnect time.Duration
	// Maximum number of reconnect failures; after that we give it up.
	// If MaxReconnects is zero, the client will try to reconnect
	// endlessly.
	// After MaxReconnects attempts Connection becomes closed.
	MaxReconnects uint
	// RateLimit limits number of 'in-fly' requests, i.e. already put into
	// requests queue, but not yet answered by server or timed out.
	// It is disabled by default.
	// See RLimitAction for possible actions when RateLimit is reached.
	RateLimit uint
	// RLimitAction tells what to do when RateLimit is reached.
	// It is required if RateLimit is specified.
	RLimitAction RLimitAction
	// Concurrency is amount of separate mutexes for request
	// queues and buffers inside of connection.
	// It is rounded up to nearest power of 2.
	// By default it is runtime.GOMAXPROCS(-1) * 4
	Concurrency uint32
	// SkipSchema disables schema loading. Without disabling schema loading,
	// there is no way to create Connection for currently not accessible Tarantool.
	SkipSchema bool
	// Notify is a channel which receives notifications about Connection status
	// changes.
	Notify chan<- ConnEvent
	// Handle is user specified value, that could be retrieved with
	// Handle() method.
	Handle interface{}
	// Logger is user specified logger used for error messages.
	Logger Logger
}
270

271
// Connect creates and configures a new Connection.
272
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
348✔
273
        conn = &Connection{
348✔
274
                dialer:           dialer,
348✔
275
                requestId:        0,
348✔
276
                contextRequestId: 1,
348✔
277
                Greeting:         &Greeting{},
348✔
278
                control:          make(chan struct{}),
348✔
279
                opts:             opts,
348✔
280
                dec:              msgpack.NewDecoder(&smallBuf{}),
348✔
281
        }
348✔
282
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
348✔
283
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
694✔
284
                conn.opts.Concurrency = maxprocs * 4
346✔
285
        }
346✔
286
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
348✔
287
                for i := uint(1); i < 32; i *= 2 {
×
288
                        c |= c >> i
×
289
                }
×
290
                conn.opts.Concurrency = c + 1
×
291
        }
292
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
348✔
293
        conn.shard = make([]connShard, conn.opts.Concurrency)
348✔
294
        for i := range conn.shard {
5,948✔
295
                shard := &conn.shard[i]
5,600✔
296
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
5,600✔
297
                for _, requests := range requestsLists {
16,800✔
298
                        for j := range requests {
1,444,800✔
299
                                requests[j].last = &requests[j].first
1,433,600✔
300
                        }
1,433,600✔
301
                }
302
        }
303

304
        if conn.opts.RateLimit > 0 {
348✔
305
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
×
306
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
×
307
                        return nil, errors.New("RLimitAction should be specified to RLimitDone nor RLimitWait")
×
308
                }
×
309
        }
310

311
        if conn.opts.Logger == nil {
695✔
312
                conn.opts.Logger = NewSlogLogger(slog.Default())
347✔
313
        }
347✔
314

315
        conn.cond = sync.NewCond(&conn.mutex)
348✔
316

348✔
317
        if conn.opts.Reconnect > 0 {
463✔
318
                // We don't need these mutex.Lock()/mutex.Unlock() here, but
115✔
319
                // runReconnects() expects mutex.Lock() to be set, so it's
115✔
320
                // easier to add them instead of reworking runReconnects().
115✔
321
                conn.mutex.Lock()
115✔
322
                err = conn.runReconnects(ctx)
115✔
323
                conn.mutex.Unlock()
115✔
324
                if err != nil {
118✔
325
                        return nil, err
3✔
326
                }
3✔
327
        } else {
233✔
328
                if err = conn.connect(ctx); err != nil {
256✔
329
                        return nil, err
23✔
330
                }
23✔
331
        }
332

333
        go conn.pinger()
322✔
334
        if conn.opts.Timeout > 0 {
643✔
335
                go conn.timeouts()
321✔
336
        }
321✔
337

338
        // TODO: reload schema after reconnect.
339
        if !conn.opts.SkipSchema {
622✔
340
                schema, err := GetSchema(conn)
300✔
341
                if err != nil {
335✔
342
                        conn.mutex.Lock()
35✔
343
                        defer conn.mutex.Unlock()
35✔
344
                        _ = conn.closeConnection(err, true)
35✔
345
                        return nil, err
35✔
346
                }
35✔
347
                conn.SetSchema(schema)
265✔
348
        }
349

350
        return conn, err
287✔
351
}
352

353
func (conn *Connection) logEvent(event LogEvent) {
787✔
354
        if conn.opts.Logger != nil {
1,574✔
355
                conn.opts.Logger.Report(event, conn)
787✔
356
        }
787✔
357
}
358

359
func (conn *Connection) stateToString() string {
783✔
360
        state := atomic.LoadUint32(&conn.state)
783✔
361
        switch state {
783✔
362
        case connDisconnected:
140✔
363
                return "disconnected"
140✔
364
        case connConnected:
324✔
365
                return "connected"
324✔
NEW
366
        case connShutdown:
×
NEW
367
                return "shutdown"
×
368
        case connClosed:
319✔
369
                return "closed"
319✔
NEW
370
        default:
×
NEW
371
                return "unknown"
×
372
        }
373
}
374

375
// ConnectedNow reports if connection is established at the moment.
376
func (conn *Connection) ConnectedNow() bool {
6✔
377
        return atomic.LoadUint32(&conn.state) == connConnected
6✔
378
}
6✔
379

380
// ClosedNow reports if connection is closed by user or after reconnect.
381
func (conn *Connection) ClosedNow() bool {
326✔
382
        return atomic.LoadUint32(&conn.state) == connClosed
326✔
383
}
326✔
384

385
// Close closes Connection.
386
// After this method called, there is no way to reopen this Connection.
387
func (conn *Connection) Close() error {
288✔
388
        err := ClientError{ErrConnectionClosed, "connection closed by client"}
288✔
389
        conn.mutex.Lock()
288✔
390
        defer conn.mutex.Unlock()
288✔
391
        return conn.closeConnection(err, true)
288✔
392
}
288✔
393

394
// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
	// Delegate to the common shutdown path; true requests a full close.
	return conn.shutdown(true)
}
400

401
// Addr returns a configured address of Tarantool socket.
// The value is set when the underlying connection is dialed.
func (conn *Connection) Addr() net.Addr {
	return conn.addr
}
405

406
// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
	return conn.opts.Handle
}
410

411
func (conn *Connection) cancelFuture(fut *Future, err error) {
×
412
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
×
413
                fut.SetError(err)
×
414
                conn.markDone(fut)
×
415
        }
×
416
}
417

418
// dial establishes a new underlying connection via the configured dialer
// and, on success, installs it: records the greeting and protocol info,
// replays existing watch registrations, publishes the connection under the
// shard locks, starts the reader/writer goroutines, and subscribes to the
// graceful-shutdown event when the server supports watchers.
func (conn *Connection) dial(ctx context.Context) error {
	opts := conn.opts

	var c Conn
	c, err := conn.dialer.Dial(ctx, DialOpts{
		IoTimeout: opts.Timeout,
	})
	if err != nil {
		return err
	}

	conn.addr = c.Addr()
	connGreeting := c.Greeting()
	conn.Greeting.Version = connGreeting.Version
	conn.Greeting.Salt = connGreeting.Salt
	conn.serverProtocolInfo = c.ProtocolInfo()

	if conn.schemaResolver == nil {
		namesSupported := isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

		conn.schemaResolver = &noSchemaResolver{
			SpaceAndIndexNamesSupported: namesSupported,
		}
	}

	// Watchers: re-register every ready watcher on the new connection.
	// NOTE: the closure assigns to err from the enclosing scope; a write
	// failure stops Range and is handled right after the call.
	conn.watchMap.Range(func(key, value interface{}) bool {
		st := value.(chan watchState)
		state := <-st
		if state.unready != nil {
			st <- state
			return true
		}

		req := newWatchRequest(key.(string))
		if err = writeRequest(ctx, c, req); err != nil {
			st <- state
			return false
		}
		state.ack = true

		st <- state
		return true
	})

	if err != nil {
		c.Close()
		return fmt.Errorf("unable to register watch: %w", err)
	}

	// Only if connected and fully initialized.
	conn.lockShards()
	conn.c = c
	atomic.StoreUint32(&conn.state, connConnected)
	conn.cond.Broadcast()
	conn.unlockShards()
	go conn.writer(c, c)
	go conn.reader(c, c)

	// Subscribe shutdown event to process graceful shutdown.
	if conn.shutdownWatcher == nil &&
		isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
			conn.serverProtocolInfo.Features) {
		watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
		if werr != nil {
			return werr
		}
		conn.shutdownWatcher = watcher
	}

	return nil
}
491

492
// pack serializes a request header and body into h.
//
// The header layout: a 5-byte length prefix (msgpack uint32 marker + value,
// patched in after the body is written), then a msgpack map with
// REQUEST_TYPE, SYNC (always encoded as uint32), and — for stream requests —
// STREAM_ID encoded as uint32 or uint64 depending on its value. The body is
// produced by req.Body using the schema resolver res.
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
	req Request, streamId uint64, res SchemaResolver) (err error) {
	const uint32Code = 0xce
	const uint64Code = 0xcf
	const streamBytesLenUint64 = 10
	const streamBytesLenUint32 = 6

	// Remember where this request starts: the length is patched at hl.
	hl := h.Len()

	var streamBytesLen = 0
	var streamBytes [streamBytesLenUint64]byte
	hMapLen := byte(0x82) // 2 element map.
	if streamId != ignoreStreamId {
		hMapLen = byte(0x83) // 3 element map.
		streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
		if streamId > math.MaxUint32 {
			streamBytesLen = streamBytesLenUint64
			streamBytes[1] = uint64Code
			binary.BigEndian.PutUint64(streamBytes[2:], streamId)
		} else {
			streamBytesLen = streamBytesLenUint32
			streamBytes[1] = uint32Code
			binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
		}
	}

	hBytes := append([]byte{
		uint32Code, 0, 0, 0, 0, // Length.
		hMapLen,
		byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
		byte(iproto.IPROTO_SYNC), uint32Code,
		byte(reqid >> 24), byte(reqid >> 16),
		byte(reqid >> 8), byte(reqid),
	}, streamBytes[:streamBytesLen]...)

	if _, err = h.Write(hBytes); err != nil {
		return
	}

	if err = req.Body(res, enc); err != nil {
		return
	}

	// Patch the 4 length bytes (after the uint32 marker at hl) with the
	// number of bytes written after the 5-byte length prefix.
	l := uint32(h.Len() - 5 - hl)
	h.b[hl+1] = byte(l >> 24)
	h.b[hl+2] = byte(l >> 16)
	h.b[hl+3] = byte(l >> 8)
	h.b[hl+4] = byte(l)

	return
}
543

544
// connect performs a single connection attempt if the connection is
// currently down, logging and notifying on success. Expected to run with
// conn.mutex held (see Connect and runReconnects).
//
// Returns nil on success, a ClientError with ErrConnectionClosed if the
// connection was closed meanwhile, or the dial error otherwise.
func (conn *Connection) connect(ctx context.Context) error {
	var err error
	if conn.c == nil && conn.state == connDisconnected {
		if err = conn.dial(ctx); err == nil {
			// Atomically increase the reconnect count
			// (use atomic operations for thread safety)
			reconnects := atomic.AddUint32(&conn.reconnectCount, 1) - 1

			conn.logEvent(ConnectedEvent{
				baseEvent:  newBaseEvent(conn.addr),
				Reconnects: uint(reconnects),
			})
			conn.notify(Connected)
			return nil
		}
	}
	if conn.state == connClosed {
		err = ClientError{ErrConnectionClosed, "using closed connection"}
	}
	return err
}
565

566
// closeConnection tears down the current underlying connection under the
// shard locks. With forever=true the Connection transitions to the terminal
// "closed" state (control channel closed, shutdown watcher released);
// otherwise it only becomes "disconnected" so reconnects may follow.
// Every pending future is failed with neterr. Callers hold conn.mutex.
// Returns the error from closing the underlying Conn, if any.
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
	conn.lockShards()
	defer conn.unlockShards()

	if forever {
		// Idempotent: skip if already closed for good.
		if conn.state != connClosed {
			close(conn.control)
			atomic.StoreUint32(&conn.state, connClosed)
			conn.cond.Broadcast()
			// Free the resources.
			if conn.shutdownWatcher != nil {
				go conn.shutdownWatcher.Unregister()
				conn.shutdownWatcher = nil
			}
			conn.logEvent(ClosedEvent{
				baseEvent: newBaseEvent(conn.addr),
			})
			conn.notify(Closed)
		}
	} else {
		atomic.StoreUint32(&conn.state, connDisconnected)
		conn.cond.Broadcast()
		conn.logEvent(DisconnectedEvent{
			baseEvent: newBaseEvent(conn.addr),
			Reason:    neterr,
		})
		conn.notify(Disconnected)
	}
	if conn.c != nil {
		err = conn.c.Close()
		conn.c = nil
	}
	// Fail every request still waiting for a response and drop buffers.
	for i := range conn.shard {
		conn.shard[i].buf.Reset()
		requestsLists := []*[requestsMap]futureList{
			&conn.shard[i].requests,
			&conn.shard[i].requestsWithCtx,
		}
		for _, requests := range requestsLists {
			for pos := range requests {
				requests[pos].clear(neterr, conn)
			}
		}
	}
	return
}
612

613
func (conn *Connection) getDialTimeout() time.Duration {
173✔
614
        dialTimeout := conn.opts.Reconnect / 2
173✔
615
        if dialTimeout == 0 {
173✔
616
                dialTimeout = 500 * time.Millisecond
×
617
        } else if dialTimeout > 5*time.Second {
173✔
618
                dialTimeout = 5 * time.Second
×
619
        }
×
620
        return dialTimeout
173✔
621
}
622

623
// runReconnects performs connection attempts until one succeeds, the context
// is cancelled, the connection is closed, or Opts.MaxReconnects attempts
// fail (zero means retry endlessly). Attempts are paced by Opts.Reconnect
// and each one is bounded by getDialTimeout(). Called with conn.mutex held;
// the mutex is released while waiting between attempts and re-acquired
// before the next one.
func (conn *Connection) runReconnects(ctx context.Context) error {
	dialTimeout := conn.getDialTimeout()
	var reconnects uint
	var err error

	// NOTE: NewTicker panics for a non-positive interval; callers only
	// reach here with Opts.Reconnect > 0.
	t := time.NewTicker(conn.opts.Reconnect)
	defer t.Stop()
	for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
		localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
		err = conn.connect(localCtx)
		cancel()

		if err != nil {
			// The error will most likely be the one that Dialer
			// returns to us due to the context being cancelled.
			// Although this is not guaranteed. For example,
			// if the dialer may throw another error before checking
			// the context, and the context has already been
			// canceled. Or the context was not canceled after
			// the error was thrown, but before the context was
			// checked here.
			if ctx.Err() != nil {
				return err
			}
			// A closed connection is terminal: stop retrying.
			if clientErr, ok := err.(ClientError); ok &&
				clientErr.Code == ErrConnectionClosed {
				return err
			}
		} else {
			return nil
		}

		conn.logEvent(ReconnectFailedEvent{
			baseEvent:     newBaseEvent(conn.addr),
			Reconnects:    reconnects,
			MaxReconnects: conn.opts.MaxReconnects,
			Error:         err,
			IsInitial:     conn.addr == nil,
		})

		conn.notify(ReconnectFailed)
		reconnects++
		conn.mutex.Unlock()

		select {
		case <-ctx.Done():
		// Since the context is cancelled, we don't need to do anything.
		// Conn.connect() will return the correct error.
		case <-t.C:
		}

		conn.mutex.Lock()
	}

	conn.logEvent(ReconnectFailedEvent{
		baseEvent: newBaseEvent(conn.addr),
		Error:     err,
	})

	return ClientError{ErrConnectionClosed, "last reconnect failed"}
}
684

685
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
353✔
686
        if conn.opts.Reconnect > 0 {
499✔
687
                if c == conn.c {
204✔
688
                        _ = conn.closeConnection(neterr, false)
58✔
689
                        if err := conn.runReconnects(context.Background()); err != nil {
113✔
690
                                _ = conn.closeConnection(err, true)
55✔
691
                        }
55✔
692
                }
693
        } else {
207✔
694
                _ = conn.closeConnection(neterr, true)
207✔
695
        }
207✔
696
}
697

698
// reconnect serializes broken-connection handling for c under conn.mutex
// and wakes up all goroutines waiting on conn.cond afterwards.
func (conn *Connection) reconnect(neterr error, c Conn) {
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.reconnectImpl(neterr, c)
	conn.cond.Broadcast()
}
704

705
func (conn *Connection) lockShards() {
1,235✔
706
        for i := range conn.shard {
21,107✔
707
                conn.shard[i].rmut.Lock()
19,872✔
708
                conn.shard[i].bufmut.Lock()
19,872✔
709
        }
19,872✔
710
}
711

712
func (conn *Connection) unlockShards() {
1,235✔
713
        for i := range conn.shard {
21,107✔
714
                conn.shard[i].rmut.Unlock()
19,872✔
715
                conn.shard[i].bufmut.Unlock()
19,872✔
716
        }
19,872✔
717
}
718

719
func (conn *Connection) pinger() {
322✔
720
        to := conn.opts.Timeout
322✔
721
        if to == 0 {
323✔
722
                to = 3 * time.Second
1✔
723
        }
1✔
724
        t := time.NewTicker(to / 3)
322✔
725
        defer t.Stop()
322✔
726
        for {
645✔
727
                select {
323✔
728
                case <-conn.control:
320✔
729
                        return
320✔
730
                case <-t.C:
1✔
731
                }
732
                conn.Do(NewPingRequest())
1✔
733
        }
734
}
735

736
func (conn *Connection) notify(kind ConnEventKind) {
840✔
737
        if conn.opts.Notify != nil {
848✔
738
                select {
8✔
739
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
740
                default:
8✔
741
                }
742
        }
743
}
744

745
// writer is the per-connection send loop. It drains dirty shard buffers,
// writes them to the transport and flushes opportunistically when no shard
// is pending. It runs in its own goroutine and exits on close, control
// shutdown, I/O error, or when the underlying Conn has been replaced
// (c no longer matches conn.c after a reconnect).
func (conn *Connection) writer(w writeFlusher, c Conn) {
	var shardn uint32
	var packet smallWBuf
	for atomic.LoadUint32(&conn.state) != connClosed {
		select {
		case shardn = <-conn.dirtyShard:
		default:
			// Nothing pending right now: yield, and if still idle flush
			// whatever has been buffered so far before blocking.
			runtime.Gosched()
			if len(conn.dirtyShard) == 0 {
				if err := w.Flush(); err != nil {
					err = ClientError{
						ErrIoError,
						fmt.Sprintf("failed to flush data to the connection: %s", err),
					}
					conn.reconnect(err, c)
					return
				}
			}
			select {
			case shardn = <-conn.dirtyShard:
			case <-conn.control:
				return
			}
		}
		shard := &conn.shard[shardn]
		shard.bufmut.Lock()
		if conn.c != c {
			// The connection was re-established while we slept; put the
			// dirty mark back so the new writer goroutine picks it up.
			conn.dirtyShard <- shardn
			shard.bufmut.Unlock()
			return
		}
		// Swap buffers under the lock so producers can keep appending to a
		// fresh shard buffer while we write the filled one out.
		packet, shard.buf = shard.buf, packet
		shard.bufmut.Unlock()
		if packet.Len() == 0 {
			continue
		}
		if _, err := w.Write(packet.b); err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to write data to the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		packet.Reset()
	}
}
792

793
// readWatchEvent decodes a single IPROTO_EVENT body (a MessagePack map)
// from reader into a connWatchEvent. IPROTO_EVENT_KEY is mandatory;
// IPROTO_EVENT_DATA is optional and unknown keys are skipped.
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
	keyExist := false
	event := connWatchEvent{}

	// Decoders are pooled; always return this one.
	d := getDecoder(reader)
	defer putDecoder(d)

	l, err := d.DecodeMapLen()
	if err != nil {
		return event, err
	}

	for ; l > 0; l-- {
		cd, err := d.DecodeInt()
		if err != nil {
			return event, err
		}

		switch iproto.Key(cd) {
		case iproto.IPROTO_EVENT_KEY:
			if event.key, err = d.DecodeString(); err != nil {
				return event, err
			}
			keyExist = true
		case iproto.IPROTO_EVENT_DATA:
			if event.value, err = d.DecodeInterface(); err != nil {
				return event, err
			}
		default:
			// Forward compatibility: ignore keys we do not understand.
			if err = d.Skip(); err != nil {
				return event, err
			}
		}
	}

	if !keyExist {
		return event, errors.New("watch event does not have a key")
	}

	return event, nil
}
834

835
// reader is the per-connection receive loop. It reads IPROTO frames from r,
// dispatches watch events to the eventer goroutine, and resolves pending
// futures by request id. It runs in its own goroutine and exits on close
// or on any read/decode error (triggering a reconnect).
func (conn *Connection) reader(r io.Reader, c Conn) {
	// Buffered so a short burst of events does not block frame reading;
	// closing it stops the eventer goroutine started below.
	events := make(chan connWatchEvent, 1024)
	defer close(events)

	go conn.eventer(events)

	for atomic.LoadUint32(&conn.state) != connClosed {
		respBytes, err := read(r, conn.lenbuf[:])
		if err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to read data from the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		buf := smallBuf{b: respBytes}
		header, code, err := decodeHeader(conn.dec, &buf)
		if err != nil {
			err = ClientError{
				ErrProtocolError,
				fmt.Sprintf("failed to decode IPROTO header: %s", err),
			}
			conn.reconnect(err, c)
			return
		}

		var fut *Future = nil
		if code == iproto.IPROTO_EVENT {
			// Watch events carry no request id; hand them to the eventer.
			if event, err := readWatchEvent(&buf); err == nil {
				events <- event
			} else {
				err = ClientError{
					ErrProtocolError,
					fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
				}
				conn.logEvent(WatchEventReadFailedEvent{
					baseEvent: newBaseEvent(conn.addr),
					Error:     err,
				})
			}
			continue
		} else if code == iproto.IPROTO_CHUNK {
			// box.session.push() data is not supported by this client.
			// NOTE(review): fut stays nil here, so the UnexpectedResultIdEvent
			// below also fires for every chunk — confirm this double log is
			// intended.
			conn.logEvent(BoxSessionPushUnsupportedEvent{
				baseEvent: newBaseEvent(conn.addr),
				RequestId: header.RequestId,
			})
		} else {
			if fut = conn.fetchFuture(header.RequestId); fut != nil {
				if err := fut.SetResponse(header, &buf); err != nil {
					fut.SetError(fmt.Errorf("failed to set response: %w", err))
				}
				conn.markDone(fut)
			}
		}

		if fut == nil {
			// No pending request matches this id (e.g. it already timed out).
			conn.logEvent(UnexpectedResultIdEvent{
				baseEvent: newBaseEvent(conn.addr),
				RequestId: header.RequestId,
			})
		}
	}
}
899

900
// eventer goroutine gets watch events and updates values for watchers.
// Each key's watchState lives in a one-slot channel used as a mutex: the
// state is taken out, mutated, and put back. Closing state.changed
// broadcasts the new value to every watcher goroutine of that key.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
	for event := range events {
		if value, ok := conn.watchMap.Load(event.key); ok {
			st := value.(chan watchState)
			state := <-st
			state.value = event.value
			// Bump the version, wrapping past MaxUint without ever
			// returning to initWatchEventVersion (which means "no event yet").
			if state.version == math.MaxUint {
				state.version = initWatchEventVersion + 1
			} else {
				state.version += 1
			}
			state.ack = false
			if state.changed != nil {
				close(state.changed)
				state.changed = nil
			}
			st <- state
		}
		// It is possible to get IPROTO_EVENT after we already send
		// IPROTO_UNWATCH due to processing on a Tarantool side or slow
		// read from the network, so it looks like an expected behavior.
	}
}
924

925
// newFuture allocates a Future for req, applies the client-side rate limit
// (RLimitDrop before registration, RLimitWait after), assigns a request id
// and registers the future in its shard's pending-request list. On any
// failure (rate limited, connection closed/not ready/shutting down,
// context already done) the returned future is already finished with the
// corresponding error.
func (conn *Connection) newFuture(req Request) (fut *Future) {
	ctx := req.Ctx()
	fut = NewFuture(req)
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
		// Drop policy: fail immediately if the limiter is full.
		select {
		case conn.rlimit <- struct{}{}:
		default:
			fut.err = ClientError{
				ErrRateLimited,
				"Request is rate limited on client",
			}
			fut.finish()
			return
		}
	}
	// Ids with a context are odd, without — even; see getFutureImp.
	fut.requestId = conn.nextRequestId(ctx != nil)
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.rmut.Lock()
	// Re-check the state under the shard lock so a concurrent close cannot
	// leave an orphaned registered future.
	switch atomic.LoadUint32(&conn.state) {
	case connClosed:
		fut.err = ClientError{
			ErrConnectionClosed,
			"using closed connection",
		}
		fut.finish()
		shard.rmut.Unlock()
		return
	case connDisconnected:
		fut.err = ClientError{
			ErrConnectionNotReady,
			"client connection is not ready",
		}
		fut.finish()
		shard.rmut.Unlock()
		return
	case connShutdown:
		fut.err = ClientError{
			ErrConnectionShutdown,
			"server shutdown in progress",
		}
		fut.finish()
		shard.rmut.Unlock()
		return
	}
	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
	if ctx != nil {
		select {
		case <-ctx.Done():
			fut.SetError(fmt.Errorf("context is done (request ID %d): %w",
				fut.requestId, context.Cause(ctx)))
			shard.rmut.Unlock()
			return
		default:
		}
		shard.requestsWithCtx[pos].addFuture(fut)
	} else {
		shard.requests[pos].addFuture(fut)
		if conn.opts.Timeout > 0 {
			// Deadline is stored relative to the package epoch; see timeouts().
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	}
	shard.rmut.Unlock()
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
		// Wait policy: block until a limiter slot frees up or the future
		// finishes by other means (timeout, disconnect).
		select {
		case conn.rlimit <- struct{}{}:
		default:
			runtime.Gosched()
			select {
			case conn.rlimit <- struct{}{}:
			case <-fut.WaitChan():
				if fut.err == nil {
					panic("fut.done is closed, but err is nil")
				}
			}
		}
	}
	return
}
1004

1005
// contextWatchdog removes a future from the internal queue if the context
// is "done" before the response arrives. It runs as a goroutine per
// context-carrying request; the second select distinguishes "future
// completed normally" from "context fired first".
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
	select {
	case <-fut.WaitChan():
	case <-ctx.Done():
	}

	select {
	case <-fut.WaitChan():
		return
	default:
		conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
			fut.requestId, context.Cause(ctx)))
	}
}
1021

1022
// incrementRequestCnt counts one more in-flight request; paired with
// decrementRequestCnt, which wakes waiters when the count drops to zero.
func (conn *Connection) incrementRequestCnt() {
	atomic.AddInt64(&conn.requestCnt, int64(1))
}
1025

1026
// decrementRequestCnt counts one in-flight request as finished and, when
// none remain, broadcasts on conn.cond so goroutines waiting for the
// connection to drain (e.g. graceful shutdown) can proceed.
func (conn *Connection) decrementRequestCnt() {
	if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
		conn.cond.Broadcast()
	}
}
1031

1032
// send registers a future for req, spawns a context watchdog when the
// request carries a context, and enqueues the encoded request for the
// writer goroutine. streamId is ignoreStreamId for non-stream requests.
// The returned future may already be finished with an error.
func (conn *Connection) send(req Request, streamId uint64) *Future {
	conn.incrementRequestCnt()

	fut := conn.newFuture(req)

	if fut.isFinished() {
		// newFuture already failed the request (rate limit, closed, ...).
		conn.decrementRequestCnt()
		return fut
	}

	if req.Ctx() != nil {
		// Fast path: cancel right away if the context is already done.
		select {
		case <-req.Ctx().Done():
			conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
			return fut
		default:
		}
		go conn.contextWatchdog(fut, req.Ctx())
	}
	conn.putFuture(fut, req, streamId)

	return fut
}
1055

1056
// putFuture encodes req into its shard's write buffer and marks the shard
// dirty for the writer goroutine. On encoding failure the future is
// unregistered and failed; for Async requests the future is resolved
// immediately after the request is buffered (no response is expected).
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.bufmut.Lock()

	if fut.isFinished() {
		// Cancelled (e.g. by the context watchdog) between newFuture and here.
		shard.bufmut.Unlock()
		return
	}

	// An empty buffer means the writer has drained this shard; we must
	// re-signal it via dirtyShard after appending.
	firstWritten := shard.buf.Len() == 0
	if shard.buf.Cap() == 0 {
		// Lazily allocate the buffer and its msgpack encoder on first use.
		shard.buf.b = make([]byte, 0, 128)
		shard.enc = msgpack.NewEncoder(&shard.buf)
	}
	blen := shard.buf.Len()
	reqid := fut.requestId
	if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
		// Roll back the partially written request.
		shard.buf.Trunc(blen)
		shard.bufmut.Unlock()
		if f := conn.fetchFuture(reqid); f == fut {
			fut.SetError(err)
			conn.markDone(fut)
		} else if f != nil {
			/* in theory, it is possible. In practice, you have
			 * to have race condition that lasts hours */
			panic("Unknown future")
		} else {
			// Someone else (timeout, disconnect) already removed the future;
			// wait for them to finish it.
			fut.wait()
			if fut.err == nil {
				panic("Future removed from queue without error")
			}
			if _, ok := fut.err.(ClientError); ok {
				// packing error is more important than connection
				// error, because it is indication of programmer's
				// mistake.
				fut.SetError(err)
			}
		}
		return
	}
	shard.bufmut.Unlock()

	if firstWritten {
		conn.dirtyShard <- shardn
	}

	if req.Async() {
		// Fire-and-forget request: resolve the future with an empty OK
		// response as soon as it is buffered.
		if fut = conn.fetchFuture(reqid); fut != nil {
			header := Header{
				RequestId: reqid,
				Error:     ErrorNo,
			}
			_ = fut.SetResponse(header, nil)
			conn.markDone(fut)
		}
	}
}
1114

1115
// markDone releases the rate-limiter slot held by a finished request (if a
// limiter is configured) and decrements the in-flight request counter.
func (conn *Connection) markDone(fut *Future) {
	if conn.rlimit != nil {
		<-conn.rlimit
	}
	conn.decrementRequestCnt()
}
1121

1122
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
1,850✔
1123
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,850✔
1124
        shard.rmut.Lock()
1,850✔
1125
        fut = conn.getFutureImp(reqid, true)
1,850✔
1126
        shard.rmut.Unlock()
1,850✔
1127
        return fut
1,850✔
1128
}
1,850✔
1129

1130
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
1,850✔
1131
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,850✔
1132
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
1,850✔
1133
        // futures with even requests id belong to requests list with nil context
1,850✔
1134
        if reqid%2 == 0 {
3,700✔
1135
                return shard.requests[pos].findFuture(reqid, fetch)
1,850✔
1136
        } else {
1,850✔
1137
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
×
1138
        }
×
1139
}
1140

1141
// timeouts is the per-connection timeout reaper goroutine. On each timer
// tick it walks every shard's pending-request lists, fails futures whose
// deadline (relative to the package epoch) has passed, and re-arms the
// timer for the earliest remaining deadline. It exits when conn.control
// is closed.
func (conn *Connection) timeouts() {
	timeout := conn.opts.Timeout
	t := time.NewTimer(timeout)
	for {
		var nowepoch time.Duration
		select {
		case <-conn.control:
			t.Stop()
			return
		case <-t.C:
		}
		minNext := time.Since(epoch) + timeout
		for i := range conn.shard {
			nowepoch = time.Since(epoch)
			shard := &conn.shard[i]
			for pos := range shard.requests {
				shard.rmut.Lock()
				pair := &shard.requests[pos]
				// Lists are deadline-ordered (futures are appended), so we
				// can stop at the first future that has not expired yet.
				for pair.first != nil && pair.first.timeout < nowepoch {
					shard.bufmut.Lock()
					// Unlink the expired future from the head of the list.
					fut := pair.first
					pair.first = fut.next
					if fut.next == nil {
						pair.last = &pair.first
					} else {
						fut.next = nil
					}
					fut.SetError(ClientError{
						Code: ErrTimeouted,
						Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
					})
					conn.markDone(fut)
					shard.bufmut.Unlock()

					conn.logEvent(TimeoutEvent{
						baseEvent: newBaseEvent(conn.addr),
						RequestId: fut.requestId,
						Timeout:   timeout,
					})
				}
				if pair.first != nil && pair.first.timeout < minNext {
					minNext = pair.first.timeout
				}
				shard.rmut.Unlock()
			}
		}
		// Re-arm for the nearest pending deadline (at least 1µs out).
		nowepoch = time.Since(epoch)
		if nowepoch+time.Microsecond < minNext {
			t.Reset(minNext - nowepoch)
		} else {
			t.Reset(time.Microsecond)
		}
	}
}
1195

1196
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
2,510✔
1197
        var length uint64
2,510✔
1198

2,510✔
1199
        if _, err = io.ReadFull(r, lenbuf); err != nil {
2,812✔
1200
                return
302✔
1201
        }
302✔
1202
        if lenbuf[0] != 0xce {
2,206✔
1203
                err = errors.New("wrong response header")
×
1204
                return
×
1205
        }
×
1206
        length = (uint64(lenbuf[1]) << 24) +
2,206✔
1207
                (uint64(lenbuf[2]) << 16) +
2,206✔
1208
                (uint64(lenbuf[3]) << 8) +
2,206✔
1209
                uint64(lenbuf[4])
2,206✔
1210

2,206✔
1211
        switch {
2,206✔
1212
        case length == 0:
×
1213
                err = errors.New("response should not be 0 length")
×
1214
                return
×
1215
        case length > math.MaxUint32:
×
1216
                err = errors.New("response is too big")
×
1217
                return
×
1218
        }
1219

1220
        response = make([]byte, length)
2,206✔
1221
        _, err = io.ReadFull(r, response)
2,206✔
1222

2,206✔
1223
        return
2,206✔
1224
}
1225

1226
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
1,874✔
1227
        if context {
1,881✔
1228
                return atomic.AddUint32(&conn.contextRequestId, 2)
7✔
1229
        } else {
1,874✔
1230
                return atomic.AddUint32(&conn.requestId, 2)
1,867✔
1231
        }
1,867✔
1232
}
1233

1234
// Do performs a request asynchronously on the connection.
//
// An error is returned if the request was formed incorrectly, or failed to
// create the future.
func (conn *Connection) Do(req Request) *Future {
	// A ConnectedRequest is bound to a particular connection (or pool);
	// reject it if it was built for a different one.
	if connectedReq, ok := req.(ConnectedRequest); ok {
		if connectedReq.Conn() != conn {
			fut := NewFuture(req)
			fut.SetError(errUnknownRequest)
			return fut
		}
	}
	return conn.send(req, ignoreStreamId)
}
1248

1249
// ConfiguredTimeout returns a timeout from connection config.
// A zero value means no per-request timeout was configured.
func (conn *Connection) ConfiguredTimeout() time.Duration {
	return conn.opts.Timeout
}
×
1253

1254
// SetSchema sets Schema for the connection.
// The schema is copied, so later mutations of s do not affect the
// connection. All shards are locked while the resolver is swapped so no
// request is encoded with a half-updated schema.
func (conn *Connection) SetSchema(s Schema) {
	sCopy := s.copy()
	spaceAndIndexNamesSupported :=
		isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.lockShards()
	defer conn.unlockShards()

	conn.schemaResolver = &loadedSchemaResolver{
		Schema:                      sCopy,
		SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
	}
}
1271

1272
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
// It returns a Prepared handle built from the server response, or the
// request error.
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
	req := NewPrepareRequest(expr)
	resp, err := conn.Do(req).GetResponse()
	if err != nil {
		return nil, err
	}
	return NewPreparedFromResponse(conn, resp)
}
1281

1282
// NewStream creates new Stream object for connection.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
// Since 1.7.0
func (conn *Connection) NewStream() (*Stream, error) {
	// Stream ids are allocated from a monotonically increasing counter.
	next := atomic.AddUint64(&conn.lastStreamId, 1)
	return &Stream{
		Id:   next,
		Conn: conn,
	}, nil
}
1294

1295
// watchState is the current state of the watcher. See the idea at p. 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
//
// The struct is passed around through a one-slot channel that acts as a
// mutex: a goroutine receives the state, mutates it and sends it back.
type watchState struct {
	// value is a current value.
	value interface{}
	// version is a current version of the value.
	version uint
	// ack true if the acknowledge is already sent.
	ack bool
	// cnt is a count of active watchers for the key.
	cnt int
	// changed is a channel for broadcast the value changes.
	changed chan struct{}
	// unready channel exists if a state is not ready to work (subscription
	// or unsubscription in progress).
	unready chan struct{}
}

// initWatchEventVersion is an initial version until no events from Tarantool.
const initWatchEventVersion uint = 0

// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
	unregister sync.Once
	// done is closed when the watcher is unregistered, but the watcher
	// goroutine is not yet finished.
	done chan struct{}
	// finished is closed when the watcher is unregistered and the watcher
	// goroutine is finished.
	finished chan struct{}
}
1326

1327
// Unregister unregisters the connection watcher.
// It is idempotent (sync.Once guards the signal) and blocks until the
// watcher goroutine has fully finished.
func (w *connWatcher) Unregister() {
	w.unregister.Do(func() {
		close(w.done)
	})
	<-w.finished
}
1334

1335
// subscribeWatchChannel returns an existing one or a new watch state channel
// for the key. It also increases a counter of active watchers for the channel.
// When the key is seen for the first time, an IPROTO_WATCH request is sent
// to the server; the unready channel keeps concurrent subscribers waiting
// until that registration settles.
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
	var st chan watchState

	for st == nil {
		if val, ok := conn.watchMap.Load(key); !ok {
			// No channel yet: create one pre-loaded with an "unready" state.
			st = make(chan watchState, 1)
			state := watchState{
				value:   nil,
				version: initWatchEventVersion,
				ack:     false,
				cnt:     0,
				changed: nil,
				unready: make(chan struct{}),
			}
			st <- state

			if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
				// We won the race to register: subscribe on the server.
				if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
					conn.watchMap.Delete(key)
					close(state.unready)
					return nil, err
				}
				// It is a successful subscription to a watch events by itself.
				state = <-st
				state.cnt = 1
				close(state.unready)
				state.unready = nil
				st <- state
				continue
			} else {
				// Another goroutine stored a channel first; discard ours.
				close(state.unready)
				close(st)
				st = val.(chan watchState)
			}
		} else {
			st = val.(chan watchState)
		}

		// It is an existing channel created outside. It may be in the
		// unready state.
		state := <-st
		if state.unready == nil {
			state.cnt += 1
		}
		st <- state

		if state.unready != nil {
			// Wait for an update and retry.
			<-state.unready
			st = nil
		}
	}

	return st, nil
}
1392

1393
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
1,976✔
1394
        for _, actual := range actualSlice {
11,008✔
1395
                if expected == actual {
10,998✔
1396
                        return true
1,966✔
1397
                }
1,966✔
1398
        }
1399
        return false
10✔
1400
}
1401

1402
// NewWatcher creates a new Watcher object for the connection.
//
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
// been broadcast. All subsequent invocations are triggered with
// box.broadcast() called on the remote host. If a watcher is subscribed for a
// key that has not been broadcast yet, the callback is triggered only once,
// after the registration of the watcher.
//
// The watcher callbacks are always invoked in a separate goroutine. A watcher
// callback is never executed in parallel with itself, but they can be executed
// in parallel to other watchers.
//
// If the key is updated while the watcher callback is running, the callback
// will be invoked again with the latest value as soon as it returns.
//
// Watchers survive reconnection. All registered watchers are automatically
// resubscribed when the connection is reestablished.
//
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
// watcher’s destruction. In this case, the watcher remains registered. You
// need to call Unregister() directly.
//
// Unregister() guarantees that there will be no the watcher's callback calls
// after it, but Unregister() call from the callback leads to a deadlock.
//
// See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
//
// Since 1.10.0
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
	// We need to check the feature because the IPROTO_WATCH request is
	// asynchronous. We do not expect any response from a Tarantool instance
	// That's why we can't just check the Tarantool response for an unsupported
	// request error.
	if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
		conn.serverProtocolInfo.Features) {
		err := fmt.Errorf("the feature %s must be supported by connection "+
			"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
		return nil, err
	}

	return conn.newWatcherImpl(key, callback)
}
1448

1449
// newWatcherImpl subscribes the connection to events for key and starts a
// goroutine that invokes callback for every observed state change. It
// returns a connWatcher whose Unregister() stops the goroutine and, when it
// was the last watcher for the key, triggers an IPROTO_UNWATCH request.
//
// NOTE(review): the st channel appears to carry the shared per-key watcher
// state — a receive takes exclusive ownership, a send releases it
// (channel-as-mutex pattern); confirm against subscribeWatchChannel.
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
	st, err := subscribeWatchChannel(conn, key)
	if err != nil {
		return nil, err
	}

	// Start the watcher goroutine.
	// done is closed by connWatcher.Unregister(); finished is closed by the
	// goroutine itself once it has fully stopped.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		version := initWatchEventVersion
		for {
			// Take ownership of the shared state.
			state := <-st
			if state.changed == nil {
				// Lazily create the change-notification channel; presumably
				// it is closed by the event-delivery path when a new version
				// arrives — see the select below.
				state.changed = make(chan struct{})
			}
			st <- state

			if state.version != version {
				// A new value was observed: deliver it. The callback runs on
				// this goroutine, so it is never executed in parallel with
				// itself.
				callback(WatchEvent{
					Conn:  conn,
					Key:   key,
					Value: state.value,
				})
				version = state.version

				// Do we need to acknowledge the notification?
				// Send the ack at most once per version, and only if no newer
				// version appeared while the callback was running.
				state = <-st
				sendAck := !state.ack && version == state.version
				if sendAck {
					state.ack = true
				}
				st <- state

				if sendAck {
					// We expect a reconnect and re-subscribe if it fails to
					// send the watch request. So it looks ok do not check a
					// result. But we need to make sure that the re-watch
					// request will not be finished by a small per-request
					// timeout.
					req := newWatchRequest(key).Context(context.Background())
					_, _ = conn.Do(req).Get()
				}
			}

			select {
			case <-done:
				// Unregister() was called: detach this watcher from the
				// shared state.
				state := <-st
				state.cnt -= 1
				if state.cnt == 0 {
					// Last watcher for the key; unready presumably blocks new
					// subscribers until the teardown below completes — TODO
					// confirm against subscribeWatchChannel.
					state.unready = make(chan struct{})
				}
				st <- state

				if state.cnt == 0 {
					// The last one sends IPROTO_UNWATCH.
					if !conn.ClosedNow() {
						// conn.ClosedNow() check is a workaround for calling
						// Unregister from connectionClose().
						//
						// We need to make sure that the unwatch request will
						// not be finished by a small per-request timeout to
						// avoid lost of the request.
						req := newUnwatchRequest(key).Context(context.Background())
						_, _ = conn.Do(req).Get()
					}
					conn.watchMap.Delete(key)
					close(state.unready)
				}

				close(finished)
				return
			case <-state.changed:
				// A new event version arrived; loop around to deliver it.
			}
		}
	}()

	return &connWatcher{
		done:     done,
		finished: finished,
	}, nil
}
1532

1533
// ProtocolInfo returns protocol version and protocol features
1534
// supported by connected Tarantool server. Beware that values might be
1535
// outdated if connection is in a disconnected state.
1536
// Since 2.0.0
1537
func (conn *Connection) ProtocolInfo() ProtocolInfo {
5✔
1538
        return conn.serverProtocolInfo.Clone()
5✔
1539
}
5✔
1540

1541
func shutdownEventCallback(event WatchEvent) {
374✔
1542
        // Receives "true" on server shutdown.
374✔
1543
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
374✔
1544
        // step 2.
374✔
1545
        val, ok := event.Value.(bool)
374✔
1546
        if ok && val {
428✔
1547
                go func() {
108✔
1548
                        _ = event.Conn.shutdown(false)
54✔
1549
                }()
54✔
1550
        }
1551
}
1552

1553
// shutdown moves the connection from connConnected into the connShutdown
// state, waits until all in-flight requests complete, and then either closes
// the connection permanently (forever == true) or lets the regular reconnect
// logic take over (forever == false, the graceful-shutdown-event path).
// It returns the error from closeConnection, or nil.
func (conn *Connection) shutdown(forever bool) error {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
		// Not currently connected: a shutdown is already in progress or the
		// connection is disconnected/closed.
		if forever {
			err := ClientError{ErrConnectionClosed, "connection closed by client"}
			return conn.closeConnection(err, true)
		}
		return nil
	}

	if forever {
		// We don't want to reconnect any more.
		conn.opts.Reconnect = 0
		conn.opts.MaxReconnects = 0
	}

	// Wake up goroutines blocked on the cond var and notify subscribers
	// about the shutdown in progress.
	conn.cond.Broadcast()
	conn.notify(Shutdown)

	// Remember the current underlying connection so we can detect a
	// reconnect that happens while we wait.
	c := conn.c
	for {
		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
			// The state changed or the connection was re-established under
			// us; someone else owns the lifecycle now.
			return nil
		}
		if atomic.LoadInt64(&conn.requestCnt) == 0 {
			// All in-flight requests are done.
			break
		}
		// Use cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if state changes as part of
		// reconnect since Tarantool server won't allow to reconnect
		// in the middle of shutting down.
		conn.cond.Wait()
	}

	if forever {
		err := ClientError{ErrConnectionClosed, "connection closed by client"}
		return conn.closeConnection(err, true)
	} else {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(ClientError{
			ErrConnectionClosed,
			"connection closed after server shutdown",
		}, conn.c)
		return nil
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc