• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 7656911175

25 Jan 2024 03:39PM UTC coverage: 74.243% (+0.05%) from 74.194%
7656911175

push

github

oleg-jukovec
pool: add Instance type

The type Instance:
```
type Instance struct {
	// Name is a unique name of the instance.
	Name   string
	// Dialer will be used to create a connection to the instance.
	Dialer tarantool.Dialer
	// Opts will be used to specify a connection options.
	Opts   tarantool.Opts
}
```

The type allows specifying a dialer and connection options per pool
instance. It is now used in `pool.Connect`, `pool.ConnectWithOpts`
and `pool.Add` to specify an instance configuration.

Closes #356

85 of 96 new or added lines in 2 files covered. (88.54%)

180 existing lines in 4 files now uncovered.

6010 of 8095 relevant lines covered (74.24%)

10248.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.55
/connection.go
// Package tarantool implements methods and structures for working with
// a Tarantool instance.
package tarantool
4

5
import (
6
        "context"
7
        "encoding/binary"
8
        "errors"
9
        "fmt"
10
        "io"
11
        "log"
12
        "math"
13
        "net"
14
        "runtime"
15
        "sync"
16
        "sync/atomic"
17
        "time"
18

19
        "github.com/tarantool/go-iproto"
20
        "github.com/vmihailenco/msgpack/v5"
21
)
22

// requestsMap is the number of request buckets per connection shard;
// requests are distributed across buckets by request ID.
const requestsMap = 128

// ignoreStreamId marks a request that does not belong to any stream.
const ignoreStreamId = 0

// Connection lifecycle states stored in Connection.state (see the
// Connection doc comment for the semantics of each state).
const (
	connDisconnected = 0
	connConnected    = 1
	connShutdown     = 2
	connClosed       = 3
)
31

// shutdownEventKey is the watcher key used to subscribe to the server
// graceful-shutdown event.
const shutdownEventKey = "box.shutdown"
33

// ConnEventKind is a kind of a connection event (see ConnEvent).
type ConnEventKind int

// ConnLogKind is a kind of a log event passed to Logger.Report.
type ConnLogKind int
36

var (
	// errUnknownRequest is returned when a request is passed to a
	// connection (or pool) it was not created by.
	errUnknownRequest = errors.New("the passed connected request doesn't belong " +
		"to the current connection or connection pool")
)
41

const (
	// Connected signals that connection is established or reestablished.
	Connected ConnEventKind = iota + 1
	// Disconnected signals that connection is broken.
	Disconnected
	// ReconnectFailed signals that attempt to reconnect has failed.
	ReconnectFailed
	// Shutdown signals that shutdown callback is processing.
	Shutdown
	// Closed signals that the connection was closed: either reconnect
	// attempts are exhausted, or an explicit Close was called.
	Closed

	// LogReconnectFailed is logged when reconnect attempt failed.
	LogReconnectFailed ConnLogKind = iota + 1
	// LogLastReconnectFailed is logged when last reconnect attempt failed,
	// connection will be closed after that.
	LogLastReconnectFailed
	// LogUnexpectedResultId is logged when response with unknown id was received.
	// Most probably it is due to request timeout.
	LogUnexpectedResultId
	// LogWatchEventReadFailed is logged when failed to read a watch event.
	LogWatchEventReadFailed
	// LogAppendPushFailed is logged when failed to append a push response.
	LogAppendPushFailed
)
67

// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
	// Conn is the connection the event happened on.
	Conn *Connection
	// Kind describes what happened.
	Kind ConnEventKind
	// When is the time the event occurred.
	When time.Time
}
74

// connWatchEvent is a raw watch event as decoded from the wire.
type connWatchEvent struct {
	// key is the watched key the event is for.
	key   string
	// value is the event payload, if any.
	value interface{}
}
80

// epoch is the process start reference point used for monotonic
// time measurements.
var epoch = time.Now()
82

// Logger is logger type expected to be passed in options.
type Logger interface {
	// Report reports an event of the given kind for the connection;
	// v carries kind-specific arguments.
	Report(event ConnLogKind, conn *Connection, v ...interface{})
}
87

// defaultLogger is the Logger used when Opts.Logger is not set; it
// writes through the standard library log package.
type defaultLogger struct{}
89

90
func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
121✔
91
        switch event {
121✔
92
        case LogReconnectFailed:
121✔
93
                reconnects := v[0].(uint)
121✔
94
                err := v[1].(error)
121✔
95
                log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
121✔
96
                        reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
121✔
97
        case LogLastReconnectFailed:
×
98
                err := v[0].(error)
×
99
                log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
×
100
                        conn.Addr(), err)
×
101
        case LogUnexpectedResultId:
×
102
                header := v[0].(Header)
×
103
                log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
×
104
                        conn.Addr(), header.RequestId)
×
105
        case LogWatchEventReadFailed:
×
106
                err := v[0].(error)
×
107
                log.Printf("tarantool: unable to parse watch event: %s", err)
×
108
        case LogAppendPushFailed:
×
109
                err := v[0].(error)
×
110
                log.Printf("tarantool: unable to append a push response: %s", err)
×
111
        default:
×
112
                args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
×
113
                log.Print(args...)
×
114
        }
115
}
116

// Connection is a handle with a single connection to a Tarantool instance.
//
// It is created and configured with Connect function, and could not be
// reconfigured later.
//
// Connection could be in four possible states:
//
// - In "Connected" state it sends queries to Tarantool.
//
// - In "Disconnected" state it rejects queries with ClientError{Code:
// ErrConnectionNotReady}
//
// - In "Shutdown" state it rejects queries with ClientError{Code:
// ErrConnectionShutdown}. The state indicates that a graceful shutdown
// in progress. The connection waits for all active requests to
// complete.
//
// - In "Closed" state it rejects queries with ClientError{Code:
// ErrConnectionClosed}. Connection could become "Closed" when
// Connection.Close() method called, or when Tarantool disconnected and
// Reconnect pause is not specified or MaxReconnects is specified and
// MaxReconnects reconnect attempts already performed.
//
// You may perform data manipulation operation by calling its methods:
// Call*, Insert*, Replace*, Update*, Upsert*, Eval*.
//
// In any method that accepts space you may pass either space number or space
// name (in this case it will be looked up in schema). Same is true for index.
//
// ATTENTION: tuple, key, ops and args arguments for any method should be
// an array or should serialize to msgpack array.
//
// ATTENTION: result argument for *Typed methods should deserialize from
// msgpack array, cause Tarantool always returns result as an array.
// For all space related methods and Call16* (but not Call17*) methods Tarantool
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not force
// array of arrays.
//
// If connected to Tarantool 2.10 or newer, connection supports server graceful
// shutdown. In this case, server will wait until all client requests will be
// finished and client disconnects before going down (server also may go down
// by timeout). Client reconnect will happen if connection options enable
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
//
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
	// addr is the resolved address of the current transport.
	addr   net.Addr
	// dialer creates transports on (re)connect.
	dialer Dialer
	// c is the current transport; nil while disconnected.
	c      Conn
	// mutex guards connection state transitions; cond broadcasts them.
	mutex  sync.Mutex
	cond   *sync.Cond
	// schemaResolver contains a SchemaResolver implementation.
	schemaResolver SchemaResolver
	// requestId contains the last request ID for requests with nil context.
	requestId uint32
	// contextRequestId contains the last request ID for requests with context.
	contextRequestId uint32
	// Greeting contains first message sent by Tarantool.
	Greeting *Greeting

	// shard holds per-shard request queues and write buffers;
	// dirtyShard carries indexes of shards with buffered data to write.
	shard      []connShard
	dirtyShard chan uint32

	// control is closed once on final close to stop helper goroutines.
	control chan struct{}
	// rlimit implements Opts.RateLimit as a semaphore channel.
	rlimit  chan struct{}
	opts    Opts
	// state is one of connDisconnected/connConnected/connShutdown/connClosed,
	// accessed atomically.
	state   uint32
	dec     *msgpack.Decoder
	// lenbuf is a scratch buffer for reading packet length prefixes.
	lenbuf  [packetLengthBytes]byte

	// lastStreamId is the last allocated stream ID.
	lastStreamId uint64

	serverProtocolInfo ProtocolInfo
	// watchMap is a map of key -> chan watchState.
	watchMap sync.Map

	// shutdownWatcher is the "box.shutdown" event watcher.
	shutdownWatcher Watcher
	// requestCnt is a counter of active requests.
	requestCnt int64
}
200

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
202

// futureList is an intrusive singly-linked list of pending futures.
// last always points at the nil tail pointer, so appends are O(1).
type futureList struct {
	first *Future
	last  **Future
}
207

208
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
3,704✔
209
        root := &list.first
3,704✔
210
        for {
7,408✔
211
                fut := *root
3,704✔
212
                if fut == nil {
3,713✔
213
                        return nil
9✔
214
                }
9✔
215
                if fut.requestId == reqid {
7,390✔
216
                        if fetch {
7,390✔
217
                                *root = fut.next
3,695✔
218
                                if fut.next == nil {
7,390✔
219
                                        list.last = root
3,695✔
220
                                } else {
3,695✔
221
                                        fut.next = nil
×
222
                                }
×
223
                        }
224
                        return fut
3,695✔
225
                }
226
                root = &fut.next
×
227
        }
228
}
229

230
func (list *futureList) addFuture(fut *Future) {
3,676✔
231
        *list.last = fut
3,676✔
232
        list.last = &fut.next
3,676✔
233
}
3,676✔
234

235
func (list *futureList) clear(err error, conn *Connection) {
5,447,680✔
236
        fut := list.first
5,447,680✔
237
        list.first = nil
5,447,680✔
238
        list.last = &list.first
5,447,680✔
239
        for fut != nil {
5,447,687✔
240
                fut.SetError(err)
7✔
241
                conn.markDone(fut)
7✔
242
                fut, fut.next = fut.next, nil
7✔
243
        }
7✔
244
}
245

// connShard is one slice of the connection's request bookkeeping;
// sharding reduces lock contention between concurrent requesters.
type connShard struct {
	// rmut guards the two request-bucket arrays below.
	rmut            sync.Mutex
	// requests holds futures issued without a context.
	requests        [requestsMap]futureList
	// requestsWithCtx holds futures issued with a context.
	requestsWithCtx [requestsMap]futureList
	// bufmut guards buf and enc.
	bufmut          sync.Mutex
	// buf accumulates encoded requests until the writer flushes them.
	buf             smallWBuf
	enc             *msgpack.Encoder
}
254

// RLimitAction is an enumeration type for an action to do when a rate
// limit is reached.
type RLimitAction int

const (
	// RLimitDrop immediately aborts the request.
	RLimitDrop RLimitAction = iota
	// RLimitWait waits during timeout period for some request to be answered.
	// If no request answered during timeout period, this request is aborted.
	// If no timeout period is set, it will wait forever.
	RLimitWait
)
267

// Opts is a way to configure Connection.
type Opts struct {
	// Timeout for response to a particular request. The timeout is reset when
	// push messages are received. If Timeout is zero, any request can be
	// blocked infinitely.
	// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
	//
	// Pay attention, when using contexts with request objects,
	// the timeout option for Connection does not affect the lifetime
	// of the request. For those purposes use context.WithTimeout() as
	// the root context.
	Timeout time.Duration
	// Timeout between reconnect attempts. If Reconnect is zero, no
	// reconnect attempts will be made.
	// If specified, then when Tarantool is not reachable or disconnected,
	// new connect attempt is performed after pause.
	// By default, no reconnection attempts are performed,
	// so once disconnected, connection becomes Closed.
	Reconnect time.Duration
	// Maximum number of reconnect failures; after that we give up.
	// If MaxReconnects is zero, the client will try to reconnect
	// endlessly.
	// After MaxReconnects attempts Connection becomes closed.
	MaxReconnects uint
	// RateLimit limits number of 'in-fly' request, i.e. already put into
	// requests queue, but not yet answered by server or timed out.
	// It is disabled by default.
	// See RLimitAction for possible actions when RateLimit is reached.
	RateLimit uint
	// RLimitAction tells what to do when RateLimit is reached.
	// It is required if RateLimit is specified.
	RLimitAction RLimitAction
	// Concurrency is amount of separate mutexes for request
	// queues and buffers inside of connection.
	// It is rounded up to nearest power of 2.
	// By default it is runtime.GOMAXPROCS(-1) * 4
	Concurrency uint32
	// SkipSchema disables schema loading. Without disabling schema loading,
	// there is no way to create Connection for currently not accessible Tarantool.
	SkipSchema bool
	// Notify is a channel which receives notifications about Connection status
	// changes.
	Notify chan<- ConnEvent
	// Handle is user specified value, that could be retrieved with
	// Handle() method.
	Handle interface{}
	// Logger is user specified logger used for error messages.
	Logger Logger
}
317

318
// Connect creates and configures a new Connection.
319
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
675✔
320
        conn = &Connection{
675✔
321
                dialer:           dialer,
675✔
322
                requestId:        0,
675✔
323
                contextRequestId: 1,
675✔
324
                Greeting:         &Greeting{},
675✔
325
                control:          make(chan struct{}),
675✔
326
                opts:             opts,
675✔
327
                dec:              msgpack.NewDecoder(&smallBuf{}),
675✔
328
        }
675✔
329
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
675✔
330
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
1,346✔
331
                conn.opts.Concurrency = maxprocs * 4
671✔
332
        }
671✔
333
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
675✔
334
                for i := uint(1); i < 32; i *= 2 {
×
335
                        c |= c >> i
×
336
                }
×
337
                conn.opts.Concurrency = c + 1
×
338
        }
339
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
675✔
340
        conn.shard = make([]connShard, conn.opts.Concurrency)
675✔
341
        for i := range conn.shard {
11,539✔
342
                shard := &conn.shard[i]
10,864✔
343
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
10,864✔
344
                for _, requests := range requestsLists {
32,592✔
345
                        for j := range requests {
2,802,912✔
346
                                requests[j].last = &requests[j].first
2,781,184✔
347
                        }
2,781,184✔
348
                }
349
        }
350

351
        if conn.opts.RateLimit > 0 {
675✔
352
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
×
353
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
×
354
                        return nil, errors.New("RLimitAction should be specified to RLimitDone nor RLimitWait")
×
355
                }
×
356
        }
357

358
        if conn.opts.Logger == nil {
1,350✔
359
                conn.opts.Logger = defaultLogger{}
675✔
360
        }
675✔
361

362
        conn.cond = sync.NewCond(&conn.mutex)
675✔
363

675✔
364
        if err = conn.createConnection(ctx); err != nil {
692✔
365
                return nil, err
17✔
366
        }
17✔
367

368
        go conn.pinger()
658✔
369
        if conn.opts.Timeout > 0 {
1,314✔
370
                go conn.timeouts()
656✔
371
        }
656✔
372

373
        // TODO: reload schema after reconnect.
374
        if !conn.opts.SkipSchema {
1,241✔
375
                schema, err := GetSchema(conn)
583✔
376
                if err != nil {
642✔
377
                        conn.mutex.Lock()
59✔
378
                        defer conn.mutex.Unlock()
59✔
379
                        conn.closeConnection(err, true)
59✔
380
                        return nil, err
59✔
381
                }
59✔
382
                conn.SetSchema(schema)
524✔
383
        }
384

385
        return conn, err
599✔
386
}
387

// ConnectedNow reports if connection is established at the moment.
func (conn *Connection) ConnectedNow() bool {
	return atomic.LoadUint32(&conn.state) == connConnected
}
392

// ClosedNow reports if connection is closed by user or after reconnect.
func (conn *Connection) ClosedNow() bool {
	return atomic.LoadUint32(&conn.state) == connClosed
}
397

// Close closes Connection.
// After this method called, there is no way to reopen this Connection.
func (conn *Connection) Close() error {
	err := ClientError{ErrConnectionClosed, "connection closed by client"}
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	// forever=true: the connection transitions to connClosed and all
	// pending futures are failed with err.
	return conn.closeConnection(err, true)
}
406

// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
	return conn.shutdown(true)
}
413

// Addr returns a configured address of Tarantool socket.
func (conn *Connection) Addr() net.Addr {
	return conn.addr
}
418

// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
	return conn.opts.Handle
}
423

424
func (conn *Connection) cancelFuture(fut *Future, err error) {
8✔
425
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
8✔
426
                fut.SetError(err)
×
427
                conn.markDone(fut)
×
428
        }
×
429
}
430

// dial establishes a new transport via the configured dialer, restores
// registered watchers on it, publishes it as the active connection and
// starts the reader/writer goroutines. Called with conn.mutex held.
func (conn *Connection) dial(ctx context.Context) error {
	opts := conn.opts

	var c Conn
	c, err := conn.dialer.Dial(ctx, DialOpts{
		IoTimeout: opts.Timeout,
	})
	if err != nil {
		return err
	}

	conn.addr = c.Addr()
	conn.Greeting.Version = c.Greeting().Version
	conn.serverProtocolInfo = c.ProtocolInfo()

	spaceAndIndexNamesSupported :=
		isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

	conn.schemaResolver = &noSchemaResolver{
		SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
	}

	// Watchers: re-send a WATCH request for every key that was
	// registered before the (re)connect. The closure assigns the outer
	// err and returns false to stop iteration on a write failure.
	conn.watchMap.Range(func(key, value interface{}) bool {
		st := value.(chan watchState)
		state := <-st
		if state.unready != nil {
			st <- state
			return true
		}

		req := newWatchRequest(key.(string))
		if err = writeRequest(c, req); err != nil {
			st <- state
			return false
		}
		state.ack = true

		st <- state
		return true
	})

	if err != nil {
		c.Close()
		return fmt.Errorf("unable to register watch: %w", err)
	}

	// Only if connected and fully initialized: publish the transport
	// under the shard locks so in-flight requesters see a consistent
	// state, then wake up everyone waiting for a connection.
	conn.lockShards()
	conn.c = c
	atomic.StoreUint32(&conn.state, connConnected)
	conn.cond.Broadcast()
	conn.unlockShards()
	go conn.writer(c, c)
	go conn.reader(c, c)

	// Subscribe shutdown event to process graceful shutdown.
	if conn.shutdownWatcher == nil &&
		isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
			conn.serverProtocolInfo.Features) {
		watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
		if werr != nil {
			return werr
		}
		conn.shutdownWatcher = watcher
	}

	return nil
}
501

// pack writes a complete request packet (5-byte length prefix, msgpack
// header map, then the request body) into h, encoding the body with enc
// and resolving names with res. The length prefix is patched in after
// the body size is known.
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
	req Request, streamId uint64, res SchemaResolver) (err error) {
	// Msgpack type markers for fixed-width unsigned integers.
	const uint32Code = 0xce
	const uint64Code = 0xcf
	// Encoded sizes of an optional IPROTO_STREAM_ID header entry.
	const streamBytesLenUint64 = 10
	const streamBytesLenUint32 = 6

	// Offset of this packet within h; the length prefix lives here.
	hl := h.Len()

	var streamBytesLen = 0
	var streamBytes [streamBytesLenUint64]byte
	hMapLen := byte(0x82) // 2 element map.
	if streamId != ignoreStreamId {
		hMapLen = byte(0x83) // 3 element map.
		streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
		// Use the narrowest msgpack uint encoding that fits.
		if streamId > math.MaxUint32 {
			streamBytesLen = streamBytesLenUint64
			streamBytes[1] = uint64Code
			binary.BigEndian.PutUint64(streamBytes[2:], streamId)
		} else {
			streamBytesLen = streamBytesLenUint32
			streamBytes[1] = uint32Code
			binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
		}
	}

	hBytes := append([]byte{
		uint32Code, 0, 0, 0, 0, // Length.
		hMapLen,
		byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
		byte(iproto.IPROTO_SYNC), uint32Code,
		byte(reqid >> 24), byte(reqid >> 16),
		byte(reqid >> 8), byte(reqid),
	}, streamBytes[:streamBytesLen]...)

	h.Write(hBytes)

	if err = req.Body(res, enc); err != nil {
		return
	}

	// Patch the 4-byte big-endian payload length into the prefix
	// (everything after the 5 prefix bytes counts).
	l := uint32(h.Len() - 5 - hl)
	h.b[hl+1] = byte(l >> 24)
	h.b[hl+2] = byte(l >> 16)
	h.b[hl+3] = byte(l >> 8)
	h.b[hl+4] = byte(l)

	return
}
551

552
func (conn *Connection) createConnection(ctx context.Context) error {
917✔
553
        var err error
917✔
554
        if conn.c == nil && conn.state == connDisconnected {
1,717✔
555
                if err = conn.dial(ctx); err == nil {
1,462✔
556
                        conn.notify(Connected)
662✔
557
                        return nil
662✔
558
                }
662✔
559
        }
560
        if conn.state == connClosed {
372✔
561
                err = ClientError{ErrConnectionClosed, "using closed connection"}
117✔
562
        }
117✔
563
        return err
255✔
564
}
565

// closeConnection tears down the current transport and fails every
// pending future with neterr. With forever set the connection enters
// the terminal connClosed state (control channel closed, shutdown
// watcher released); otherwise it only transitions to connDisconnected
// so a reconnect may follow. Called with conn.mutex held.
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
	conn.lockShards()
	defer conn.unlockShards()
	if forever {
		if conn.state != connClosed {
			close(conn.control)
			atomic.StoreUint32(&conn.state, connClosed)
			conn.cond.Broadcast()
			// Free the resources.
			if conn.shutdownWatcher != nil {
				// Unregister in a goroutine: it may need locks
				// we currently hold.
				go conn.shutdownWatcher.Unregister()
				conn.shutdownWatcher = nil
			}
			conn.notify(Closed)
		}
	} else {
		atomic.StoreUint32(&conn.state, connDisconnected)
		conn.cond.Broadcast()
		conn.notify(Disconnected)
	}
	if conn.c != nil {
		err = conn.c.Close()
		conn.c = nil
	}
	// Drop buffered output and fail all pending requests in every shard.
	for i := range conn.shard {
		conn.shard[i].buf.Reset()
		requestsLists := []*[requestsMap]futureList{
			&conn.shard[i].requests,
			&conn.shard[i].requestsWithCtx,
		}
		for _, requests := range requestsLists {
			for pos := range requests {
				requests[pos].clear(neterr, conn)
			}
		}
	}
	return
}
604

605
func (conn *Connection) getDialTimeout() time.Duration {
121✔
606
        dialTimeout := conn.opts.Reconnect / 2
121✔
607
        if dialTimeout == 0 {
121✔
608
                dialTimeout = 500 * time.Millisecond
×
609
        } else if dialTimeout > 5*time.Second {
121✔
610
                dialTimeout = 5 * time.Second
×
611
        }
×
612
        return dialTimeout
121✔
613
}
614

// runReconnects repeatedly tries to reestablish the connection, pacing
// attempts by Opts.Reconnect, until it succeeds, the connection is
// closed, or MaxReconnects attempts are exhausted. Called with
// conn.mutex held; the mutex is released while sleeping between
// attempts.
func (conn *Connection) runReconnects() error {
	dialTimeout := conn.getDialTimeout()
	var reconnects uint
	var err error

	// MaxReconnects == 0 means retry forever.
	for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
		now := time.Now()

		ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
		err = conn.createConnection(ctx)
		cancel()

		if err != nil {
			// A concurrent Close ends the retry loop immediately.
			if clientErr, ok := err.(ClientError); ok &&
				clientErr.Code == ErrConnectionClosed {
				return err
			}
		} else {
			return nil
		}

		conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
		conn.notify(ReconnectFailed)
		reconnects++
		conn.mutex.Unlock()

		// Sleep out the remainder of the reconnect interval without
		// holding the mutex.
		time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

		conn.mutex.Lock()
	}

	conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
	// mark connection as closed to avoid reopening by another goroutine
	return ClientError{ErrConnectionClosed, "last reconnect failed"}
}
650

651
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
753✔
652
        if conn.opts.Reconnect > 0 {
1,082✔
653
                if c == conn.c {
450✔
654
                        conn.closeConnection(neterr, false)
121✔
655
                        if err := conn.runReconnects(); err != nil {
238✔
656
                                conn.closeConnection(err, true)
117✔
657
                        }
117✔
658
                }
659
        } else {
424✔
660
                conn.closeConnection(neterr, true)
424✔
661
        }
424✔
662
}
663

// reconnect is the locked entry point for transport-failure handling:
// it runs reconnectImpl under conn.mutex and wakes up waiters.
func (conn *Connection) reconnect(neterr error, c Conn) {
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.reconnectImpl(neterr, c)
	conn.cond.Broadcast()
}
670

671
func (conn *Connection) lockShards() {
2,512✔
672
        for i := range conn.shard {
42,928✔
673
                conn.shard[i].rmut.Lock()
40,416✔
674
                conn.shard[i].bufmut.Lock()
40,416✔
675
        }
40,416✔
676
}
677

678
func (conn *Connection) unlockShards() {
2,512✔
679
        for i := range conn.shard {
42,928✔
680
                conn.shard[i].rmut.Unlock()
40,416✔
681
                conn.shard[i].bufmut.Unlock()
40,416✔
682
        }
40,416✔
683
}
684

685
func (conn *Connection) pinger() {
658✔
686
        to := conn.opts.Timeout
658✔
687
        if to == 0 {
660✔
688
                to = 3 * time.Second
2✔
689
        }
2✔
690
        t := time.NewTicker(to / 3)
658✔
691
        defer t.Stop()
658✔
692
        for {
1,318✔
693
                select {
660✔
694
                case <-conn.control:
654✔
695
                        return
654✔
696
                case <-t.C:
2✔
697
                }
698
                conn.Ping()
2✔
699
        }
700
}
701

702
func (conn *Connection) notify(kind ConnEventKind) {
1,684✔
703
        if conn.opts.Notify != nil {
1,684✔
704
                select {
×
705
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
706
                default:
×
707
                }
708
        }
709
}
710

// writer is the per-transport write loop: it drains dirty-shard
// notifications, swaps out each shard's buffer under its lock and
// writes it to w, flushing when no shard is dirty. It exits when the
// connection closes or when the transport c is replaced (the dirty
// notification is then pushed back for the next writer).
func (conn *Connection) writer(w writeFlusher, c Conn) {
	var shardn uint32
	var packet smallWBuf
	for atomic.LoadUint32(&conn.state) != connClosed {
		select {
		case shardn = <-conn.dirtyShard:
		default:
			// No dirty shard right now: yield, then flush buffered
			// output before blocking.
			runtime.Gosched()
			if len(conn.dirtyShard) == 0 {
				if err := w.Flush(); err != nil {
					err = ClientError{
						ErrIoError,
						fmt.Sprintf("failed to flush data to the connection: %s", err),
					}
					conn.reconnect(err, c)
					return
				}
			}
			select {
			case shardn = <-conn.dirtyShard:
			case <-conn.control:
				return
			}
		}
		shard := &conn.shard[shardn]
		shard.bufmut.Lock()
		if conn.c != c {
			// The transport was replaced; hand the notification to
			// the writer of the new transport.
			conn.dirtyShard <- shardn
			shard.bufmut.Unlock()
			return
		}
		// Swap the shard buffer with our (empty) scratch buffer so the
		// lock is not held during the write.
		packet, shard.buf = shard.buf, packet
		shard.bufmut.Unlock()
		if packet.Len() == 0 {
			continue
		}
		if _, err := w.Write(packet.b); err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to write data to the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		packet.Reset()
	}
}
758

759
// readWatchEvent decodes a single IPROTO_EVENT body from the reader.
//
// The body is a msgpack map that may contain IPROTO_EVENT_KEY (the event
// key string, mandatory) and IPROTO_EVENT_DATA (an arbitrary value,
// optional). Unknown map keys are skipped. An error is returned if the
// msgpack decoding fails or if the event does not carry a key.
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
	keyExist := false
	event := connWatchEvent{}
	d := msgpack.NewDecoder(reader)

	l, err := d.DecodeMapLen()
	if err != nil {
		return event, err
	}

	for ; l > 0; l-- {
		cd, err := d.DecodeInt()
		if err != nil {
			return event, err
		}

		switch iproto.Key(cd) {
		case iproto.IPROTO_EVENT_KEY:
			if event.key, err = d.DecodeString(); err != nil {
				return event, err
			}
			keyExist = true
		case iproto.IPROTO_EVENT_DATA:
			if event.value, err = d.DecodeInterface(); err != nil {
				return event, err
			}
		default:
			// Skip unknown keys to stay forward-compatible with newer
			// protocol versions.
			if err = d.Skip(); err != nil {
				return event, err
			}
		}
	}

	if !keyExist {
		return event, errors.New("watch event does not have a key")
	}

	return event, nil
}
798

799
// reader is the per-connection read loop. It reads IPROTO packets from r
// until the connection is closed or an I/O/protocol error forces a
// reconnect, and dispatches each packet by its code: watch events go to
// the eventer goroutine, pushes are appended to their future, and regular
// responses complete their future.
func (conn *Connection) reader(r io.Reader, c Conn) {
	// Buffered so that a slow eventer does not stall response dispatch
	// for a burst of events.
	events := make(chan connWatchEvent, 1024)
	// Closing events stops the eventer goroutine when the reader exits.
	defer close(events)

	go conn.eventer(events)

	for atomic.LoadUint32(&conn.state) != connClosed {
		respBytes, err := read(r, conn.lenbuf[:])
		if err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to read data from the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		buf := smallBuf{b: respBytes}
		header, code, err := decodeHeader(conn.dec, &buf)
		if err != nil {
			err = ClientError{
				ErrProtocolError,
				fmt.Sprintf("failed to decode IPROTO header: %s", err),
			}
			conn.reconnect(err, c)
			return
		}

		var fut *Future = nil
		if code == iproto.IPROTO_EVENT {
			// Events are not tied to a request; hand them off and skip
			// the unexpected-id check below.
			if event, err := readWatchEvent(&buf); err == nil {
				events <- event
			} else {
				err = ClientError{
					ErrProtocolError,
					fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
				}
				conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
			}
			continue
		} else if code == iproto.IPROTO_CHUNK {
			// A push: the future stays registered (peek, not fetch)
			// because the final response is still pending.
			if fut = conn.peekFuture(header.RequestId); fut != nil {
				if err := fut.AppendPush(header, &buf); err != nil {
					err = ClientError{
						ErrProtocolError,
						fmt.Sprintf("failed to append push response: %s", err),
					}
					conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
				}
			}
		} else {
			// Final response: remove the future from the queue and
			// complete it.
			if fut = conn.fetchFuture(header.RequestId); fut != nil {
				if err := fut.SetResponse(header, &buf); err != nil {
					fut.SetError(fmt.Errorf("failed to set response: %w", err))
				}
				conn.markDone(fut)
			}
		}

		if fut == nil {
			// No registered future for this request id (e.g. it already
			// timed out): log and drop the packet.
			conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
		}
	}
}
862

863
// eventer goroutine gets watch events and updates values for watchers.
864
func (conn *Connection) eventer(events <-chan connWatchEvent) {
	for event := range events {
		if value, ok := conn.watchMap.Load(event.key); ok {
			// The watch state is kept in a 1-slot channel that acts as a
			// mutex around the state value.
			st := value.(chan watchState)
			state := <-st
			state.value = event.value
			if state.version == math.MaxUint64 {
				// Wrap around, but never back to initWatchEventVersion
				// (reserved for "no event seen yet").
				state.version = initWatchEventVersion + 1
			} else {
				state.version += 1
			}
			state.ack = false
			if state.changed != nil {
				// Broadcast the change to all watcher goroutines waiting
				// on this key.
				close(state.changed)
				state.changed = nil
			}
			st <- state
		}
		// It is possible to get IPROTO_EVENT after we already send
		// IPROTO_UNWATCH due to processing on a Tarantool side or slow
		// read from the network, so it looks like an expected behavior.
	}
}
887

888
// newFuture creates and registers a future for the request.
//
// On failure (rate limit exceeded with RLimitDrop, connection in a wrong
// state, or an already-done context) the returned future carries the error
// and has fut.ready == nil / fut.done == nil, which callers use as the
// "not registered" marker.
func (conn *Connection) newFuture(req Request) (fut *Future) {
	ctx := req.Ctx()
	fut = NewFuture(req)
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
		// Non-blocking acquire: drop the request if the limiter is full.
		select {
		case conn.rlimit <- struct{}{}:
		default:
			fut.err = ClientError{
				ErrRateLimited,
				"Request is rate limited on client",
			}
			fut.ready = nil
			fut.done = nil
			return
		}
	}
	// Requests with a context get odd ids, plain requests get even ones;
	// this routes them to separate queues (see getFutureImp).
	fut.requestId = conn.nextRequestId(ctx != nil)
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.rmut.Lock()
	// The state is checked under the shard lock so a concurrent close
	// cannot register the future after the queues are drained.
	switch atomic.LoadUint32(&conn.state) {
	case connClosed:
		fut.err = ClientError{
			ErrConnectionClosed,
			"using closed connection",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connDisconnected:
		fut.err = ClientError{
			ErrConnectionNotReady,
			"client connection is not ready",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connShutdown:
		fut.err = ClientError{
			ErrConnectionShutdown,
			"server shutdown in progress",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	}
	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
	if ctx != nil {
		// Do not register a future for a context that is already done.
		select {
		case <-ctx.Done():
			fut.SetError(fmt.Errorf("context is done"))
			shard.rmut.Unlock()
			return
		default:
		}
		shard.requestsWithCtx[pos].addFuture(fut)
	} else {
		shard.requests[pos].addFuture(fut)
		if conn.opts.Timeout > 0 {
			// Deadline relative to the package epoch; checked by the
			// timeouts goroutine.
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	}
	shard.rmut.Unlock()
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
		// Blocking acquire, but give up if the future completes first
		// (e.g. by timeout or disconnect).
		select {
		case conn.rlimit <- struct{}{}:
		default:
			runtime.Gosched()
			select {
			case conn.rlimit <- struct{}{}:
			case <-fut.done:
				if fut.err == nil {
					panic("fut.done is closed, but err is nil")
				}
			}
		}
	}
	return
}
970

971
// This method removes a future from the internal queue if the context
972
// is "done" before the response is come.
973
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
	// Wait until either the future completes or its context is done.
	select {
	case <-fut.done:
	case <-ctx.Done():
	}

	// Both may be ready at this point; cancel only if the future has not
	// completed yet.
	select {
	case <-fut.done:
		return
	default:
		conn.cancelFuture(fut, fmt.Errorf("context is done"))
	}
}
986

987
func (conn *Connection) incrementRequestCnt() {
3,733✔
988
        atomic.AddInt64(&conn.requestCnt, int64(1))
3,733✔
989
}
3,733✔
990

991
func (conn *Connection) decrementRequestCnt() {
3,725✔
992
        if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
7,251✔
993
                conn.cond.Broadcast()
3,526✔
994
        }
3,526✔
995
}
996

997
// send registers a future for the request and queues the encoded request
// for the writer goroutine. streamId routes the request to a stream
// (ignoreStreamId for plain requests). The returned future is already
// completed with an error if registration failed or the request context
// was done.
func (conn *Connection) send(req Request, streamId uint64) *Future {
	conn.incrementRequestCnt()

	fut := conn.newFuture(req)
	if fut.ready == nil {
		// The future was not registered (closed/limited/ctx done);
		// undo the counter bump.
		conn.decrementRequestCnt()
		return fut
	}

	if req.Ctx() != nil {
		// Fast path: cancel immediately if the context is already done.
		select {
		case <-req.Ctx().Done():
			conn.cancelFuture(fut, fmt.Errorf("context is done"))
			return fut
		default:
		}
		// Otherwise watch the context asynchronously.
		go conn.contextWatchdog(fut, req.Ctx())
	}
	conn.putFuture(fut, req, streamId)

	return fut
}
1019

1020
// putFuture encodes the request into the shard's write buffer and notifies
// the writer goroutine via dirtyShard. On a packing error the future is
// unwound from the queue and completed with the error. Async requests are
// completed right away with an empty successful response.
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.bufmut.Lock()
	// The future may already be done (timeout/cancel); do not encode it.
	select {
	case <-fut.done:
		shard.bufmut.Unlock()
		return
	default:
	}
	// Remember whether the buffer was empty: only the first writer into
	// an empty buffer signals dirtyShard, so the writer is woken exactly
	// once per batch.
	firstWritten := shard.buf.Len() == 0
	if shard.buf.Cap() == 0 {
		// Lazily allocate the buffer and its encoder on first use.
		shard.buf.b = make([]byte, 0, 128)
		shard.enc = msgpack.NewEncoder(&shard.buf)
	}
	blen := shard.buf.Len()
	reqid := fut.requestId
	if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
		// Roll back the partially written request.
		shard.buf.Trunc(blen)
		shard.bufmut.Unlock()
		if f := conn.fetchFuture(reqid); f == fut {
			fut.SetError(err)
			conn.markDone(fut)
		} else if f != nil {
			/* in theory, it is possible. In practice, you have
			 * to have race condition that lasts hours */
			panic("Unknown future")
		} else {
			// Someone else (timeout/disconnect) already removed the
			// future; wait for them to complete it.
			fut.wait()
			if fut.err == nil {
				panic("Future removed from queue without error")
			}
			if _, ok := fut.err.(ClientError); ok {
				// packing error is more important than connection
				// error, because it is indication of programmer's
				// mistake.
				fut.SetError(err)
			}
		}
		return
	}
	shard.bufmut.Unlock()

	if firstWritten {
		conn.dirtyShard <- shardn
	}

	if req.Async() {
		// Async requests do not wait for a server reply: complete the
		// future immediately with a successful empty response.
		if fut = conn.fetchFuture(reqid); fut != nil {
			header := Header{
				RequestId: reqid,
				Error:     ErrorNo,
			}
			fut.SetResponse(header, nil)
			conn.markDone(fut)
		}
	}
}
1078

1079
func (conn *Connection) markDone(fut *Future) {
3,676✔
1080
        if conn.rlimit != nil {
3,676✔
1081
                <-conn.rlimit
×
1082
        }
×
1083
        conn.decrementRequestCnt()
3,676✔
1084
}
1085

1086
// peekFuture returns the future for reqid without permanently removing it
// from the queue: it is used for IPROTO_CHUNK (push) packets where the
// final response is still pending. With a timeout configured, the future
// is re-appended to the tail of the queue and its deadline is refreshed.
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
	shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
	pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
	shard.rmut.Lock()
	defer shard.rmut.Unlock()

	if conn.opts.Timeout > 0 {
		// Fetch, then re-append at the tail with a fresh deadline so a
		// long push stream does not time the request out.
		if fut = conn.getFutureImp(reqid, true); fut != nil {
			pair := &shard.requests[pos]
			*pair.last = fut
			pair.last = &fut.next
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	} else {
		// No timeouts: a plain non-removing lookup is enough.
		fut = conn.getFutureImp(reqid, false)
	}

	return fut
}
1105

1106
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
3,678✔
1107
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
3,678✔
1108
        shard.rmut.Lock()
3,678✔
1109
        fut = conn.getFutureImp(reqid, true)
3,678✔
1110
        shard.rmut.Unlock()
3,678✔
1111
        return fut
3,678✔
1112
}
3,678✔
1113

1114
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
3,704✔
1115
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
3,704✔
1116
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
3,704✔
1117
        // futures with even requests id belong to requests list with nil context
3,704✔
1118
        if reqid%2 == 0 {
7,400✔
1119
                return shard.requests[pos].findFuture(reqid, fetch)
3,696✔
1120
        } else {
3,704✔
1121
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
8✔
1122
        }
8✔
1123
}
1124

1125
// timeouts is the per-connection goroutine that expires requests whose
// deadline has passed. It scans every shard queue on each timer tick,
// fails expired futures with ErrTimeouted, and re-arms the timer for the
// earliest remaining deadline. It exits when conn.control is closed.
func (conn *Connection) timeouts() {
	timeout := conn.opts.Timeout
	t := time.NewTimer(timeout)
	for {
		var nowepoch time.Duration
		select {
		case <-conn.control:
			t.Stop()
			return
		case <-t.C:
		}
		// minNext tracks the earliest deadline seen; default is one full
		// timeout from now.
		minNext := time.Since(epoch) + timeout
		for i := range conn.shard {
			nowepoch = time.Since(epoch)
			shard := &conn.shard[i]
			for pos := range shard.requests {
				shard.rmut.Lock()
				pair := &shard.requests[pos]
				// Queues are deadline-ordered, so expired futures are a
				// prefix: pop from the head until the first live one.
				for pair.first != nil && pair.first.timeout < nowepoch {
					// bufmut is also taken so the writer cannot be
					// encoding this request concurrently.
					shard.bufmut.Lock()
					fut := pair.first
					pair.first = fut.next
					if fut.next == nil {
						pair.last = &pair.first
					} else {
						fut.next = nil
					}
					fut.SetError(ClientError{
						Code: ErrTimeouted,
						Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
					})
					conn.markDone(fut)
					shard.bufmut.Unlock()
				}
				if pair.first != nil && pair.first.timeout < minNext {
					minNext = pair.first.timeout
				}
				shard.rmut.Unlock()
			}
		}
		nowepoch = time.Since(epoch)
		if nowepoch+time.Microsecond < minNext {
			t.Reset(minNext - nowepoch)
		} else {
			t.Reset(time.Microsecond)
		}
	}
}
1173

1174
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
5,104✔
1175
        var length uint64
5,104✔
1176

5,104✔
1177
        if _, err = io.ReadFull(r, lenbuf); err != nil {
5,724✔
1178
                return
620✔
1179
        }
620✔
1180
        if lenbuf[0] != 0xce {
4,480✔
1181
                err = errors.New("wrong response header")
×
1182
                return
×
1183
        }
×
1184
        length = (uint64(lenbuf[1]) << 24) +
4,480✔
1185
                (uint64(lenbuf[2]) << 16) +
4,480✔
1186
                (uint64(lenbuf[3]) << 8) +
4,480✔
1187
                uint64(lenbuf[4])
4,480✔
1188

4,480✔
1189
        switch {
4,480✔
1190
        case length == 0:
×
1191
                err = errors.New("response should not be 0 length")
×
1192
                return
×
1193
        case length > math.MaxUint32:
×
1194
                err = errors.New("response is too big")
×
1195
                return
×
1196
        }
1197

1198
        response = make([]byte, length)
4,480✔
1199
        _, err = io.ReadFull(r, response)
4,480✔
1200

4,480✔
1201
        return
4,480✔
1202
}
1203

1204
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
3,733✔
1205
        if context {
3,741✔
1206
                return atomic.AddUint32(&conn.contextRequestId, 2)
8✔
1207
        } else {
3,733✔
1208
                return atomic.AddUint32(&conn.requestId, 2)
3,725✔
1209
        }
3,725✔
1210
}
1211

1212
// Do performs a request asynchronously on the connection.
1213
//
1214
// An error is returned if the request was formed incorrectly, or failed to
1215
// create the future.
1216
func (conn *Connection) Do(req Request) *Future {
3,657✔
1217
        if connectedReq, ok := req.(ConnectedRequest); ok {
3,669✔
1218
                if connectedReq.Conn() != conn {
14✔
1219
                        fut := NewFuture(req)
2✔
1220
                        fut.SetError(errUnknownRequest)
2✔
1221
                        return fut
2✔
1222
                }
2✔
1223
        }
1224
        return conn.send(req, ignoreStreamId)
3,655✔
1225
}
1226

1227
// ConfiguredTimeout returns a timeout from connection config.
1228
func (conn *Connection) ConfiguredTimeout() time.Duration {
	// Plain getter: exposes the request timeout set in Opts.
	return conn.opts.Timeout
}
×
1231

1232
// SetSchema sets Schema for the connection.
1233
func (conn *Connection) SetSchema(s Schema) {
	// Work on a private copy so later mutations of the caller's Schema
	// cannot race with request encoding.
	sCopy := s.copy()
	spaceAndIndexNamesSupported :=
		isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

	// Both the connection mutex and all shard locks are taken so that no
	// request is being encoded with the old resolver while it is swapped.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.lockShards()
	defer conn.unlockShards()

	conn.schemaResolver = &loadedSchemaResolver{
		Schema:                      sCopy,
		SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
	}
}
526✔
1249

1250
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1251
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
2✔
1252
        req := NewPrepareRequest(expr)
2✔
1253
        resp, err := conn.Do(req).GetResponse()
2✔
1254
        if err != nil {
2✔
1255
                return nil, err
×
1256
        }
×
1257
        return NewPreparedFromResponse(conn, resp)
2✔
1258
}
1259

1260
// NewStream creates new Stream object for connection.
1261
//
1262
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1263
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1264
// Since 1.7.0
1265
func (conn *Connection) NewStream() (*Stream, error) {
12✔
1266
        next := atomic.AddUint64(&conn.lastStreamId, 1)
12✔
1267
        return &Stream{
12✔
1268
                Id:   next,
12✔
1269
                Conn: conn,
12✔
1270
        }, nil
12✔
1271
}
12✔
1272

1273
// watchState is the current state of the watcher. See the idea at p. 70, 105:
1274
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1275
type watchState struct {
	// value is a current value.
	value interface{}
	// version is a current version of the value. The only reason for uint64:
	// go 1.13 has no math.Uint.
	version uint64
	// ack is true if the acknowledge for the current version is already
	// sent to the server.
	ack bool
	// cnt is a count of active watchers for the key.
	cnt int
	// changed is a channel for broadcast the value changes; it is closed
	// (and replaced with nil) on every update.
	changed chan struct{}
	// unready channel exists if a state is not ready to work (subscription
	// or unsubscription in progress).
	unready chan struct{}
}
1291

1292
// initWatchEventVersion is an initial version until no events from Tarantool.
1293
const initWatchEventVersion uint64 = 0
1294

1295
// connWatcher is an internal implementation of the Watcher interface.
1296
type connWatcher struct {
	// unregister guards against a double Unregister call.
	unregister sync.Once
	// done is closed when the watcher is unregistered, but the watcher
	// goroutine is not yet finished.
	done chan struct{}
	// finished is closed when the watcher is unregistered and the watcher
	// goroutine is finished.
	finished chan struct{}
}
1305

1306
// Unregister unregisters the connection watcher.
1307
func (w *connWatcher) Unregister() {
	// Signal the watcher goroutine exactly once, even on repeated calls.
	w.unregister.Do(func() {
		close(w.done)
	})
	// Block until the watcher goroutine has fully stopped, so no callback
	// runs after Unregister returns.
	<-w.finished
}
1313

1314
// subscribeWatchChannel returns an existing one or a new watch state channel
1315
// for the key. It also increases a counter of active watchers for the channel.
1316
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
	var st chan watchState

	// Loop until we manage to join a ready state channel for the key.
	for st == nil {
		if val, ok := conn.watchMap.Load(key); !ok {
			// No channel yet: create one in the "unready" state and try
			// to publish it.
			st = make(chan watchState, 1)
			state := watchState{
				value:   nil,
				version: initWatchEventVersion,
				ack:     false,
				cnt:     0,
				changed: nil,
				unready: make(chan struct{}),
			}
			st <- state

			if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
				// We won the race: subscribe on the server side.
				if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
					conn.watchMap.Delete(key)
					close(state.unready)
					return nil, err
				}
				// It is a successful subscription to watch events by itself.
				state = <-st
				state.cnt = 1
				close(state.unready)
				state.unready = nil
				st <- state
				continue
			} else {
				// Lost the race: drop our channel and join the winner's.
				close(state.unready)
				close(st)
				st = val.(chan watchState)
			}
		} else {
			st = val.(chan watchState)
		}

		// It is an existing channel created outside. It may be in the
		// unready state.
		state := <-st
		if state.unready == nil {
			state.cnt += 1
		}
		st <- state

		if state.unready != nil {
			// Wait for an update and retry.
			<-state.unready
			st = nil
		}
	}

	return st, nil
}
1371

1372
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
3,974✔
1373
        for _, actual := range actualSlice {
21,562✔
1374
                if expected == actual {
20,954✔
1375
                        return true
3,366✔
1376
                }
3,366✔
1377
        }
1378
        return false
608✔
1379
}
1380

1381
// NewWatcher creates a new Watcher object for the connection.
1382
//
1383
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
1384
//
1385
// After watcher creation, the watcher callback is invoked for the first time.
1386
// In this case, the callback is triggered whether or not the key has already
1387
// been broadcast. All subsequent invocations are triggered with
1388
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1389
// key that has not been broadcast yet, the callback is triggered only once,
1390
// after the registration of the watcher.
1391
//
1392
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1393
// callback is never executed in parallel with itself, but they can be executed
1394
// in parallel to other watchers.
1395
//
1396
// If the key is updated while the watcher callback is running, the callback
1397
// will be invoked again with the latest value as soon as it returns.
1398
//
1399
// Watchers survive reconnection. All registered watchers are automatically
1400
// resubscribed when the connection is reestablished.
1401
//
1402
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1403
// watcher’s destruction. In this case, the watcher remains registered. You
1404
// need to call Unregister() directly.
1405
//
1406
// Unregister() guarantees that there will be no the watcher's callback calls
1407
// after it, but Unregister() call from the callback leads to a deadlock.
1408
//
1409
// See:
1410
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1411
//
1412
// Since 1.10.0
1413
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
	// We need to check the feature because the IPROTO_WATCH request is
	// asynchronous: we do not expect any response from a Tarantool instance.
	// That's why we can't just check the Tarantool response for an
	// unsupported request error.
	if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
		conn.c.ProtocolInfo().Features) {
		err := fmt.Errorf("the feature %s must be supported by connection "+
			"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
		return nil, err
	}

	return conn.newWatcherImpl(key, callback)
}
1427

1428
// newWatcherImpl subscribes to the key's shared watch state and starts a
// goroutine that invokes callback on every new value version. The returned
// connWatcher stops the goroutine via done and reports completion via
// finished. The per-key state is passed back and forth over the channel st:
// every receive from st must be paired with a send back to release the state
// for other watchers of the same key.
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
	st, err := subscribeWatchChannel(conn, key)
	if err != nil {
		return nil, err
	}

	// Start the watcher goroutine.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		version := initWatchEventVersion
		for {
			// Take the shared state; make sure it has a "changed"
			// notification channel so we can wait for the next update.
			state := <-st
			if state.changed == nil {
				state.changed = make(chan struct{})
			}
			st <- state

			if state.version != version {
				// A value this watcher has not seen yet: deliver it.
				callback(WatchEvent{
					Conn:  conn,
					Key:   key,
					Value: state.value,
				})
				version = state.version

				// Do we need to acknowledge the notification? Only one
				// watcher sends the ack, and only if the value was not
				// updated again while the callback was running.
				state = <-st
				sendAck := !state.ack && version == state.version
				if sendAck {
					state.ack = true
				}
				st <- state

				if sendAck {
					// We expect a reconnect and a re-subscribe if sending
					// the watch request fails, so it looks ok not to check
					// the result. But we need to make sure that the
					// re-watch request will not be finished by a small
					// per-request timeout.
					req := newWatchRequest(key).Context(context.Background())
					conn.Do(req).Get()
				}
			}

			select {
			case <-done:
				// Unregister() was called: drop our reference to the
				// shared state.
				state := <-st
				state.cnt -= 1
				if state.cnt == 0 {
					state.unready = make(chan struct{})
				}
				st <- state

				if state.cnt == 0 {
					// The last one sends IPROTO_UNWATCH.
					if !conn.ClosedNow() {
						// conn.ClosedNow() check is a workaround for calling
						// Unregister from connectionClose().
						//
						// We need to make sure that the unwatch request will
						// not be finished by a small per-request timeout to
						// avoid losing the request.
						req := newUnwatchRequest(key).Context(context.Background())
						conn.Do(req).Get()
					}
					conn.watchMap.Delete(key)
					close(state.unready)
				}

				close(finished)
				return
			case <-state.changed:
				// A new value arrived: loop around and deliver it.
			}
		}
	}()

	return &connWatcher{
		done:     done,
		finished: finished,
	}, nil
}
1511

1512
// ProtocolInfo returns protocol version and protocol features
1513
// supported by connected Tarantool server. Beware that values might be
1514
// outdated if connection is in a disconnected state.
1515
// Since 2.0.0
1516
func (conn *Connection) ProtocolInfo() ProtocolInfo {
10✔
1517
        return conn.serverProtocolInfo.Clone()
10✔
1518
}
10✔
1519

1520
func shutdownEventCallback(event WatchEvent) {
763✔
1521
        // Receives "true" on server shutdown.
763✔
1522
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
763✔
1523
        // step 2.
763✔
1524
        val, ok := event.Value.(bool)
763✔
1525
        if ok && val {
891✔
1526
                go event.Conn.shutdown(false)
128✔
1527
        }
128✔
1528
}
1529

1530
// shutdown switches the connection into the connShutdown state and waits
// until every in-flight request is finished. With forever == true the
// connection is then closed permanently (reconnects disabled); with
// forever == false (graceful server shutdown) a reconnect is initiated
// instead, following the same rules as net.box.
func (conn *Connection) shutdown(forever bool) error {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
		// Not in the connConnected state: already shutting down,
		// disconnected or closed.
		if forever {
			err := ClientError{ErrConnectionClosed, "connection closed by client"}
			return conn.closeConnection(err, true)
		}
		return nil
	}

	if forever {
		// We don't want to reconnect any more.
		conn.opts.Reconnect = 0
		conn.opts.MaxReconnects = 0
	}

	// Wake up goroutines blocked on the cond var and notify subscribers
	// about the state change.
	conn.cond.Broadcast()
	conn.notify(Shutdown)

	// Remember the current transport to detect a reconnect below.
	c := conn.c
	for {
		// Stop waiting if the state changed or the connection was
		// re-established (conn.c replaced) while we were blocked.
		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
			return nil
		}
		if atomic.LoadInt64(&conn.requestCnt) == 0 {
			break
		}
		// Use cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if state changes as part of
		// reconnect since Tarantool server won't allow to reconnect
		// in the middle of shutting down.
		conn.cond.Wait()
	}

	if forever {
		err := ClientError{ErrConnectionClosed, "connection closed by client"}
		return conn.closeConnection(err, true)
	} else {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(ClientError{
			ErrConnectionClosed,
			"connection closed after server shutdown",
		}, conn.c)
		return nil
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc