• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 16046576138

03 Jul 2025 09:20AM UTC coverage: 75.44% (-0.02%) from 75.464%
16046576138

Pull #450

github

bigbes
tests: added test for Body() methods of requests using golden files
Pull Request #450: tests: added test for Body() methods of requests using golden files

3087 of 4092 relevant lines covered (75.44%)

9875.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.32
/connection.go
1
// Package tarantool provides methods and data structures for working with
// a Tarantool instance.
3
package tarantool
4

5
import (
6
        "context"
7
        "encoding/binary"
8
        "errors"
9
        "fmt"
10
        "io"
11
        "log"
12
        "math"
13
        "net"
14
        "runtime"
15
        "sync"
16
        "sync/atomic"
17
        "time"
18

19
        "github.com/tarantool/go-iproto"
20
        "github.com/vmihailenco/msgpack/v5"
21
)
22

23
// requestsMap is the number of futureList buckets kept per shard
// (see connShard.requests and connShard.requestsWithCtx).
const requestsMap = 128

// ignoreStreamId marks a request that does not belong to any stream.
const ignoreStreamId = 0

// Connection states stored in Connection.state (read/written atomically).
const (
	connDisconnected = 0
	connConnected    = 1
	connShutdown     = 2
	connClosed       = 3
)

// shutdownEventKey is the built-in Tarantool event watched to support
// graceful shutdown.
const shutdownEventKey = "box.shutdown"
33

34
// ConnEventKind is a kind of a connection event, see ConnEvent.
type ConnEventKind int

// ConnLogKind is a kind of a logging event passed to Logger.Report.
type ConnLogKind int

var (
	// errUnknownRequest is returned when a request does not belong to
	// the connection (or pool) it was handed to.
	errUnknownRequest = errors.New("the passed connected request doesn't belong " +
		"to the current connection or connection pool")
)
41

42
// Connection event kinds (ConnEventKind) and logging event kinds
// (ConnLogKind). Note that iota continues across the whole block, so the
// ConnLogKind values do not restart at 1.
const (
	// Connected signals that connection is established or reestablished.
	Connected ConnEventKind = iota + 1
	// Disconnected signals that connection is broken.
	Disconnected
	// ReconnectFailed signals that attempt to reconnect has failed.
	ReconnectFailed
	// Shutdown signals that shutdown callback is processing.
	Shutdown
	// Either reconnect attempts exhausted, or explicit Close is called.
	Closed

	// LogReconnectFailed is logged when reconnect attempt failed.
	LogReconnectFailed ConnLogKind = iota + 1
	// LogLastReconnectFailed is logged when last reconnect attempt failed,
	// connection will be closed after that.
	LogLastReconnectFailed
	// LogUnexpectedResultId is logged when response with unknown id was received.
	// Most probably it is due to request timeout.
	LogUnexpectedResultId
	// LogWatchEventReadFailed is logged when failed to read a watch event.
	LogWatchEventReadFailed
	// LogAppendPushFailed is logged when failed to append a push response.
	LogAppendPushFailed
)
67

68
// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
	Conn *Connection
	Kind ConnEventKind
	When time.Time
}

// connWatchEvent is a raw watch event (key and its current value) as read
// from the connection.
type connWatchEvent struct {
	key   string
	value interface{}
}
80

81
// epoch is a fixed reference time point captured at package initialization.
var epoch = time.Now()
82

83
// Logger is logger type expected to be passed in options.
type Logger interface {
	// Report is called with a logging event kind, the connection the event
	// belongs to, and event-specific arguments.
	Report(event ConnLogKind, conn *Connection, v ...interface{})
}

// defaultLogger is used when Opts.Logger is not set; it writes messages
// via the standard log package.
type defaultLogger struct{}
89

90
func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
83✔
91
        switch event {
83✔
92
        case LogReconnectFailed:
82✔
93
                reconnects := v[0].(uint)
82✔
94
                err := v[1].(error)
82✔
95
                addr := conn.Addr()
82✔
96
                if addr == nil {
98✔
97
                        log.Printf("tarantool: connect (%d/%d) failed: %s",
16✔
98
                                reconnects, conn.opts.MaxReconnects, err)
16✔
99
                } else {
82✔
100
                        log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
66✔
101
                                reconnects, conn.opts.MaxReconnects, addr, err)
66✔
102
                }
66✔
103
        case LogLastReconnectFailed:
1✔
104
                err := v[0].(error)
1✔
105
                addr := conn.Addr()
1✔
106
                if addr == nil {
2✔
107
                        log.Printf("tarantool: last connect failed: %s, giving it up",
1✔
108
                                err)
1✔
109
                } else {
1✔
110
                        log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
×
111
                                addr, err)
×
112
                }
×
113
        case LogUnexpectedResultId:
×
114
                header := v[0].(Header)
×
115
                log.Printf("tarantool: connection %s got unexpected request ID (%d) in response "+
×
116
                        "(probably cancelled request)",
×
117
                        conn.Addr(), header.RequestId)
×
118
        case LogWatchEventReadFailed:
×
119
                err := v[0].(error)
×
120
                log.Printf("tarantool: unable to parse watch event: %s", err)
×
121
        case LogAppendPushFailed:
×
122
                err := v[0].(error)
×
123
                log.Printf("tarantool: unable to append a push response: %s", err)
×
124
        default:
×
125
                args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
×
126
                log.Print(args...)
×
127
        }
128
}
129

130
// Connection is a handle with a single connection to a Tarantool instance.
131
//
132
// It is created and configured with Connect function, and could not be
133
// reconfigured later.
134
//
135
// Connection could be in three possible states:
136
//
137
// - In "Connected" state it sends queries to Tarantool.
138
//
139
// - In "Disconnected" state it rejects queries with ClientError{Code:
140
// ErrConnectionNotReady}
141
//
142
// - In "Shutdown" state it rejects queries with ClientError{Code:
143
// ErrConnectionShutdown}. The state indicates that a graceful shutdown
144
// in progress. The connection waits for all active requests to
145
// complete.
146
//
147
// - In "Closed" state it rejects queries with ClientError{Code:
148
// ErrConnectionClosed}. Connection could become "Closed" when
149
// Connection.Close() method called, or when Tarantool disconnected and
150
// Reconnect pause is not specified or MaxReconnects is specified and
151
// MaxReconnect reconnect attempts already performed.
152
//
153
// You may perform data manipulation operation by calling its methods:
154
// Call*, Insert*, Replace*, Update*, Upsert*, Call*, Eval*.
155
//
156
// In any method that accepts space you my pass either space number or space
157
// name (in this case it will be looked up in schema). Same is true for index.
158
//
159
// ATTENTION: tuple, key, ops and args arguments for any method should be
160
// and array or should serialize to msgpack array.
161
//
162
// ATTENTION: result argument for *Typed methods should deserialize from
163
// msgpack array, cause Tarantool always returns result as an array.
164
// For all space related methods and Call16* (but not Call17*) methods Tarantool
165
// always returns array of array (array of tuples for space related methods).
166
// For Eval* and Call* Tarantool always returns array, but does not forces
167
// array of arrays.
168
//
169
// If connected to Tarantool 2.10 or newer, connection supports server graceful
170
// shutdown. In this case, server will wait until all client requests will be
171
// finished and client disconnects before going down (server also may go down
172
// by timeout). Client reconnect will happen if connection options enable
173
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
174
//
175
// More on graceful shutdown:
176
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
177
type Connection struct {
178
        addr   net.Addr
179
        dialer Dialer
180
        c      Conn
181
        mutex  sync.Mutex
182
        cond   *sync.Cond
183
        // schemaResolver contains a SchemaResolver implementation.
184
        schemaResolver SchemaResolver
185
        // requestId contains the last request ID for requests with nil context.
186
        requestId uint32
187
        // contextRequestId contains the last request ID for requests with context.
188
        contextRequestId uint32
189
        // Greeting contains first message sent by Tarantool.
190
        Greeting *Greeting
191

192
        shard      []connShard
193
        dirtyShard chan uint32
194

195
        control chan struct{}
196
        rlimit  chan struct{}
197
        opts    Opts
198
        state   uint32
199
        dec     *msgpack.Decoder
200
        lenbuf  [packetLengthBytes]byte
201

202
        lastStreamId uint64
203

204
        serverProtocolInfo ProtocolInfo
205
        // watchMap is a map of key -> chan watchState.
206
        watchMap sync.Map
207

208
        // shutdownWatcher is the "box.shutdown" event watcher.
209
        shutdownWatcher Watcher
210
        // requestCnt is a counter of active requests.
211
        requestCnt int64
212
}
213

214
var _ = Connector(&Connection{}) // Check compatibility with connector interface.

// futureList is a singly-linked list of pending futures with O(1) append.
type futureList struct {
	first *Future
	// last points to the next-pointer of the tail element, or to first
	// when the list is empty.
	last **Future
}
220

221
// findFuture looks up the future with the given request ID. When fetch is
// true, the found future is also unlinked from the list. Returns nil if no
// future with that ID is present.
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
	// root always points to the next-pointer that leads to fut, so the
	// found element can be unlinked without tracking a "previous" node.
	root := &list.first
	for {
		fut := *root
		if fut == nil {
			return nil
		}
		if fut.requestId == reqid {
			if fetch {
				*root = fut.next
				if fut.next == nil {
					// Removed the tail: the tail pointer must
					// now reference the predecessor's next field.
					list.last = root
				} else {
					fut.next = nil
				}
			}
			return fut
		}
		root = &fut.next
	}
}
242

243
func (list *futureList) addFuture(fut *Future) {
1,870✔
244
        *list.last = fut
1,870✔
245
        list.last = &fut.next
1,870✔
246
}
1,870✔
247

248
func (list *futureList) clear(err error, conn *Connection) {
2,699,264✔
249
        fut := list.first
2,699,264✔
250
        list.first = nil
2,699,264✔
251
        list.last = &list.first
2,699,264✔
252
        for fut != nil {
2,699,277✔
253
                fut.SetError(err)
13✔
254
                conn.markDone(fut)
13✔
255
                fut, fut.next = fut.next, nil
13✔
256
        }
13✔
257
}
258

259
// connShard is one shard of the connection's request bookkeeping; requests
// are spread over shards to reduce mutex contention.
type connShard struct {
	// rmut guards the pending-request lists below.
	rmut            sync.Mutex
	requests        [requestsMap]futureList
	requestsWithCtx [requestsMap]futureList
	// bufmut guards the write buffer and its encoder.
	bufmut sync.Mutex
	buf    smallWBuf
	enc    *msgpack.Encoder
}
267

268
// RLimitAction is an enumeration type for an action to do when a rate limit
// is reached.
type RLimitAction int

const (
	// RLimitDrop immediately aborts the request.
	RLimitDrop RLimitAction = iota
	// RLimitWait waits during timeout period for some request to be answered.
	// If no request answered during timeout period, this request is aborted.
	// If no timeout period is set, it will wait forever.
	RLimitWait
)
280

281
// Opts is a way to configure Connection
type Opts struct {
	// Timeout for response to a particular request. The timeout is reset when
	// push messages are received. If Timeout is zero, any request can be
	// blocked infinitely.
	// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
	//
	// Pay attention, when using contexts with request objects,
	// the timeout option for Connection does not affect the lifetime
	// of the request. For those purposes use context.WithTimeout() as
	// the root context.
	Timeout time.Duration
	// Timeout between reconnect attempts. If Reconnect is zero, no
	// reconnect attempts will be made.
	// If specified, then when Tarantool is not reachable or disconnected,
	// new connect attempt is performed after pause.
	// By default, no reconnection attempts are performed,
	// so once disconnected, connection becomes Closed.
	Reconnect time.Duration
	// Maximum number of reconnect failures; after that we give up.
	// If MaxReconnects is zero, the client will try to reconnect
	// endlessly.
	// After MaxReconnects attempts Connection becomes closed.
	MaxReconnects uint
	// RateLimit limits the number of 'in-flight' requests, i.e. already put
	// into the requests queue, but not yet answered by server or timed out.
	// It is disabled by default.
	// See RLimitAction for possible actions when RateLimit is reached.
	RateLimit uint
	// RLimitAction tells what to do when RateLimit is reached.
	// It is required if RateLimit is specified.
	RLimitAction RLimitAction
	// Concurrency is amount of separate mutexes for request
	// queues and buffers inside of connection.
	// It is rounded up to nearest power of 2.
	// By default it is runtime.GOMAXPROCS(-1) * 4
	Concurrency uint32
	// SkipSchema disables schema loading. Without disabling schema loading,
	// there is no way to create Connection for currently not accessible Tarantool.
	SkipSchema bool
	// Notify is a channel which receives notifications about Connection status
	// changes.
	Notify chan<- ConnEvent
	// Handle is user specified value, that could be retrieved with
	// Handle() method.
	Handle interface{}
	// Logger is user specified logger used for error messages.
	Logger Logger
}
330

331
// Connect creates and configures a new Connection.
332
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
347✔
333
        conn = &Connection{
347✔
334
                dialer:           dialer,
347✔
335
                requestId:        0,
347✔
336
                contextRequestId: 1,
347✔
337
                Greeting:         &Greeting{},
347✔
338
                control:          make(chan struct{}),
347✔
339
                opts:             opts,
347✔
340
                dec:              msgpack.NewDecoder(&smallBuf{}),
347✔
341
        }
347✔
342
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
347✔
343
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
692✔
344
                conn.opts.Concurrency = maxprocs * 4
345✔
345
        }
345✔
346
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
347✔
347
                for i := uint(1); i < 32; i *= 2 {
×
348
                        c |= c >> i
×
349
                }
×
350
                conn.opts.Concurrency = c + 1
×
351
        }
352
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
347✔
353
        conn.shard = make([]connShard, conn.opts.Concurrency)
347✔
354
        for i := range conn.shard {
5,931✔
355
                shard := &conn.shard[i]
5,584✔
356
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
5,584✔
357
                for _, requests := range requestsLists {
16,752✔
358
                        for j := range requests {
1,440,672✔
359
                                requests[j].last = &requests[j].first
1,429,504✔
360
                        }
1,429,504✔
361
                }
362
        }
363

364
        if conn.opts.RateLimit > 0 {
347✔
365
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
×
366
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
×
367
                        return nil, errors.New("RLimitAction should be specified to RLimitDone nor RLimitWait")
×
368
                }
×
369
        }
370

371
        if conn.opts.Logger == nil {
694✔
372
                conn.opts.Logger = defaultLogger{}
347✔
373
        }
347✔
374

375
        conn.cond = sync.NewCond(&conn.mutex)
347✔
376

347✔
377
        if conn.opts.Reconnect > 0 {
462✔
378
                // We don't need these mutex.Lock()/mutex.Unlock() here, but
115✔
379
                // runReconnects() expects mutex.Lock() to be set, so it's
115✔
380
                // easier to add them instead of reworking runReconnects().
115✔
381
                conn.mutex.Lock()
115✔
382
                err = conn.runReconnects(ctx)
115✔
383
                conn.mutex.Unlock()
115✔
384
                if err != nil {
118✔
385
                        return nil, err
3✔
386
                }
3✔
387
        } else {
232✔
388
                if err = conn.connect(ctx); err != nil {
255✔
389
                        return nil, err
23✔
390
                }
23✔
391
        }
392

393
        go conn.pinger()
321✔
394
        if conn.opts.Timeout > 0 {
641✔
395
                go conn.timeouts()
320✔
396
        }
320✔
397

398
        // TODO: reload schema after reconnect.
399
        if !conn.opts.SkipSchema {
620✔
400
                schema, err := GetSchema(conn)
299✔
401
                if err != nil {
353✔
402
                        conn.mutex.Lock()
54✔
403
                        defer conn.mutex.Unlock()
54✔
404
                        conn.closeConnection(err, true)
54✔
405
                        return nil, err
54✔
406
                }
54✔
407
                conn.SetSchema(schema)
245✔
408
        }
409

410
        return conn, err
267✔
411
}
412

413
// ConnectedNow reports if connection is established at the moment.
414
func (conn *Connection) ConnectedNow() bool {
6✔
415
        return atomic.LoadUint32(&conn.state) == connConnected
6✔
416
}
6✔
417

418
// ClosedNow reports if connection is closed by user or after reconnect.
419
func (conn *Connection) ClosedNow() bool {
325✔
420
        return atomic.LoadUint32(&conn.state) == connClosed
325✔
421
}
325✔
422

423
// Close closes Connection.
424
// After this method called, there is no way to reopen this Connection.
425
func (conn *Connection) Close() error {
268✔
426
        err := ClientError{ErrConnectionClosed, "connection closed by client"}
268✔
427
        conn.mutex.Lock()
268✔
428
        defer conn.mutex.Unlock()
268✔
429
        return conn.closeConnection(err, true)
268✔
430
}
268✔
431

432
// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
	// Delegates to shutdown with forever=true.
	return conn.shutdown(true)
}
2✔
438

439
// Addr returns a configured address of Tarantool socket.
// It is nil until a dial has succeeded.
func (conn *Connection) Addr() net.Addr {
	return conn.addr
}
84✔
443

444
// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
	return conn.opts.Handle
}
×
448

449
func (conn *Connection) cancelFuture(fut *Future, err error) {
4✔
450
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
4✔
451
                fut.SetError(err)
×
452
                conn.markDone(fut)
×
453
        }
×
454
}
455

456
// dial establishes a new transport connection via conn.dialer, initializes
// greeting/protocol info, re-registers existing watchers, publishes the
// connection (state -> connConnected), and starts the writer/reader
// goroutines. Must be called with conn.mutex held (callers go through
// connect()).
func (conn *Connection) dial(ctx context.Context) error {
	opts := conn.opts

	var c Conn
	c, err := conn.dialer.Dial(ctx, DialOpts{
		IoTimeout: opts.Timeout,
	})
	if err != nil {
		return err
	}

	conn.addr = c.Addr()
	connGreeting := c.Greeting()
	conn.Greeting.Version = connGreeting.Version
	conn.Greeting.Salt = connGreeting.Salt
	conn.serverProtocolInfo = c.ProtocolInfo()

	if conn.schemaResolver == nil {
		namesSupported := isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

		conn.schemaResolver = &noSchemaResolver{
			SpaceAndIndexNamesSupported: namesSupported,
		}
	}

	// Watchers: re-send a WATCH request for every registered key on the
	// fresh connection. Note: err is captured by the closure; a write
	// failure stops the iteration and is checked right after Range.
	conn.watchMap.Range(func(key, value interface{}) bool {
		st := value.(chan watchState)
		state := <-st
		if state.unready != nil {
			st <- state
			return true
		}

		req := newWatchRequest(key.(string))
		if err = writeRequest(ctx, c, req); err != nil {
			st <- state
			return false
		}
		state.ack = true

		st <- state
		return true
	})

	if err != nil {
		c.Close()
		return fmt.Errorf("unable to register watch: %w", err)
	}

	// Only if connected and fully initialized.
	conn.lockShards()
	conn.c = c
	atomic.StoreUint32(&conn.state, connConnected)
	conn.cond.Broadcast()
	conn.unlockShards()
	go conn.writer(c, c)
	go conn.reader(c, c)

	// Subscribe shutdown event to process graceful shutdown.
	if conn.shutdownWatcher == nil &&
		isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
			conn.serverProtocolInfo.Features) {
		watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
		if werr != nil {
			return werr
		}
		conn.shutdownWatcher = watcher
	}

	return nil
}
529

530
// pack writes a complete request packet (5-byte msgpack uint32 length
// prefix, header map, then the request body) into h. The length prefix is
// patched in after the body is encoded. streamId equal to ignoreStreamId
// omits the IPROTO_STREAM_ID header field.
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
	req Request, streamId uint64, res SchemaResolver) (err error) {
	// Msgpack type codes and the encoded sizes of the stream-id field.
	const uint32Code = 0xce
	const uint64Code = 0xcf
	const streamBytesLenUint64 = 10
	const streamBytesLenUint32 = 6

	// Offset of this packet inside h: the length is patched relative to it.
	hl := h.Len()

	var streamBytesLen = 0
	var streamBytes [streamBytesLenUint64]byte
	hMapLen := byte(0x82) // 2 element map.
	if streamId != ignoreStreamId {
		hMapLen = byte(0x83) // 3 element map.
		streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
		if streamId > math.MaxUint32 {
			streamBytesLen = streamBytesLenUint64
			streamBytes[1] = uint64Code
			binary.BigEndian.PutUint64(streamBytes[2:], streamId)
		} else {
			streamBytesLen = streamBytesLenUint32
			streamBytes[1] = uint32Code
			binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
		}
	}

	hBytes := append([]byte{
		uint32Code, 0, 0, 0, 0, // Length.
		hMapLen,
		byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
		byte(iproto.IPROTO_SYNC), uint32Code,
		byte(reqid >> 24), byte(reqid >> 16),
		byte(reqid >> 8), byte(reqid),
	}, streamBytes[:streamBytesLen]...)

	h.Write(hBytes)

	if err = req.Body(res, enc); err != nil {
		return
	}

	// Patch the 4 length bytes (after the uint32Code marker): packet size
	// excluding the 5-byte prefix itself.
	l := uint32(h.Len() - 5 - hl)
	h.b[hl+1] = byte(l >> 24)
	h.b[hl+2] = byte(l >> 16)
	h.b[hl+3] = byte(l >> 8)
	h.b[hl+4] = byte(l)

	return
}
579

580
func (conn *Connection) connect(ctx context.Context) error {
492✔
581
        var err error
492✔
582
        if conn.c == nil && conn.state == connDisconnected {
923✔
583
                if err = conn.dial(ctx); err == nil {
755✔
584
                        conn.notify(Connected)
324✔
585
                        return nil
324✔
586
                }
324✔
587
        }
588
        if conn.state == connClosed {
229✔
589
                err = ClientError{ErrConnectionClosed, "using closed connection"}
61✔
590
        }
61✔
591
        return err
168✔
592
}
593

594
// closeConnection tears down the transport connection and fails all pending
// requests with neterr. With forever=true the Connection transitions to the
// terminal connClosed state (control channel closed, shutdown watcher
// unregistered); otherwise it only becomes connDisconnected so it may be
// reconnected later. Must be called with conn.mutex held.
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
	conn.lockShards()
	defer conn.unlockShards()
	if forever {
		// Guard against double close of conn.control.
		if conn.state != connClosed {
			close(conn.control)
			atomic.StoreUint32(&conn.state, connClosed)
			conn.cond.Broadcast()
			// Free the resources.
			if conn.shutdownWatcher != nil {
				go conn.shutdownWatcher.Unregister()
				conn.shutdownWatcher = nil
			}
			conn.notify(Closed)
		}
	} else {
		atomic.StoreUint32(&conn.state, connDisconnected)
		conn.cond.Broadcast()
		conn.notify(Disconnected)
	}
	if conn.c != nil {
		err = conn.c.Close()
		conn.c = nil
	}
	// Drop buffered output and fail every pending future on every shard.
	for i := range conn.shard {
		conn.shard[i].buf.Reset()
		requestsLists := []*[requestsMap]futureList{
			&conn.shard[i].requests,
			&conn.shard[i].requestsWithCtx,
		}
		for _, requests := range requestsLists {
			for pos := range requests {
				requests[pos].clear(neterr, conn)
			}
		}
	}
	return
}
632

633
func (conn *Connection) getDialTimeout() time.Duration {
179✔
634
        dialTimeout := conn.opts.Reconnect / 2
179✔
635
        if dialTimeout == 0 {
179✔
636
                dialTimeout = 500 * time.Millisecond
×
637
        } else if dialTimeout > 5*time.Second {
179✔
638
                dialTimeout = 5 * time.Second
×
639
        }
×
640
        return dialTimeout
179✔
641
}
642

643
// runReconnects repeatedly attempts to connect until it succeeds, the
// context is cancelled, the connection is closed, or MaxReconnects attempts
// are exhausted. Must be called with conn.mutex held; the mutex is
// temporarily released while waiting between attempts.
func (conn *Connection) runReconnects(ctx context.Context) error {
	dialTimeout := conn.getDialTimeout()
	var reconnects uint
	var err error

	t := time.NewTicker(conn.opts.Reconnect)
	defer t.Stop()
	for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
		// Each attempt gets its own deadline derived from dialTimeout.
		localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
		err = conn.connect(localCtx)
		cancel()

		if err != nil {
			// The error will most likely be the one that Dialer
			// returns to us due to the context being cancelled.
			// Although this is not guaranteed. For example,
			// if the dialer may throw another error before checking
			// the context, and the context has already been
			// canceled. Or the context was not canceled after
			// the error was thrown, but before the context was
			// checked here.
			if ctx.Err() != nil {
				return err
			}
			if clientErr, ok := err.(ClientError); ok &&
				clientErr.Code == ErrConnectionClosed {
				return err
			}
		} else {
			return nil
		}

		conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
		conn.notify(ReconnectFailed)
		reconnects++
		// Release the mutex while sleeping so other goroutines may
		// observe/close the connection.
		conn.mutex.Unlock()

		select {
		case <-ctx.Done():
			// Since the context is cancelled, we don't need to do anything.
			// Conn.connect() will return the correct error.
		case <-t.C:
		}

		conn.mutex.Lock()
	}

	conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
	// mark connection as closed to avoid reopening by another goroutine
	return ClientError{ErrConnectionClosed, "last reconnect failed"}
}
694

695
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
369✔
696
        if conn.opts.Reconnect > 0 {
530✔
697
                if c == conn.c {
225✔
698
                        conn.closeConnection(neterr, false)
64✔
699
                        if err := conn.runReconnects(context.Background()); err != nil {
125✔
700
                                conn.closeConnection(err, true)
61✔
701
                        }
61✔
702
                }
703
        } else {
208✔
704
                conn.closeConnection(neterr, true)
208✔
705
        }
208✔
706
}
707

708
// reconnect is the mutex-taking wrapper around reconnectImpl; it also wakes
// up goroutines waiting on conn.cond after the state change.
func (conn *Connection) reconnect(neterr error, c Conn) {
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.reconnectImpl(neterr, c)
	conn.cond.Broadcast()
}
310✔
714

715
func (conn *Connection) lockShards() {
1,226✔
716
        for i := range conn.shard {
20,954✔
717
                conn.shard[i].rmut.Lock()
19,728✔
718
                conn.shard[i].bufmut.Lock()
19,728✔
719
        }
19,728✔
720
}
721

722
func (conn *Connection) unlockShards() {
1,226✔
723
        for i := range conn.shard {
20,954✔
724
                conn.shard[i].rmut.Unlock()
19,728✔
725
                conn.shard[i].bufmut.Unlock()
19,728✔
726
        }
19,728✔
727
}
728

729
func (conn *Connection) pinger() {
321✔
730
        to := conn.opts.Timeout
321✔
731
        if to == 0 {
322✔
732
                to = 3 * time.Second
1✔
733
        }
1✔
734
        t := time.NewTicker(to / 3)
321✔
735
        defer t.Stop()
321✔
736
        for {
643✔
737
                select {
322✔
738
                case <-conn.control:
319✔
739
                        return
319✔
740
                case <-t.C:
1✔
741
                }
742
                conn.Ping()
1✔
743
        }
744
}
745

746
func (conn *Connection) notify(kind ConnEventKind) {
851✔
747
        if conn.opts.Notify != nil {
859✔
748
                select {
8✔
749
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
750
                default:
8✔
751
                }
752
        }
753
}
754

755
// writer is the per-connection goroutine that drains per-shard encode
// buffers and writes them to the network. Shards announce pending data
// via the conn.dirtyShard channel. The loop exits when the connection
// is closed, on a write/flush error (triggering reconnect), or when the
// underlying connection c has been replaced.
func (conn *Connection) writer(w writeFlusher, c Conn) {
	var shardn uint32
	var packet smallWBuf
	for atomic.LoadUint32(&conn.state) != connClosed {
		select {
		case shardn = <-conn.dirtyShard:
		default:
			// No shard is immediately dirty: yield, then flush the
			// buffered writer while the pipeline is idle.
			runtime.Gosched()
			if len(conn.dirtyShard) == 0 {
				if err := w.Flush(); err != nil {
					err = ClientError{
						ErrIoError,
						fmt.Sprintf("failed to flush data to the connection: %s", err),
					}
					conn.reconnect(err, c)
					return
				}
			}
			// Block until a shard gets dirty or the connection stops.
			select {
			case shardn = <-conn.dirtyShard:
			case <-conn.control:
				return
			}
		}
		shard := &conn.shard[shardn]
		shard.bufmut.Lock()
		if conn.c != c {
			// The connection was re-established: hand the dirty mark
			// back so the new writer goroutine picks it up.
			conn.dirtyShard <- shardn
			shard.bufmut.Unlock()
			return
		}
		// Swap the shard buffer with our (empty) scratch buffer so the
		// shard can keep encoding while we write.
		packet, shard.buf = shard.buf, packet
		shard.bufmut.Unlock()
		if packet.Len() == 0 {
			continue
		}
		if _, err := w.Write(packet.b); err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to write data to the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		packet.Reset()
	}
}
802

803
// readWatchEvent decodes the body of an IPROTO_EVENT packet from reader.
// The body is a msgpack map that must contain IPROTO_EVENT_KEY and may
// contain IPROTO_EVENT_DATA; unknown keys are skipped. An error is
// returned if decoding fails or the mandatory key is absent.
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
	keyExist := false
	event := connWatchEvent{}

	// Decoders are pooled; return this one when done.
	d := getDecoder(reader)
	defer putDecoder(d)

	l, err := d.DecodeMapLen()
	if err != nil {
		return event, err
	}

	for ; l > 0; l-- {
		cd, err := d.DecodeInt()
		if err != nil {
			return event, err
		}

		switch iproto.Key(cd) {
		case iproto.IPROTO_EVENT_KEY:
			if event.key, err = d.DecodeString(); err != nil {
				return event, err
			}
			keyExist = true
		case iproto.IPROTO_EVENT_DATA:
			if event.value, err = d.DecodeInterface(); err != nil {
				return event, err
			}
		default:
			// Unknown map entry: skip its value to stay in sync.
			if err = d.Skip(); err != nil {
				return event, err
			}
		}
	}

	if !keyExist {
		return event, errors.New("watch event does not have a key")
	}

	return event, nil
}
844

845
// reader is the per-connection goroutine that reads IPROTO packets from
// the network and demultiplexes them: watch events go to the eventer
// goroutine, push chunks are appended to their pending future, and
// final responses complete their future. A read or decode error on the
// transport triggers reconnect and ends the loop.
func (conn *Connection) reader(r io.Reader, c Conn) {
	// Buffered so a burst of events does not block the read loop;
	// closed on exit to stop the eventer goroutine.
	events := make(chan connWatchEvent, 1024)
	defer close(events)

	go conn.eventer(events)

	for atomic.LoadUint32(&conn.state) != connClosed {
		respBytes, err := read(r, conn.lenbuf[:])
		if err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to read data from the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		buf := smallBuf{b: respBytes}
		header, code, err := decodeHeader(conn.dec, &buf)
		if err != nil {
			err = ClientError{
				ErrProtocolError,
				fmt.Sprintf("failed to decode IPROTO header: %s", err),
			}
			conn.reconnect(err, c)
			return
		}

		var fut *Future = nil
		if code == iproto.IPROTO_EVENT {
			// Out-of-band watch notification: forward to the eventer,
			// a malformed event is logged but does not kill the loop.
			if event, err := readWatchEvent(&buf); err == nil {
				events <- event
			} else {
				err = ClientError{
					ErrProtocolError,
					fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
				}
				conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
			}
			continue
		} else if code == iproto.IPROTO_CHUNK {
			// Intermediate push: the future stays in the queue
			// (peekFuture re-arms its timeout).
			if fut = conn.peekFuture(header.RequestId); fut != nil {
				if err := fut.AppendPush(header, &buf); err != nil {
					err = ClientError{
						ErrProtocolError,
						fmt.Sprintf("failed to append push response: %s", err),
					}
					conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
				}
			}
		} else {
			// Final response: remove the future from the queue and
			// complete it.
			if fut = conn.fetchFuture(header.RequestId); fut != nil {
				if err := fut.SetResponse(header, &buf); err != nil {
					fut.SetError(fmt.Errorf("failed to set response: %w", err))
				}
				conn.markDone(fut)
			}
		}

		// No matching future: response for a timed-out or unknown
		// request id.
		if fut == nil {
			conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
		}
	}
}
908

909
// eventer goroutine gets watch events and updates values for watchers.
// Each watchMap entry is a 1-buffered channel used as a mutex around its
// watchState: receiving takes ownership, sending returns it.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
	for event := range events {
		if value, ok := conn.watchMap.Load(event.key); ok {
			st := value.(chan watchState)
			state := <-st
			state.value = event.value
			// Bump the version, wrapping past MaxUint back to the
			// first post-initial version so it never equals
			// initWatchEventVersion again.
			if state.version == math.MaxUint {
				state.version = initWatchEventVersion + 1
			} else {
				state.version += 1
			}
			state.ack = false
			// Wake up every watcher waiting for a change.
			if state.changed != nil {
				close(state.changed)
				state.changed = nil
			}
			st <- state
		}
		// It is possible to get IPROTO_EVENT after we already send
		// IPROTO_UNWATCH due to processing on a Tarantool side or slow
		// read from the network, so it looks like an expected behavior.
	}
}
933

934
// newFuture creates a Future for req, assigns it a request id and
// registers it in the owning shard's request queue. On any failure
// (rate limit drop, closed/not-ready/shutdown connection, canceled
// context) the returned future already carries the error and has
// fut.ready == nil as the "not registered" marker checked by send.
func (conn *Connection) newFuture(req Request) (fut *Future) {
	ctx := req.Ctx()
	fut = NewFuture(req)
	// RLimitDrop: fail fast when the limiter token cannot be taken.
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
		select {
		case conn.rlimit <- struct{}{}:
		default:
			fut.err = ClientError{
				ErrRateLimited,
				"Request is rate limited on client",
			}
			fut.ready = nil
			fut.done = nil
			return
		}
	}
	// Context-bound requests get odd ids, plain requests even ids.
	fut.requestId = conn.nextRequestId(ctx != nil)
	// Concurrency is a power of two, so the mask selects the shard.
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.rmut.Lock()
	// Check the state under the shard lock so registration cannot race
	// with connection teardown.
	switch atomic.LoadUint32(&conn.state) {
	case connClosed:
		fut.err = ClientError{
			ErrConnectionClosed,
			"using closed connection",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connDisconnected:
		fut.err = ClientError{
			ErrConnectionNotReady,
			"client connection is not ready",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connShutdown:
		fut.err = ClientError{
			ErrConnectionShutdown,
			"server shutdown in progress",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	}
	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
	if ctx != nil {
		// Bail out early if the caller's context is already done.
		select {
		case <-ctx.Done():
			fut.SetError(fmt.Errorf("context is done (request ID %d)", fut.requestId))
			shard.rmut.Unlock()
			return
		default:
		}
		shard.requestsWithCtx[pos].addFuture(fut)
	} else {
		shard.requests[pos].addFuture(fut)
		// Plain requests rely on the client-side timeout machinery.
		if conn.opts.Timeout > 0 {
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	}
	shard.rmut.Unlock()
	// RLimitWait: block on the limiter after registration, giving up
	// only if the future completes (with an error) first.
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
		select {
		case conn.rlimit <- struct{}{}:
		default:
			runtime.Gosched()
			select {
			case conn.rlimit <- struct{}{}:
			case <-fut.done:
				if fut.err == nil {
					panic("fut.done is closed, but err is nil")
				}
			}
		}
	}
	return
}
1016

1017
// contextWatchdog removes a future from the internal queue if the
// context is "done" before the response has come. It runs as a
// goroutine per context-bound request.
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
	// Wait until either the request completes or the context ends.
	select {
	case <-fut.done:
	case <-ctx.Done():
	}

	// Re-check completion: if the future finished concurrently with
	// the context, prefer its real result over a cancellation error.
	select {
	case <-fut.done:
		return
	default:
		conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
	}
}
1032

1033
func (conn *Connection) incrementRequestCnt() {
1,886✔
1034
        atomic.AddInt64(&conn.requestCnt, int64(1))
1,886✔
1035
}
1,886✔
1036

1037
func (conn *Connection) decrementRequestCnt() {
1,882✔
1038
        if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
3,669✔
1039
                conn.cond.Broadcast()
1,787✔
1040
        }
1,787✔
1041
}
1042

1043
// send registers a future for req (optionally bound to streamId) and
// queues the request body for the writer goroutine. It maintains the
// in-flight counter and, for context-bound requests, spawns a watchdog
// that cancels the future when the context ends.
func (conn *Connection) send(req Request, streamId uint64) *Future {
	conn.incrementRequestCnt()

	fut := conn.newFuture(req)
	// fut.ready == nil means newFuture already failed the request
	// (rate limit, bad connection state, done context).
	if fut.ready == nil {
		conn.decrementRequestCnt()
		return fut
	}

	if req.Ctx() != nil {
		// Last pre-flight check before handing the request to the writer.
		select {
		case <-req.Ctx().Done():
			conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
			return fut
		default:
		}
		go conn.contextWatchdog(fut, req.Ctx())
	}
	conn.putFuture(fut, req, streamId)

	return fut
}
1065

1066
// putFuture encodes req into its shard's write buffer and marks the
// shard dirty for the writer goroutine. On a packing error the buffer
// is rolled back and the future is failed; async requests are completed
// immediately after a successful encode, without waiting for a reply.
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.bufmut.Lock()
	// The future may have been canceled (timeout/context) while we
	// were acquiring the lock; do not encode a dead request.
	select {
	case <-fut.done:
		shard.bufmut.Unlock()
		return
	default:
	}
	// Empty buffer means the shard is not currently marked dirty; the
	// first write is the one that must notify the writer.
	firstWritten := shard.buf.Len() == 0
	if shard.buf.Cap() == 0 {
		// Lazily create the buffer and its msgpack encoder.
		shard.buf.b = make([]byte, 0, 128)
		shard.enc = msgpack.NewEncoder(&shard.buf)
	}
	blen := shard.buf.Len()
	reqid := fut.requestId
	if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
		// Roll the buffer back to the pre-encode length so a partial
		// packet is never sent.
		shard.buf.Trunc(blen)
		shard.bufmut.Unlock()
		if f := conn.fetchFuture(reqid); f == fut {
			fut.SetError(err)
			conn.markDone(fut)
		} else if f != nil {
			/* in theory, it is possible. In practice, you have
			 * to have race condition that lasts hours */
			panic("Unknown future")
		} else {
			// Someone else (timeout, disconnect) already removed the
			// future; wait for its error to be set.
			fut.wait()
			if fut.err == nil {
				panic("Future removed from queue without error")
			}
			if _, ok := fut.err.(ClientError); ok {
				// packing error is more important than connection
				// error, because it is indication of programmer's
				// mistake.
				fut.SetError(err)
			}
		}
		return
	}
	shard.bufmut.Unlock()

	if firstWritten {
		conn.dirtyShard <- shardn
	}

	if req.Async() {
		// Fire-and-forget request: complete the future right away with
		// an empty OK response.
		if fut = conn.fetchFuture(reqid); fut != nil {
			header := Header{
				RequestId: reqid,
				Error:     ErrorNo,
			}
			fut.SetResponse(header, nil)
			conn.markDone(fut)
		}
	}
}
1124

1125
func (conn *Connection) markDone(fut *Future) {
1,870✔
1126
        if conn.rlimit != nil {
1,870✔
1127
                <-conn.rlimit
×
1128
        }
×
1129
        conn.decrementRequestCnt()
1,870✔
1130
}
1131

1132
// peekFuture looks up the future for reqid without permanently removing
// it: when a client timeout is configured the future is unlinked and
// immediately re-appended to the tail of its request list with a fresh
// deadline (used for push notifications, which keep the request alive).
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
	shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
	pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
	shard.rmut.Lock()
	defer shard.rmut.Unlock()

	if conn.opts.Timeout > 0 {
		// Fetch (unlink) the future, then re-append it at the tail so
		// the list stays sorted by deadline.
		if fut = conn.getFutureImp(reqid, true); fut != nil {
			pair := &shard.requests[pos]
			*pair.last = fut
			pair.last = &fut.next
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	} else {
		// No timeouts: a plain non-removing lookup is enough.
		fut = conn.getFutureImp(reqid, false)
	}

	return fut
}
1151

1152
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
1,871✔
1153
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,871✔
1154
        shard.rmut.Lock()
1,871✔
1155
        fut = conn.getFutureImp(reqid, true)
1,871✔
1156
        shard.rmut.Unlock()
1,871✔
1157
        return fut
1,871✔
1158
}
1,871✔
1159

1160
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
1,884✔
1161
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,884✔
1162
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
1,884✔
1163
        // futures with even requests id belong to requests list with nil context
1,884✔
1164
        if reqid%2 == 0 {
3,764✔
1165
                return shard.requests[pos].findFuture(reqid, fetch)
1,880✔
1166
        } else {
1,884✔
1167
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
4✔
1168
        }
4✔
1169
}
1170

1171
// timeouts is the goroutine that expires requests whose client-side
// deadline has passed. It walks every shard's request lists, fails the
// expired futures with ErrTimeouted, and re-arms its timer for the
// earliest remaining deadline. It stops when conn.control is closed.
func (conn *Connection) timeouts() {
	timeout := conn.opts.Timeout
	t := time.NewTimer(timeout)
	for {
		var nowepoch time.Duration
		select {
		case <-conn.control:
			t.Stop()
			return
		case <-t.C:
		}
		// Upper bound for the next wake-up; shrunk below to the
		// earliest pending deadline found.
		minNext := time.Since(epoch) + timeout
		for i := range conn.shard {
			nowepoch = time.Since(epoch)
			shard := &conn.shard[i]
			for pos := range shard.requests {
				shard.rmut.Lock()
				pair := &shard.requests[pos]
				// Lists are ordered by deadline: pop from the head
				// while the head has expired.
				for pair.first != nil && pair.first.timeout < nowepoch {
					// bufmut guards against racing with putFuture,
					// which encodes under the same lock.
					shard.bufmut.Lock()
					fut := pair.first
					pair.first = fut.next
					if fut.next == nil {
						pair.last = &pair.first
					} else {
						fut.next = nil
					}
					fut.SetError(ClientError{
						Code: ErrTimeouted,
						Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
					})
					conn.markDone(fut)
					shard.bufmut.Unlock()
				}
				if pair.first != nil && pair.first.timeout < minNext {
					minNext = pair.first.timeout
				}
				shard.rmut.Unlock()
			}
		}
		nowepoch = time.Since(epoch)
		// Sleep until the earliest deadline, with a microsecond floor
		// so Reset always gets a positive duration.
		if nowepoch+time.Microsecond < minNext {
			t.Reset(minNext - nowepoch)
		} else {
			t.Reset(time.Microsecond)
		}
	}
}
1219

1220
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
2,533✔
1221
        var length uint64
2,533✔
1222

2,533✔
1223
        if _, err = io.ReadFull(r, lenbuf); err != nil {
2,829✔
1224
                return
296✔
1225
        }
296✔
1226
        if lenbuf[0] != 0xce {
2,235✔
1227
                err = errors.New("wrong response header")
×
1228
                return
×
1229
        }
×
1230
        length = (uint64(lenbuf[1]) << 24) +
2,235✔
1231
                (uint64(lenbuf[2]) << 16) +
2,235✔
1232
                (uint64(lenbuf[3]) << 8) +
2,235✔
1233
                uint64(lenbuf[4])
2,235✔
1234

2,235✔
1235
        switch {
2,235✔
1236
        case length == 0:
×
1237
                err = errors.New("response should not be 0 length")
×
1238
                return
×
1239
        case length > math.MaxUint32:
×
1240
                err = errors.New("response is too big")
×
1241
                return
×
1242
        }
1243

1244
        response = make([]byte, length)
2,235✔
1245
        _, err = io.ReadFull(r, response)
2,235✔
1246

2,235✔
1247
        return
2,235✔
1248
}
1249

1250
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
1,886✔
1251
        if context {
1,890✔
1252
                return atomic.AddUint32(&conn.contextRequestId, 2)
4✔
1253
        } else {
1,886✔
1254
                return atomic.AddUint32(&conn.requestId, 2)
1,882✔
1255
        }
1,882✔
1256
}
1257

1258
// Do performs a request asynchronously on the connection.
1259
//
1260
// An error is returned if the request was formed incorrectly, or failed to
1261
// create the future.
1262
func (conn *Connection) Do(req Request) *Future {
1,839✔
1263
        if connectedReq, ok := req.(ConnectedRequest); ok {
1,845✔
1264
                if connectedReq.Conn() != conn {
7✔
1265
                        fut := NewFuture(req)
1✔
1266
                        fut.SetError(errUnknownRequest)
1✔
1267
                        return fut
1✔
1268
                }
1✔
1269
        }
1270
        return conn.send(req, ignoreStreamId)
1,838✔
1271
}
1272

1273
// ConfiguredTimeout returns a timeout from connection config.
func (conn *Connection) ConfiguredTimeout() time.Duration {
	return conn.opts.Timeout
}
×
1277

1278
// SetSchema sets Schema for the connection. The schema is copied and
// installed atomically: the connection mutex plus all shard locks are
// held so no request is encoded with a half-replaced resolver.
func (conn *Connection) SetSchema(s Schema) {
	sCopy := s.copy()
	// Name-based space/index resolution needs server-side support.
	spaceAndIndexNamesSupported :=
		isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.lockShards()
	defer conn.unlockShards()

	conn.schemaResolver = &loadedSchemaResolver{
		Schema:                      sCopy,
		SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
	}
}
1295

1296
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1297
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
1✔
1298
        req := NewPrepareRequest(expr)
1✔
1299
        resp, err := conn.Do(req).GetResponse()
1✔
1300
        if err != nil {
1✔
1301
                return nil, err
×
1302
        }
×
1303
        return NewPreparedFromResponse(conn, resp)
1✔
1304
}
1305

1306
// NewStream creates new Stream object for connection.
1307
//
1308
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1309
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1310
// Since 1.7.0
1311
func (conn *Connection) NewStream() (*Stream, error) {
9✔
1312
        next := atomic.AddUint64(&conn.lastStreamId, 1)
9✔
1313
        return &Stream{
9✔
1314
                Id:   next,
9✔
1315
                Conn: conn,
9✔
1316
        }, nil
9✔
1317
}
9✔
1318

1319
// watchState is the current state of the watcher. See the idea at p. 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
//
// A watchState is owned through its 1-buffered channel: receiving the
// value takes exclusive ownership, sending it back releases it.
type watchState struct {
	// value is a current value.
	value interface{}
	// version is a current version of the value.
	version uint
	// ack true if the acknowledge is already sent.
	ack bool
	// cnt is a count of active watchers for the key.
	cnt int
	// changed is a channel for broadcast the value changes.
	changed chan struct{}
	// unready channel exists if a state is not ready to work (subscription
	// or unsubscription in progress).
	unready chan struct{}
}
1336

1337
// initWatchEventVersion is an initial version until no events from Tarantool.
const initWatchEventVersion uint = 0
1339

1340
// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
	// unregister ensures the done channel is closed at most once even
	// if Unregister is called concurrently.
	unregister sync.Once
	// done is closed when the watcher is unregistered, but the watcher
	// goroutine is not yet finished.
	done chan struct{}
	// finished is closed when the watcher is unregistered and the watcher
	// goroutine is finished.
	finished chan struct{}
}
1350

1351
// Unregister unregisters the connection watcher. It signals the watcher
// goroutine to stop (at most once, via sync.Once) and then blocks until
// the goroutine has fully finished, so no callback runs after return.
func (w *connWatcher) Unregister() {
	w.unregister.Do(func() {
		close(w.done)
	})
	<-w.finished
}
1358

1359
// subscribeWatchChannel returns an existing one or a new watch state channel
// for the key. It also increases a counter of active watchers for the channel.
//
// The first subscriber for a key creates the state channel, publishes it
// via LoadOrStore and sends the IPROTO_WATCH request; later subscribers
// (and LoadOrStore losers) reuse the published channel, waiting out its
// "unready" phase if the subscription is still in progress.
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
	var st chan watchState

	for st == nil {
		if val, ok := conn.watchMap.Load(key); !ok {
			// No channel yet: create one in the "unready" state.
			st = make(chan watchState, 1)
			state := watchState{
				value:   nil,
				version: initWatchEventVersion,
				ack:     false,
				cnt:     0,
				changed: nil,
				unready: make(chan struct{}),
			}
			st <- state

			if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
				// We won the race: actually subscribe on the server.
				if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
					conn.watchMap.Delete(key)
					close(state.unready)
					return nil, err
				}
				// It is a successful subscription to watch events by itself.
				state = <-st
				state.cnt = 1
				close(state.unready)
				state.unready = nil
				st <- state
				continue
			} else {
				// Lost the race: discard our channel, use the winner's.
				close(state.unready)
				close(st)
				st = val.(chan watchState)
			}
		} else {
			st = val.(chan watchState)
		}

		// It is an existing channel created outside. It may be in the
		// unready state.
		state := <-st
		if state.unready == nil {
			state.cnt += 1
		}
		st <- state

		if state.unready != nil {
			// Wait for an update and retry.
			<-state.unready
			st = nil
		}
	}

	return st, nil
}
1416

1417
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
1,954✔
1418
        for _, actual := range actualSlice {
10,856✔
1419
                if expected == actual {
10,846✔
1420
                        return true
1,944✔
1421
                }
1,944✔
1422
        }
1423
        return false
10✔
1424
}
1425

1426
// NewWatcher creates a new Watcher object for the connection.
1427
//
1428
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
1429
//
1430
// After watcher creation, the watcher callback is invoked for the first time.
1431
// In this case, the callback is triggered whether or not the key has already
1432
// been broadcast. All subsequent invocations are triggered with
1433
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1434
// key that has not been broadcast yet, the callback is triggered only once,
1435
// after the registration of the watcher.
1436
//
1437
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1438
// callback is never executed in parallel with itself, but they can be executed
1439
// in parallel to other watchers.
1440
//
1441
// If the key is updated while the watcher callback is running, the callback
1442
// will be invoked again with the latest value as soon as it returns.
1443
//
1444
// Watchers survive reconnection. All registered watchers are automatically
1445
// resubscribed when the connection is reestablished.
1446
//
1447
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1448
// watcher’s destruction. In this case, the watcher remains registered. You
1449
// need to call Unregister() directly.
1450
//
1451
// Unregister() guarantees that there will be no the watcher's callback calls
1452
// after it, but Unregister() call from the callback leads to a deadlock.
1453
//
1454
// See:
1455
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1456
//
1457
// Since 1.10.0
1458
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1,066✔
1459
        // We need to check the feature because the IPROTO_WATCH request is
1,066✔
1460
        // asynchronous. We do not expect any response from a Tarantool instance
1,066✔
1461
        // That's why we can't just check the Tarantool response for an unsupported
1,066✔
1462
        // request error.
1,066✔
1463
        if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
1,066✔
1464
                conn.serverProtocolInfo.Features) {
1,066✔
1465
                err := fmt.Errorf("the feature %s must be supported by connection "+
×
1466
                        "to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
×
1467
                return nil, err
×
1468
        }
×
1469

1470
        return conn.newWatcherImpl(key, callback)
1,066✔
1471
}
1472

1473
// newWatcherImpl subscribes to key updates and spawns the goroutine that
// delivers notifications to the callback. The per-key watch state is shared
// through the channel st, which is used as a mutex: a goroutine takes
// exclusive ownership of the state by receiving it and releases it by
// sending it back.
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
	st, err := subscribeWatchChannel(conn, key)
	if err != nil {
		return nil, err
	}

	// Start the watcher goroutine.
	done := make(chan struct{})     // closed by the returned watcher's Unregister
	finished := make(chan struct{}) // closed by the goroutine when it has fully stopped

	go func() {
		// Locally tracked version of the last event delivered to the
		// callback; compared against the shared state's version to
		// detect new notifications.
		version := initWatchEventVersion
		for {
			state := <-st
			if state.changed == nil {
				// Lazily (re)create the "state updated" signal channel
				// so we can block on it below.
				state.changed = make(chan struct{})
			}
			st <- state

			if state.version != version {
				// A new value arrived: deliver it. The callback runs
				// with the state released, so other goroutines can
				// update it concurrently.
				callback(WatchEvent{
					Conn:  conn,
					Key:   key,
					Value: state.value,
				})
				version = state.version

				// Do we need to acknowledge the notification?
				state = <-st
				// Only ack if nobody has acked yet and no newer value
				// arrived while the callback was running (a newer value
				// means another round of delivery/ack will follow).
				sendAck := !state.ack && version == state.version
				if sendAck {
					state.ack = true
				}
				st <- state

				if sendAck {
					// We expect a reconnect and re-subscribe if it fails to
					// send the watch request. So it looks ok do not check a
					// result. But we need to make sure that the re-watch
					// request will not be finished by a small per-request
					// timeout.
					req := newWatchRequest(key).Context(context.Background())
					conn.Do(req).Get()
				}
			}

			select {
			case <-done:
				// The watcher was unregistered: drop our reference from
				// the shared counter.
				state := <-st
				state.cnt -= 1
				if state.cnt == 0 {
					// We are the last watcher for the key; create the
					// channel that blocks new subscribers until the
					// unwatch below completes.
					state.unready = make(chan struct{})
				}
				st <- state

				if state.cnt == 0 {
					// The last one sends IPROTO_UNWATCH.
					if !conn.ClosedNow() {
						// conn.ClosedNow() check is a workaround for calling
						// Unregister from connectionClose().
						//
						// We need to make sure that the unwatch request will
						// not be finished by a small per-request timeout to
						// avoid lost of the request.
						req := newUnwatchRequest(key).Context(context.Background())
						conn.Do(req).Get()
					}
					conn.watchMap.Delete(key)
					close(state.unready)
				}

				close(finished)
				return
			case <-state.changed:
				// The key was updated (or re-subscribed); loop to pick
				// up the new state.
			}
		}
	}()

	return &connWatcher{
		done:     done,
		finished: finished,
	}, nil
}
1556

1557
// ProtocolInfo returns protocol version and protocol features
1558
// supported by connected Tarantool server. Beware that values might be
1559
// outdated if connection is in a disconnected state.
1560
// Since 2.0.0
1561
func (conn *Connection) ProtocolInfo() ProtocolInfo {
5✔
1562
        return conn.serverProtocolInfo.Clone()
5✔
1563
}
5✔
1564

1565
func shutdownEventCallback(event WatchEvent) {
387✔
1566
        // Receives "true" on server shutdown.
387✔
1567
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
387✔
1568
        // step 2.
387✔
1569
        val, ok := event.Value.(bool)
387✔
1570
        if ok && val {
455✔
1571
                go event.Conn.shutdown(false)
68✔
1572
        }
68✔
1573
}
1574

1575
// shutdown performs a graceful shutdown of the connection: it stops
// accepting the connection as "connected", waits until all in-flight
// requests are finished, and then either closes the connection forever
// (forever == true, used by explicit client close) or lets the regular
// reconnect logic take over (forever == false, used by the server-initiated
// "box.shutdown" event).
func (conn *Connection) shutdown(forever bool) error {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	// Move connConnected -> connShutdown. If the connection is in any
	// other state, there is nothing to drain.
	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
		if forever {
			err := ClientError{ErrConnectionClosed, "connection closed by client"}
			return conn.closeConnection(err, true)
		}
		return nil
	}

	if forever {
		// We don't want to reconnect any more.
		conn.opts.Reconnect = 0
		conn.opts.MaxReconnects = 0
	}

	// Wake up goroutines blocked on the cond var and tell subscribers a
	// shutdown is in progress.
	conn.cond.Broadcast()
	conn.notify(Shutdown)

	// Remember the current transport so we can detect a concurrent
	// reconnect (conn.c replaced) while we wait.
	c := conn.c
	for {
		// Abort the wait if the state changed under us or the
		// connection was re-established by someone else.
		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
			return nil
		}
		// All in-flight requests are done; proceed with the close.
		if atomic.LoadInt64(&conn.requestCnt) == 0 {
			break
		}
		// Use cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if state changes as part of
		// reconnect since Tarantool server won't allow to reconnect
		// in the middle of shutting down.
		conn.cond.Wait()
	}

	if forever {
		err := ClientError{ErrConnectionClosed, "connection closed by client"}
		return conn.closeConnection(err, true)
	} else {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(ClientError{
			ErrConnectionClosed,
			"connection closed after server shutdown",
		}, conn.c)
		return nil
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc