tarantool / go-tarantool / 6933294594

20 Nov 2023 04:56PM UTC coverage: 79.76% (+0.08%) from 79.68%

Pull Request #349: connection: support connection via an existing socket fd
Committed by askalt via github: "docs: update according to the changes"

255 of 309 new or added lines in 7 files covered. (82.52%)
24 existing lines in 3 files now uncovered.
5647 of 7080 relevant lines covered (79.76%)
11113.95 hits per line

Source file: /connection.go (77.31% covered)

// Package tarantool implements methods and structures for working with
// a Tarantool instance.
package tarantool

import (
        "context"
        "encoding/binary"
        "errors"
        "fmt"
        "io"
        "log"
        "math"
        "net"
        "runtime"
        "sync"
        "sync/atomic"
        "time"

        "github.com/tarantool/go-iproto"
        "github.com/vmihailenco/msgpack/v5"
)

const requestsMap = 128
const ignoreStreamId = 0
const (
        connDisconnected = 0
        connConnected    = 1
        connShutdown     = 2
        connClosed       = 3
)

const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

var (
        errUnknownRequest = errors.New("the passed connected request doesn't belong " +
                "to the current connection or connection pool")
)

const (
        // Connected signals that connection is established or reestablished.
        Connected ConnEventKind = iota + 1
        // Disconnected signals that connection is broken.
        Disconnected
        // ReconnectFailed signals that attempt to reconnect has failed.
        ReconnectFailed
        // Shutdown signals that the shutdown callback is being processed.
        Shutdown
        // Closed signals that either reconnect attempts are exhausted or an
        // explicit Close was called.
        Closed

        // LogReconnectFailed is logged when a reconnect attempt fails.
        LogReconnectFailed ConnLogKind = iota + 1
        // LogLastReconnectFailed is logged when the last reconnect attempt fails;
        // the connection will be closed after that.
        LogLastReconnectFailed
        // LogUnexpectedResultId is logged when a response with an unknown id is
        // received. Most probably it is due to a request timeout.
        LogUnexpectedResultId
        // LogWatchEventReadFailed is logged when reading a watch event fails.
        LogWatchEventReadFailed
)

// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
        Conn *Connection
        Kind ConnEventKind
        When time.Time
}

// A raw watch event.
type connWatchEvent struct {
        key   string
        value interface{}
}

var epoch = time.Now()

// Logger is a logger type expected to be passed in options.
type Logger interface {
        Report(event ConnLogKind, conn *Connection, v ...interface{})
}

type defaultLogger struct{}

func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
        switch event {
        case LogReconnectFailed:
                reconnects := v[0].(uint)
                err := v[1].(error)
                log.Printf("tarantool: reconnect (%d/%d) failed: %s",
                        reconnects, conn.opts.MaxReconnects, err)
        case LogLastReconnectFailed:
                err := v[0].(error)
                log.Printf("tarantool: last reconnect failed: %s, giving it up", err)
        case LogUnexpectedResultId:
                resp := v[0].(*Response)
                log.Printf("tarantool: got unexpected resultId (%d) in response", resp.RequestId)
        case LogWatchEventReadFailed:
                err := v[0].(error)
                log.Printf("tarantool: unable to parse watch event: %s", err)
        default:
                args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
                log.Print(args...)
        }
}
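
// Illustrative sketch, not part of the original file: any type with a
// matching Report method satisfies the Logger interface above and can be
// passed via Opts.Logger to replace defaultLogger. The prefixLogger name is
// hypothetical.
type prefixLogger struct {
        prefix string
}

func (l prefixLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
        log.Printf("%s: event=%d args=%v", l.prefix, event, v)
}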

// Connection is a handle with a single connection to a Tarantool instance.
//
// It is created and configured with the Connect function, and cannot be
// reconfigured later.
//
// Connection can be in one of four possible states:
//
// - In the "Connected" state it sends queries to Tarantool.
//
// - In the "Disconnected" state it rejects queries with ClientError{Code:
// ErrConnectionNotReady}.
//
// - In the "Shutdown" state it rejects queries with ClientError{Code:
// ErrConnectionShutdown}. The state indicates that a graceful shutdown
// is in progress. The connection waits for all active requests to
// complete.
//
// - In the "Closed" state it rejects queries with ClientError{Code:
// ErrConnectionClosed}. Connection becomes "Closed" when the
// Connection.Close() method is called, or when Tarantool is disconnected
// and either the Reconnect pause is not specified, or MaxReconnects is
// specified and that many reconnect attempts have already been performed.
//
// You may perform data manipulation operations by calling its methods:
// Call*, Insert*, Replace*, Update*, Upsert*, Eval*.
//
// In any method that accepts a space you may pass either a space number or
// a space name (in the latter case it will be looked up in the schema). The
// same is true for an index.
//
// ATTENTION: the tuple, key, ops and args arguments for any method should be
// an array or should serialize to a msgpack array.
//
// ATTENTION: the result argument for *Typed methods should deserialize from
// a msgpack array, because Tarantool always returns the result as an array.
// For all space-related methods and Call16* (but not Call17*) methods
// Tarantool always returns an array of arrays (an array of tuples for
// space-related methods). For Eval* and Call* Tarantool always returns an
// array, but does not force an array of arrays.
//
// If connected to Tarantool 2.10 or newer, the connection supports server
// graceful shutdown. In this case, the server waits until all client
// requests are finished and the client disconnects before going down (the
// server may also go down by timeout). Client reconnect will happen if the
// connection options enable reconnect. Beware that graceful shutdown event
// initialization is asynchronous.
//
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
        addr   net.Addr
        dialer Dialer
        c      Conn
        mutex  sync.Mutex
        cond   *sync.Cond
        // Schema contains schema loaded on connection.
        Schema *Schema
        // schemaResolver contains a SchemaResolver implementation.
        schemaResolver SchemaResolver
        // requestId contains the last request ID for requests with nil context.
        requestId uint32
        // contextRequestId contains the last request ID for requests with context.
        contextRequestId uint32
        // Greeting contains first message sent by Tarantool.
        Greeting *Greeting

        shard      []connShard
        dirtyShard chan uint32

        control chan struct{}
        rlimit  chan struct{}
        opts    Opts
        state   uint32
        dec     *msgpack.Decoder
        lenbuf  [packetLengthBytes]byte

        lastStreamId uint64

        serverProtocolInfo ProtocolInfo
        // watchMap is a map of key -> chan watchState.
        watchMap sync.Map

        // shutdownWatcher is the "box.shutdown" event watcher.
        shutdownWatcher Watcher
        // requestCnt is a counter of active requests.
        requestCnt int64
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
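
// Illustrative sketch, not part of the original file: the Notify channel
// from Opts delivers the ConnEvent values defined above, so state changes
// can be observed without polling ConnectedNow() or ClosedNow(). The
// watchConnEvents name is hypothetical.
func watchConnEvents(events <-chan ConnEvent) {
        for ev := range events {
                switch ev.Kind {
                case Connected:
                        log.Printf("tarantool: connected at %s", ev.When)
                case Disconnected, ReconnectFailed:
                        log.Printf("tarantool: connection lost at %s", ev.When)
                case Shutdown:
                        log.Printf("tarantool: graceful shutdown in progress")
                case Closed:
                        log.Printf("tarantool: connection closed")
                        return
                }
        }
}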

type futureList struct {
        first *Future
        last  **Future
}

func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
        root := &list.first
        for {
                fut := *root
                if fut == nil {
                        return nil
                }
                if fut.requestId == reqid {
                        if fetch {
                                *root = fut.next
                                if fut.next == nil {
                                        list.last = root
                                } else {
                                        fut.next = nil
                                }
                        }
                        return fut
                }
                root = &fut.next
        }
}

func (list *futureList) addFuture(fut *Future) {
        *list.last = fut
        list.last = &fut.next
}

func (list *futureList) clear(err error, conn *Connection) {
        fut := list.first
        list.first = nil
        list.last = &list.first
        for fut != nil {
                fut.SetError(err)
                conn.markDone(fut)
                fut, fut.next = fut.next, nil
        }
}

type connShard struct {
        rmut            sync.Mutex
        requests        [requestsMap]futureList
        requestsWithCtx [requestsMap]futureList
        bufmut          sync.Mutex
        buf             smallWBuf
        enc             *msgpack.Encoder
}

// RLimitAction is an enumeration type for the action to take when a rate
// limit is reached.
type RLimitAction int

const (
        // RLimitDrop immediately aborts the request.
        RLimitDrop RLimitAction = iota
        // RLimitWait waits during the timeout period for some request to be
        // answered. If no request is answered during the timeout period, this
        // request is aborted. If no timeout period is set, it will wait forever.
        RLimitWait
)

// Opts is a way to configure Connection.
type Opts struct {
        // Timeout for a response to a particular request. The timeout is reset
        // when push messages are received. If Timeout is zero, any request can
        // block indefinitely.
        // Also used to set up net.TCPConn.Set(Read|Write)Deadline.
        //
        // Pay attention: when using contexts with request objects,
        // the timeout option for Connection does not affect the lifetime
        // of the request. For those purposes use context.WithTimeout() as
        // the root context.
        Timeout time.Duration
        // Timeout between reconnect attempts. If Reconnect is zero, no
        // reconnect attempts will be made.
        // If specified, then when Tarantool is not reachable or disconnected,
        // a new connect attempt is performed after the pause.
        // By default, no reconnect attempts are performed,
        // so once disconnected, the connection becomes Closed.
        Reconnect time.Duration
        // MaxReconnects is the maximum number of reconnect failures; after
        // that we give up. If MaxReconnects is zero, the client will try to
        // reconnect endlessly.
        // After MaxReconnects attempts the Connection becomes closed.
        MaxReconnects uint
        // RateLimit limits the number of 'in-flight' requests, i.e. requests
        // already put into the requests queue, but not yet answered by the
        // server or timed out.
        // It is disabled by default.
        // See RLimitAction for possible actions when RateLimit is reached.
        RateLimit uint
        // RLimitAction tells what to do when RateLimit is reached.
        // It is required if RateLimit is specified.
        RLimitAction RLimitAction
        // Concurrency is the number of separate mutexes for request
        // queues and buffers inside the connection.
        // It is rounded up to the nearest power of 2.
        // By default it is runtime.GOMAXPROCS(-1) * 4.
        Concurrency uint32
        // SkipSchema disables schema loading. Without disabling schema loading,
        // there is no way to create a Connection to a currently inaccessible
        // Tarantool.
        SkipSchema bool
        // Notify is a channel which receives notifications about Connection
        // status changes.
        Notify chan<- ConnEvent
        // Handle is a user-specified value that can be retrieved with the
        // Handle() method.
        Handle interface{}
        // Logger is a user-specified logger used for error messages.
        Logger Logger
}
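
// A minimal sketch of filling Opts, not part of the original file: a request
// timeout, automatic reconnection and a client-side rate limit with the
// RLimitWait policy described above. The exampleOpts name is hypothetical.
var exampleOpts = Opts{
        Timeout:       5 * time.Second,
        Reconnect:     time.Second,
        MaxReconnects: 3,
        RateLimit:     128,
        RLimitAction:  RLimitWait,
}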

// Connect creates and configures a new Connection.
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
        conn = &Connection{
                dialer:           dialer,
                requestId:        0,
                contextRequestId: 1,
                Greeting:         &Greeting{},
                control:          make(chan struct{}),
                opts:             opts,
                dec:              msgpack.NewDecoder(&smallBuf{}),
        }
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
                conn.opts.Concurrency = maxprocs * 4
        }
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
                for i := uint(1); i < 32; i *= 2 {
                        c |= c >> i
                }
                conn.opts.Concurrency = c + 1
        }
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
        conn.shard = make([]connShard, conn.opts.Concurrency)
        for i := range conn.shard {
                shard := &conn.shard[i]
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
                for _, requests := range requestsLists {
                        for j := range requests {
                                requests[j].last = &requests[j].first
                        }
                }
        }

        if conn.opts.RateLimit > 0 {
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
                        return nil, errors.New("RLimitAction should be specified as RLimitDrop or RLimitWait")
                }
        }

        if conn.opts.Logger == nil {
                conn.opts.Logger = defaultLogger{}
        }

        conn.cond = sync.NewCond(&conn.mutex)

        if err = conn.createConnection(ctx); err != nil {
                return nil, err
        }

        go conn.pinger()
        if conn.opts.Timeout > 0 {
                go conn.timeouts()
        }

        // TODO: reload schema after reconnect.
        if !conn.opts.SkipSchema {
                if err = conn.loadSchema(); err != nil {
                        conn.mutex.Lock()
                        defer conn.mutex.Unlock()
                        conn.closeConnection(err, true)
                        return nil, err
                }
        }

        return conn, err
}
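
// Illustrative usage of Connect, not part of the original file. NetDialer is
// assumed to be the plain TCP Dialer implementation shipped with this
// package; any other Dialer works the same way. The exampleConnect name is
// hypothetical.
func exampleConnect() (*Connection, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        dialer := NetDialer{
                Address: "127.0.0.1:3301",
                User:    "guest",
        }
        opts := Opts{
                Timeout:   time.Second,
                Reconnect: time.Second,
        }
        return Connect(ctx, dialer, opts)
}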

// ConnectedNow reports if connection is established at the moment.
func (conn *Connection) ConnectedNow() bool {
        return atomic.LoadUint32(&conn.state) == connConnected
}

// ClosedNow reports if connection is closed by user or after reconnect.
func (conn *Connection) ClosedNow() bool {
        return atomic.LoadUint32(&conn.state) == connClosed
}

// Close closes Connection.
// After this method is called, there is no way to reopen this Connection.
func (conn *Connection) Close() error {
        err := ClientError{ErrConnectionClosed, "connection closed by client"}
        conn.mutex.Lock()
        defer conn.mutex.Unlock()
        return conn.closeConnection(err, true)
}

// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method is called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
        return conn.shutdown(true)
}

// Addr returns a configured address of Tarantool socket.
// It panics if the connection has not been successfully established.
func (conn *Connection) Addr() net.Addr {
        return conn.addr
}

// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
        return conn.opts.Handle
}

func (conn *Connection) cancelFuture(fut *Future, err error) {
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
                fut.SetError(err)
                conn.markDone(fut)
        }
}

func (conn *Connection) dial(ctx context.Context) error {
        opts := conn.opts

        var c Conn
        c, err := conn.dialer.Dial(ctx, DialOpts{
                IoTimeout: opts.Timeout,
        })
        if err != nil {
                return err
        }

        conn.addr = c.Addr()
        conn.Greeting.Version = c.Greeting().Version
        conn.serverProtocolInfo = c.ProtocolInfo()

        spaceAndIndexNamesSupported :=
                isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
                        conn.serverProtocolInfo.Features)

        conn.schemaResolver = &noSchemaResolver{
                SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
        }

        // Watchers.
        conn.watchMap.Range(func(key, value interface{}) bool {
                st := value.(chan watchState)
                state := <-st
                if state.unready != nil {
                        st <- state
                        return true
                }

                req := newWatchRequest(key.(string))
                if err = writeRequest(c, req); err != nil {
                        st <- state
                        return false
                }
                state.ack = true

                st <- state
                return true
        })

        if err != nil {
                c.Close()
                return fmt.Errorf("unable to register watch: %w", err)
        }

        // Only if connected and fully initialized.
        conn.lockShards()
        conn.c = c
        atomic.StoreUint32(&conn.state, connConnected)
        conn.cond.Broadcast()
        conn.unlockShards()
        go conn.writer(c, c)
        go conn.reader(c, c)

        // Subscribe to the shutdown event to process graceful shutdown.
        if conn.shutdownWatcher == nil &&
                isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
                        conn.serverProtocolInfo.Features) {
                watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
                if werr != nil {
                        return werr
                }
                conn.shutdownWatcher = watcher
        }

        return nil
}

func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
        req Request, streamId uint64, res SchemaResolver) (err error) {
        const uint32Code = 0xce
        const uint64Code = 0xcf
        const streamBytesLenUint64 = 10
        const streamBytesLenUint32 = 6

        hl := h.Len()

        var streamBytesLen = 0
        var streamBytes [streamBytesLenUint64]byte
        hMapLen := byte(0x82) // 2 element map.
        if streamId != ignoreStreamId {
                hMapLen = byte(0x83) // 3 element map.
                streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
                if streamId > math.MaxUint32 {
                        streamBytesLen = streamBytesLenUint64
                        streamBytes[1] = uint64Code
                        binary.BigEndian.PutUint64(streamBytes[2:], streamId)
                } else {
                        streamBytesLen = streamBytesLenUint32
                        streamBytes[1] = uint32Code
                        binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
                }
        }

        hBytes := append([]byte{
                uint32Code, 0, 0, 0, 0, // Length.
                hMapLen,
                byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
                byte(iproto.IPROTO_SYNC), uint32Code,
                byte(reqid >> 24), byte(reqid >> 16),
                byte(reqid >> 8), byte(reqid),
        }, streamBytes[:streamBytesLen]...)

        h.Write(hBytes)

        if err = req.Body(res, enc); err != nil {
                return
        }

        l := uint32(h.Len() - 5 - hl)
        h.b[hl+1] = byte(l >> 24)
        h.b[hl+2] = byte(l >> 16)
        h.b[hl+3] = byte(l >> 8)
        h.b[hl+4] = byte(l)

        return
}
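
// Illustrative sketch, not part of the original file: the framing used by
// pack above. Every packet starts with a 5-byte msgpack uint32 length prefix
// (the 0xce marker plus a big-endian length), written as zeros first and
// patched once the body size is known; read() below expects exactly this
// prefix. The frameSketch name is hypothetical.
func frameSketch(body []byte) []byte {
        packet := make([]byte, 5+len(body))
        packet[0] = 0xce
        binary.BigEndian.PutUint32(packet[1:5], uint32(len(body)))
        copy(packet[5:], body)
        return packet
}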

func (conn *Connection) createConnection(ctx context.Context) error {
        var err error
        if conn.c == nil && conn.state == connDisconnected {
                if err = conn.dial(ctx); err == nil {
                        conn.notify(Connected)
                        return nil
                }
        }
        if conn.state == connClosed {
                err = ClientError{ErrConnectionClosed, "using closed connection"}
        }
        return err
}

func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
        conn.lockShards()
        defer conn.unlockShards()
        if forever {
                if conn.state != connClosed {
                        close(conn.control)
                        atomic.StoreUint32(&conn.state, connClosed)
                        conn.cond.Broadcast()
                        // Free the resources.
                        if conn.shutdownWatcher != nil {
                                go conn.shutdownWatcher.Unregister()
                                conn.shutdownWatcher = nil
                        }
                        conn.notify(Closed)
                }
        } else {
                atomic.StoreUint32(&conn.state, connDisconnected)
                conn.cond.Broadcast()
                conn.notify(Disconnected)
        }
        if conn.c != nil {
                err = conn.c.Close()
                conn.c = nil
        }
        for i := range conn.shard {
                conn.shard[i].buf.Reset()
                requestsLists := []*[requestsMap]futureList{
                        &conn.shard[i].requests,
                        &conn.shard[i].requestsWithCtx,
                }
                for _, requests := range requestsLists {
                        for pos := range requests {
                                requests[pos].clear(neterr, conn)
                        }
                }
        }
        return
}

func (conn *Connection) getDialTimeout() time.Duration {
        dialTimeout := conn.opts.Reconnect / 2
        if dialTimeout == 0 {
                dialTimeout = 500 * time.Millisecond
        } else if dialTimeout > 5*time.Second {
                dialTimeout = 5 * time.Second
        }
        return dialTimeout
}

func (conn *Connection) runReconnects() error {
        dialTimeout := conn.getDialTimeout()
        var reconnects uint
        var err error

        for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
                now := time.Now()

                ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
                err = conn.createConnection(ctx)
                cancel()

                if err != nil {
                        if clientErr, ok := err.(ClientError); ok &&
                                clientErr.Code == ErrConnectionClosed {
                                return err
                        }
                } else {
                        return nil
                }

                conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
                conn.notify(ReconnectFailed)
                reconnects++
                conn.mutex.Unlock()

                time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))

                conn.mutex.Lock()
        }

        conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
        // mark connection as closed to avoid reopening by another goroutine
        return ClientError{ErrConnectionClosed, "last reconnect failed"}
}

func (conn *Connection) reconnectImpl(neterr error, c Conn) {
        if conn.opts.Reconnect > 0 {
                if c == conn.c {
                        conn.closeConnection(neterr, false)
                        if err := conn.runReconnects(); err != nil {
                                conn.closeConnection(err, true)
                        }
                }
        } else {
                conn.closeConnection(neterr, true)
        }
}

func (conn *Connection) reconnect(neterr error, c Conn) {
        conn.mutex.Lock()
        defer conn.mutex.Unlock()
        conn.reconnectImpl(neterr, c)
        conn.cond.Broadcast()
}

func (conn *Connection) lockShards() {
        for i := range conn.shard {
                conn.shard[i].rmut.Lock()
                conn.shard[i].bufmut.Lock()
        }
}

func (conn *Connection) unlockShards() {
        for i := range conn.shard {
                conn.shard[i].rmut.Unlock()
                conn.shard[i].bufmut.Unlock()
        }
}
679
func (conn *Connection) pinger() {
650✔
680
        to := conn.opts.Timeout
650✔
681
        if to == 0 {
652✔
682
                to = 3 * time.Second
2✔
683
        }
2✔
684
        t := time.NewTicker(to / 3)
650✔
685
        defer t.Stop()
650✔
686
        for {
1,302✔
687
                select {
652✔
688
                case <-conn.control:
644✔
689
                        return
644✔
690
                case <-t.C:
2✔
691
                }
692
                conn.Ping()
2✔
693
        }
694
}
695

696
func (conn *Connection) notify(kind ConnEventKind) {
1,682✔
697
        if conn.opts.Notify != nil {
1,682✔
698
                select {
×
699
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
700
                default:
×
701
                }
702
        }
703
}
704

705
func (conn *Connection) writer(w writeFlusher, c Conn) {
654✔
706
        var shardn uint32
654✔
707
        var packet smallWBuf
654✔
708
        for atomic.LoadUint32(&conn.state) != connClosed {
4,881✔
709
                select {
4,227✔
710
                case shardn = <-conn.dirtyShard:
1,033✔
711
                default:
3,194✔
712
                        runtime.Gosched()
3,194✔
713
                        if len(conn.dirtyShard) == 0 {
6,228✔
714
                                if err := w.Flush(); err != nil {
3,058✔
715
                                        err = ClientError{
24✔
716
                                                ErrIoError,
24✔
717
                                                fmt.Sprintf("failed to flush data to the connection: %s", err),
24✔
718
                                        }
24✔
719
                                        conn.reconnect(err, c)
24✔
720
                                        return
24✔
721
                                }
24✔
722
                        }
723
                        select {
3,170✔
724
                        case shardn = <-conn.dirtyShard:
2,575✔
725
                        case <-conn.control:
589✔
726
                                return
589✔
727
                        }
728
                }
729
                shard := &conn.shard[shardn]
3,608✔
730
                shard.bufmut.Lock()
3,608✔
731
                if conn.c != c {
3,625✔
732
                        conn.dirtyShard <- shardn
17✔
733
                        shard.bufmut.Unlock()
17✔
734
                        return
17✔
735
                }
17✔
736
                packet, shard.buf = shard.buf, packet
3,591✔
737
                shard.bufmut.Unlock()
3,591✔
738
                if packet.Len() == 0 {
3,591✔
739
                        continue
×
740
                }
741
                if _, err := w.Write(packet.b); err != nil {
3,591✔
742
                        err = ClientError{
×
743
                                ErrIoError,
×
744
                                fmt.Sprintf("failed to write data to the connection: %s", err),
×
745
                        }
×
746
                        conn.reconnect(err, c)
×
747
                        return
×
748
                }
×
749
                packet.Reset()
3,591✔
750
        }
751
}
752

753
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
840✔
754
        keyExist := false
840✔
755
        event := connWatchEvent{}
840✔
756
        d := msgpack.NewDecoder(reader)
840✔
757

840✔
758
        l, err := d.DecodeMapLen()
840✔
759
        if err != nil {
840✔
760
                return event, err
×
761
        }
×
762

763
        for ; l > 0; l-- {
1,879✔
764
                cd, err := d.DecodeInt()
1,039✔
765
                if err != nil {
1,039✔
766
                        return event, err
×
767
                }
×
768

769
                switch iproto.Key(cd) {
1,039✔
770
                case iproto.IPROTO_EVENT_KEY:
840✔
771
                        if event.key, err = d.DecodeString(); err != nil {
840✔
772
                                return event, err
×
773
                        }
×
774
                        keyExist = true
840✔
775
                case iproto.IPROTO_EVENT_DATA:
199✔
776
                        if event.value, err = d.DecodeInterface(); err != nil {
199✔
777
                                return event, err
×
778
                        }
×
779
                default:
×
780
                        if err = d.Skip(); err != nil {
×
781
                                return event, err
×
782
                        }
×
783
                }
784
        }
785

786
        if !keyExist {
840✔
787
                return event, errors.New("watch event does not have a key")
×
788
        }
×
789

790
        return event, nil
840✔
791
}
792

793
func (conn *Connection) reader(r io.Reader, c Conn) {
654✔
794
        events := make(chan connWatchEvent, 1024)
654✔
795
        defer close(events)
654✔
796

654✔
797
        go conn.eventer(events)
654✔
798

654✔
799
        for atomic.LoadUint32(&conn.state) != connClosed {
4,284✔
800
                respBytes, err := read(r, conn.lenbuf[:])
3,630✔
801
                if err != nil {
4,208✔
802
                        err = ClientError{
578✔
803
                                ErrIoError,
578✔
804
                                fmt.Sprintf("failed to read data from the connection: %s", err),
578✔
805
                        }
578✔
806
                        conn.reconnect(err, c)
578✔
807
                        return
578✔
808
                }
578✔
809
                resp := &Response{buf: smallBuf{b: respBytes}}
3,046✔
810
                err = resp.decodeHeader(conn.dec)
3,046✔
811
                if err != nil {
3,046✔
812
                        err = ClientError{
×
813
                                ErrProtocolError,
×
814
                                fmt.Sprintf("failed to decode IPROTO header: %s", err),
×
815
                        }
×
816
                        conn.reconnect(err, c)
×
817
                        return
×
818
                }
×
819

820
                var fut *Future = nil
3,046✔
821
                if iproto.Type(resp.Code) == iproto.IPROTO_EVENT {
3,886✔
822
                        if event, err := readWatchEvent(&resp.buf); err == nil {
1,680✔
823
                                events <- event
840✔
824
                        } else {
840✔
825
                                err = ClientError{
×
826
                                        ErrProtocolError,
×
827
                                        fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
×
828
                                }
×
829
                                conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
×
830
                        }
×
831
                        continue
840✔
832
                } else if resp.Code == PushCode {
2,232✔
833
                        if fut = conn.peekFuture(resp.RequestId); fut != nil {
52✔
834
                                fut.AppendPush(resp)
26✔
835
                        }
26✔
836
                } else {
2,180✔
837
                        if fut = conn.fetchFuture(resp.RequestId); fut != nil {
4,360✔
838
                                fut.SetResponse(resp)
2,180✔
839
                                conn.markDone(fut)
2,180✔
840
                        }
2,180✔
841
                }
842

843
                if fut == nil {
2,206✔
844
                        conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
×
845
                }
×
846
        }
847
}
848

849
// eventer goroutine gets watch events and updates values for watchers.
850
func (conn *Connection) eventer(events <-chan connWatchEvent) {
654✔
851
        for event := range events {
1,494✔
852
                if value, ok := conn.watchMap.Load(event.key); ok {
1,634✔
853
                        st := value.(chan watchState)
794✔
854
                        state := <-st
794✔
855
                        state.value = event.value
794✔
856
                        if state.version == math.MaxUint64 {
794✔
857
                                state.version = initWatchEventVersion + 1
×
858
                        } else {
794✔
859
                                state.version += 1
794✔
860
                        }
794✔
861
                        state.ack = false
794✔
862
                        if state.changed != nil {
1,588✔
863
                                close(state.changed)
794✔
864
                                state.changed = nil
794✔
865
                        }
794✔
866
                        st <- state
794✔
867
                }
868
                // It is possible to get IPROTO_EVENT after we already send
869
                // IPROTO_UNWATCH due to processing on a Tarantool side or slow
870
                // read from the network, so it looks like an expected behavior.
871
        }
872
}
873

874
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
3,668✔
875
        fut = NewFuture()
3,668✔
876
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
3,668✔
877
                select {
×
878
                case conn.rlimit <- struct{}{}:
×
879
                default:
×
880
                        fut.err = ClientError{
×
881
                                ErrRateLimited,
×
882
                                "Request is rate limited on client",
×
883
                        }
×
884
                        fut.ready = nil
×
885
                        fut.done = nil
×
886
                        return
×
887
                }
888
        }
889
        fut.requestId = conn.nextRequestId(ctx != nil)
3,668✔
890
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
3,668✔
891
        shard := &conn.shard[shardn]
3,668✔
892
        shard.rmut.Lock()
3,668✔
893
        switch atomic.LoadUint32(&conn.state) {
3,668✔
894
        case connClosed:
31✔
895
                fut.err = ClientError{
31✔
896
                        ErrConnectionClosed,
31✔
897
                        "using closed connection",
31✔
898
                }
31✔
899
                fut.ready = nil
31✔
900
                fut.done = nil
31✔
901
                shard.rmut.Unlock()
31✔
902
                return
31✔
903
        case connDisconnected:
×
904
                fut.err = ClientError{
×
905
                        ErrConnectionNotReady,
×
906
                        "client connection is not ready",
×
907
                }
×
908
                fut.ready = nil
×
909
                fut.done = nil
×
910
                shard.rmut.Unlock()
×
911
                return
×
912
        case connShutdown:
9✔
913
                fut.err = ClientError{
9✔
914
                        ErrConnectionShutdown,
9✔
915
                        "server shutdown in progress",
9✔
916
                }
9✔
917
                fut.ready = nil
9✔
918
                fut.done = nil
9✔
919
                shard.rmut.Unlock()
9✔
920
                return
9✔
921
        }
922
        pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
3,628✔
923
        if ctx != nil {
3,636✔
924
                select {
8✔
925
                case <-ctx.Done():
8✔
926
                        fut.SetError(fmt.Errorf("context is done"))
8✔
927
                        shard.rmut.Unlock()
8✔
928
                        return
8✔
929
                default:
×
930
                }
931
                shard.requestsWithCtx[pos].addFuture(fut)
×
932
        } else {
3,620✔
933
                shard.requests[pos].addFuture(fut)
3,620✔
934
                if conn.opts.Timeout > 0 {
7,222✔
935
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
3,602✔
936
                }
3,602✔
937
        }
938
        shard.rmut.Unlock()
3,620✔
939
        if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
3,620✔
940
                select {
×
941
                case conn.rlimit <- struct{}{}:
×
942
                default:
×
943
                        runtime.Gosched()
×
944
                        select {
×
945
                        case conn.rlimit <- struct{}{}:
×
946
                        case <-fut.done:
×
947
                                if fut.err == nil {
×
948
                                        panic("fut.done is closed, but err is nil")
×
949
                                }
950
                        }
951
                }
952
        }
953
        return
3,620✔
954
}
955

956
// This method removes a future from the internal queue if the context
957
// is "done" before the response is come.
958
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
×
959
        select {
×
960
        case <-fut.done:
×
961
        case <-ctx.Done():
×
962
        }
963

964
        select {
×
965
        case <-fut.done:
×
966
                return
×
967
        default:
×
968
                conn.cancelFuture(fut, fmt.Errorf("context is done"))
×
969
        }
970
}
971

972
func (conn *Connection) incrementRequestCnt() {
3,668✔
973
        atomic.AddInt64(&conn.requestCnt, int64(1))
3,668✔
974
}
3,668✔
975

976
func (conn *Connection) decrementRequestCnt() {
3,660✔
977
        if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
7,059✔
978
                conn.cond.Broadcast()
3,399✔
979
        }
3,399✔
980
}
981

982
func (conn *Connection) send(req Request, streamId uint64) *Future {
3,668✔
983
        conn.incrementRequestCnt()
3,668✔
984

3,668✔
985
        fut := conn.newFuture(req.Ctx())
3,668✔
986
        if fut.ready == nil {
3,708✔
987
                conn.decrementRequestCnt()
40✔
988
                return fut
40✔
989
        }
40✔
990

991
        if req.Ctx() != nil {
3,636✔
992
                select {
8✔
993
                case <-req.Ctx().Done():
8✔
994
                        conn.cancelFuture(fut, fmt.Errorf("context is done"))
8✔
995
                        return fut
8✔
996
                default:
×
997
                }
998
                go conn.contextWatchdog(fut, req.Ctx())
×
999
        }
1000
        conn.putFuture(fut, req, streamId)
3,620✔
1001

3,620✔
1002
        return fut
3,620✔
1003
}
1004

1005
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
3,620✔
1006
        shardn := fut.requestId & (conn.opts.Concurrency - 1)
3,620✔
1007
        shard := &conn.shard[shardn]
3,620✔
1008
        shard.bufmut.Lock()
3,620✔
1009
        select {
3,620✔
1010
        case <-fut.done:
×
1011
                shard.bufmut.Unlock()
×
1012
                return
×
1013
        default:
3,620✔
1014
        }
1015
        firstWritten := shard.buf.Len() == 0
3,620✔
1016
        if shard.buf.Cap() == 0 {
6,706✔
1017
                shard.buf.b = make([]byte, 0, 128)
3,086✔
1018
                shard.enc = msgpack.NewEncoder(&shard.buf)
3,086✔
1019
        }
3,086✔
1020
        blen := shard.buf.Len()
3,620✔
1021
        reqid := fut.requestId
3,620✔
1022
        if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
3,620✔
1023
                shard.buf.Trunc(blen)
×
1024
                shard.bufmut.Unlock()
×
1025
                if f := conn.fetchFuture(reqid); f == fut {
×
1026
                        fut.SetError(err)
×
1027
                        conn.markDone(fut)
×
1028
                } else if f != nil {
×
1029
                        /* in theory, it is possible. In practice, you have
×
1030
                         * to have race condition that lasts hours */
×
1031
                        panic("Unknown future")
×
1032
                } else {
×
1033
                        fut.wait()
×
1034
                        if fut.err == nil {
×
1035
                                panic("Future removed from queue without error")
×
1036
                        }
1037
                        if _, ok := fut.err.(ClientError); ok {
×
1038
                                // packing error is more important than connection
×
1039
                                // error, because it is indication of programmer's
×
1040
                                // mistake.
×
1041
                                fut.SetError(err)
×
1042
                        }
×
1043
                }
1044
                return
×
1045
        }
1046
        shard.bufmut.Unlock()
3,620✔
1047

3,620✔
1048
        if firstWritten {
7,240✔
1049
                conn.dirtyShard <- shardn
3,620✔
1050
        }
3,620✔
1051

1052
        if req.Async() {
5,054✔
1053
                if fut = conn.fetchFuture(reqid); fut != nil {
2,867✔
1054
                        resp := &Response{
1,433✔
1055
                                RequestId: reqid,
1,433✔
1056
                                Code:      OkCode,
1,433✔
1057
                        }
1,433✔
1058
                        fut.SetResponse(resp)
1,433✔
1059
                        conn.markDone(fut)
1,433✔
1060
                }
1,433✔
1061
        }
1062
}
1063

1064
func (conn *Connection) markDone(fut *Future) {
3,620✔
1065
        if conn.rlimit != nil {
3,620✔
1066
                <-conn.rlimit
×
1067
        }
×
1068
        conn.decrementRequestCnt()
3,620✔
1069
}
1070

1071
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
26✔
1072
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
26✔
1073
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
26✔
1074
        shard.rmut.Lock()
26✔
1075
        defer shard.rmut.Unlock()
26✔
1076

26✔
1077
        if conn.opts.Timeout > 0 {
52✔
1078
                if fut = conn.getFutureImp(reqid, true); fut != nil {
52✔
1079
                        pair := &shard.requests[pos]
26✔
1080
                        *pair.last = fut
26✔
1081
                        pair.last = &fut.next
26✔
1082
                        fut.timeout = time.Since(epoch) + conn.opts.Timeout
26✔
1083
                }
26✔
1084
        } else {
×
1085
                fut = conn.getFutureImp(reqid, false)
×
1086
        }
×
1087

1088
        return fut
26✔
1089
}
1090

1091
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
3,622✔
1092
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
3,622✔
1093
        shard.rmut.Lock()
3,622✔
1094
        fut = conn.getFutureImp(reqid, true)
3,622✔
1095
        shard.rmut.Unlock()
3,622✔
1096
        return fut
3,622✔
1097
}
3,622✔
1098

1099
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
3,648✔
1100
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
3,648✔
1101
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
3,648✔
1102
        // futures with even requests id belong to requests list with nil context
3,648✔
1103
        if reqid%2 == 0 {
7,288✔
1104
                return shard.requests[pos].findFuture(reqid, fetch)
3,640✔
1105
        } else {
3,648✔
1106
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
8✔
1107
        }
8✔
1108
}
1109

1110
func (conn *Connection) timeouts() {
648✔
1111
        timeout := conn.opts.Timeout
648✔
1112
        t := time.NewTimer(timeout)
648✔
1113
        for {
1,296✔
1114
                var nowepoch time.Duration
648✔
1115
                select {
648✔
1116
                case <-conn.control:
644✔
1117
                        t.Stop()
644✔
1118
                        return
644✔
UNCOV
1119
                case <-t.C:
×
1120
                }
UNCOV
1121
                minNext := time.Since(epoch) + timeout
×
UNCOV
1122
                for i := range conn.shard {
×
UNCOV
1123
                        nowepoch = time.Since(epoch)
×
UNCOV
1124
                        shard := &conn.shard[i]
×
UNCOV
1125
                        for pos := range shard.requests {
×
UNCOV
1126
                                shard.rmut.Lock()
×
UNCOV
1127
                                pair := &shard.requests[pos]
×
UNCOV
1128
                                for pair.first != nil && pair.first.timeout < nowepoch {
×
1129
                                        shard.bufmut.Lock()
×
1130
                                        fut := pair.first
×
1131
                                        pair.first = fut.next
×
1132
                                        if fut.next == nil {
×
1133
                                                pair.last = &pair.first
×
1134
                                        } else {
×
1135
                                                fut.next = nil
×
1136
                                        }
×
1137
                                        fut.SetError(ClientError{
×
1138
                                                Code: ErrTimeouted,
×
1139
                                                Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
×
1140
                                        })
×
1141
                                        conn.markDone(fut)
×
1142
                                        shard.bufmut.Unlock()
×
1143
                                }
UNCOV
1144
                                if pair.first != nil && pair.first.timeout < minNext {
×
UNCOV
1145
                                        minNext = pair.first.timeout
×
UNCOV
1146
                                }
×
UNCOV
1147
                                shard.rmut.Unlock()
×
1148
                        }
1149
                }
UNCOV
1150
                nowepoch = time.Since(epoch)
×
UNCOV
1151
                if nowepoch+time.Microsecond < minNext {
×
UNCOV
1152
                        t.Reset(minNext - nowepoch)
×
UNCOV
1153
                } else {
×
1154
                        t.Reset(time.Microsecond)
×
1155
                }
×
1156
        }
1157
}
1158

1159
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
5,022✔
1160
        var length int
5,022✔
1161

5,022✔
1162
        if _, err = io.ReadFull(r, lenbuf); err != nil {
5,610✔
1163
                return
588✔
1164
        }
588✔
1165
        if lenbuf[0] != 0xce {
4,428✔
1166
                err = errors.New("wrong response header")
×
1167
                return
×
1168
        }
×
1169
        length = (int(lenbuf[1]) << 24) +
4,428✔
1170
                (int(lenbuf[2]) << 16) +
4,428✔
1171
                (int(lenbuf[3]) << 8) +
4,428✔
1172
                int(lenbuf[4])
4,428✔
1173

4,428✔
1174
        if length == 0 {
4,428✔
1175
                err = errors.New("response should not be 0 length")
×
1176
                return
×
1177
        }
×
1178
        response = make([]byte, length)
4,428✔
1179
        _, err = io.ReadFull(r, response)
4,428✔
1180

4,428✔
1181
        return
4,428✔
1182
}
1183

1184
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
3,668✔
1185
        if context {
3,676✔
1186
                return atomic.AddUint32(&conn.contextRequestId, 2)
8✔
1187
        } else {
3,668✔
1188
                return atomic.AddUint32(&conn.requestId, 2)
3,660✔
1189
        }
3,660✔
1190
}
1191

1192
// Do performs a request asynchronously on the connection.
1193
//
1194
// An error is returned if the request was formed incorrectly, or failed to
1195
// create the future.
1196
func (conn *Connection) Do(req Request) *Future {
3,592✔
1197
        if connectedReq, ok := req.(ConnectedRequest); ok {
3,602✔
1198
                if connectedReq.Conn() != conn {
12✔
1199
                        fut := NewFuture()
2✔
1200
                        fut.SetError(errUnknownRequest)
2✔
1201
                        return fut
2✔
1202
                }
2✔
1203
        }
1204
        return conn.send(req, ignoreStreamId)
3,590✔
1205
}
1206
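
// Illustrative sketch, not part of the original file: Do returns a Future
// immediately and Get blocks until the response arrives or the request
// fails. NewPingRequest is assumed to be the request constructor from this
// package; the examplePing name is hypothetical.
func examplePing(conn *Connection) error {
        fut := conn.Do(NewPingRequest())
        _, err := fut.Get()
        return err
}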

// ConfiguredTimeout returns a timeout from connection config.
func (conn *Connection) ConfiguredTimeout() time.Duration {
        return conn.opts.Timeout
}

// OverrideSchema sets Schema for the connection.
func (conn *Connection) OverrideSchema(s *Schema) {
        if s != nil {
                conn.mutex.Lock()
                defer conn.mutex.Unlock()
                conn.lockShards()
                defer conn.unlockShards()

                conn.Schema = s
        }
}

// NewPrepared passes an SQL statement to Tarantool for preparation synchronously.
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
        req := NewPrepareRequest(expr)
        resp, err := conn.Do(req).Get()
        if err != nil {
                return nil, err
        }
        return NewPreparedFromResponse(conn, resp)
}
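
// Illustrative sketch, not part of the original file: executing and then
// dropping a prepared statement. NewExecutePreparedRequest and
// NewUnprepareRequest are assumed to be the SQL request constructors from
// this package; the examplePrepared name is hypothetical.
func examplePrepared(conn *Connection) error {
        stmt, err := conn.NewPrepared("SELECT 1;")
        if err != nil {
                return err
        }
        if _, err = conn.Do(NewExecutePreparedRequest(stmt)).Get(); err != nil {
                return err
        }
        _, err = conn.Do(NewUnprepareRequest(stmt)).Get()
        return err
}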

// NewStream creates a new Stream object for the connection.
//
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
// To use interactive transactions, the memtx_use_mvcc_engine box option should be set to true.
// Since 1.7.0
func (conn *Connection) NewStream() (*Stream, error) {
        next := atomic.AddUint64(&conn.lastStreamId, 1)
        return &Stream{
                Id:   next,
                Conn: conn,
        }, nil
}
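
// Illustrative sketch, not part of the original file: an interactive
// transaction over a stream. NewBeginRequest, NewInsertRequest and
// NewCommitRequest are assumed to be the request constructors from this
// package; all requests sent through one Stream share its stream id. The
// exampleStream name and the "test_space" space are hypothetical.
func exampleStream(conn *Connection) error {
        stream, err := conn.NewStream()
        if err != nil {
                return err
        }
        if _, err = stream.Do(NewBeginRequest()).Get(); err != nil {
                return err
        }
        insert := NewInsertRequest("test_space").Tuple([]interface{}{1, "hello"})
        if _, err = stream.Do(insert).Get(); err != nil {
                return err
        }
        _, err = stream.Do(NewCommitRequest()).Get()
        return err
}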

1247
// watchState is the current state of the watcher. See the idea at p. 70, 105:
1248
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1249
type watchState struct {
1250
        // value is a current value.
1251
        value interface{}
1252
        // version is a current version of the value. The only reason for uint64:
1253
        // go 1.13 has no math.Uint.
1254
        version uint64
1255
        // ack true if the acknowledge is already sent.
1256
        ack bool
1257
        // cnt is a count of active watchers for the key.
1258
        cnt int
1259
        // changed is a channel for broadcast the value changes.
1260
        changed chan struct{}
1261
        // unready channel exists if a state is not ready to work (subscription
1262
        // or unsubscription in progress).
1263
        unready chan struct{}
1264
}
1265

1266
// initWatchEventVersion is an initial version until no events from Tarantool.
1267
const initWatchEventVersion uint64 = 0
1268

1269
// connWatcher is an internal implementation of the Watcher interface.
1270
type connWatcher struct {
1271
        unregister sync.Once
1272
        // done is closed when the watcher is unregistered, but the watcher
1273
        // goroutine is not yet finished.
1274
        done chan struct{}
1275
        // finished is closed when the watcher is unregistered and the watcher
1276
        // goroutine is finished.
1277
        finished chan struct{}
1278
}
1279

1280
// Unregister unregisters the connection watcher.
1281
func (w *connWatcher) Unregister() {
4,762✔
1282
        w.unregister.Do(func() {
7,526✔
1283
                close(w.done)
2,764✔
1284
        })
2,764✔
1285
        <-w.finished
4,762✔
1286
}
1287

// subscribeWatchChannel returns an existing watch state channel for the key
// or creates a new one. It also increases the counter of active watchers for
// the channel.
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
        var st chan watchState

        for st == nil {
                if val, ok := conn.watchMap.Load(key); !ok {
                        st = make(chan watchState, 1)
                        state := watchState{
                                value:   nil,
                                version: initWatchEventVersion,
                                ack:     false,
                                cnt:     0,
                                changed: nil,
                                unready: make(chan struct{}),
                        }
                        st <- state

                        if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
                                if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
                                        conn.watchMap.Delete(key)
                                        close(state.unready)
                                        return nil, err
                                }
                                // It is a successful subscription to the watch events by itself.
                                state = <-st
                                state.cnt = 1
                                close(state.unready)
                                state.unready = nil
                                st <- state
                                continue
                        } else {
                                close(state.unready)
                                close(st)
                                st = val.(chan watchState)
                        }
                } else {
                        st = val.(chan watchState)
                }

                // It is an existing channel created outside. It may be in the
                // unready state.
                state := <-st
                if state.unready == nil {
                        state.cnt += 1
                }
                st <- state

                if state.unready != nil {
                        // Wait for an update and retry.
                        <-state.unready
                        st = nil
                }
        }

        return st, nil
}
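
// A minimal sketch (not part of the original connection.go) of the pattern
// used by subscribeWatchChannel: a buffered channel of capacity 1 acts as a
// mutex that also carries the protected state. Receiving takes ownership of
// the state, sending it back releases it. The names below are illustrative.
func exampleStateChannelPattern() {
        type exampleState struct{ cnt int }

        st := make(chan exampleState, 1)
        st <- exampleState{} // Publish the initial state.

        // "Lock": take the state out of the channel.
        state := <-st
        state.cnt += 1
        // "Unlock": put the (possibly modified) state back.
        st <- state
}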

// isFeatureInSlice reports whether the expected protocol feature is present
// in the actual feature slice.
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
        for _, actual := range actualSlice {
                if expected == actual {
                        return true
                }
        }
        return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
// been broadcast. All subsequent invocations are triggered with
// box.broadcast() called on the remote host. If a watcher is subscribed for a
// key that has not been broadcast yet, the callback is triggered only once,
// after the registration of the watcher.
//
// The watcher callbacks are always invoked in a separate goroutine. A watcher
// callback is never executed in parallel with itself, but they can be executed
// in parallel to other watchers.
//
// If the key is updated while the watcher callback is running, the callback
// will be invoked again with the latest value as soon as it returns.
//
// Watchers survive reconnection. All registered watchers are automatically
// resubscribed when the connection is reestablished.
//
// Keep in mind that garbage collection of a watcher handle does not lead to
// the watcher's destruction. In this case, the watcher remains registered.
// You need to call Unregister() directly.
//
// Unregister() guarantees that there will be no watcher callback calls after
// it returns, but calling Unregister() from the callback leads to a deadlock.
//
// See:
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
//
// Since 1.10.0
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
        // We need to check the feature because the IPROTO_WATCH request is
        // asynchronous. We do not expect any response from a Tarantool
        // instance. That's why we can't just check the Tarantool response for
        // an unsupported request error.
        if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
                conn.c.ProtocolInfo().Features) {
                err := fmt.Errorf("the feature %s must be supported by connection "+
                        "to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
                return nil, err
        }

        return conn.newWatcherImpl(key, callback)
}
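
// A usage sketch (not part of the original connection.go): registering a
// watcher and unregistering it later. It assumes `conn` is an established
// *Connection and that the server supports IPROTO_FEATURE_WATCHERS; the
// "box.status" key is only an example.
func exampleNewWatcherUsage(conn *Connection) (Watcher, error) {
        watcher, err := conn.NewWatcher("box.status", func(event WatchEvent) {
                // The callback runs in a separate goroutine and is never
                // invoked in parallel with itself.
                log.Printf("key %q changed, new value: %v", event.Key, event.Value)
        })
        if err != nil {
                return nil, err
        }
        // Later, when the updates are no longer needed:
        //
        //     watcher.Unregister()
        //
        // Never call Unregister() from inside the callback: it deadlocks.
        return watcher, nil
}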

func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
        st, err := subscribeWatchChannel(conn, key)
        if err != nil {
                return nil, err
        }

        // Start the watcher goroutine.
        done := make(chan struct{})
        finished := make(chan struct{})

        go func() {
                version := initWatchEventVersion
                for {
                        state := <-st
                        if state.changed == nil {
                                state.changed = make(chan struct{})
                        }
                        st <- state

                        if state.version != version {
                                callback(WatchEvent{
                                        Conn:  conn,
                                        Key:   key,
                                        Value: state.value,
                                })
                                version = state.version

                                // Do we need to acknowledge the notification?
                                state = <-st
                                sendAck := !state.ack && version == state.version
                                if sendAck {
                                        state.ack = true
                                }
                                st <- state

                                if sendAck {
                                        // We expect a reconnect and re-subscribe if it fails to
                                        // send the watch request. So it looks ok not to check the
                                        // result. But we need to make sure that the re-watch
                                        // request will not be finished by a small per-request
                                        // timeout.
                                        req := newWatchRequest(key).Context(context.Background())
                                        conn.Do(req).Get()
                                }
                        }

                        select {
                        case <-done:
                                state := <-st
                                state.cnt -= 1
                                if state.cnt == 0 {
                                        state.unready = make(chan struct{})
                                }
                                st <- state

                                if state.cnt == 0 {
                                        // The last one sends IPROTO_UNWATCH.
                                        if !conn.ClosedNow() {
                                                // The conn.ClosedNow() check is a workaround for
                                                // calling Unregister from connectionClose().
                                                //
                                                // We need to make sure that the unwatch request
                                                // will not be finished by a small per-request
                                                // timeout to avoid losing the request.
                                                req := newUnwatchRequest(key).Context(context.Background())
                                                conn.Do(req).Get()
                                        }
                                        conn.watchMap.Delete(key)
                                        close(state.unready)
                                }

                                close(finished)
                                return
                        case <-state.changed:
                        }
                }
        }()

        return &connWatcher{
                done:     done,
                finished: finished,
        }, nil
}

// ProtocolInfo returns the protocol version and the protocol features
// supported by the connected Tarantool server. Beware that the values might
// be outdated if the connection is in a disconnected state.
// Since 1.10.0
func (conn *Connection) ProtocolInfo() ProtocolInfo {
        return conn.serverProtocolInfo.Clone()
}
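
// A short sketch (not part of the original connection.go): checking a
// protocol feature through ProtocolInfo before relying on it. Assumes `conn`
// is an established *Connection.
func exampleHasWatchersFeature(conn *Connection) bool {
        info := conn.ProtocolInfo()
        for _, feature := range info.Features {
                if feature == iproto.IPROTO_FEATURE_WATCHERS {
                        return true
                }
        }
        return false
}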

func shutdownEventCallback(event WatchEvent) {
        // Receives "true" on server shutdown.
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
        // step 2.
        val, ok := event.Value.(bool)
        if ok && val {
                go event.Conn.shutdown(false)
        }
}

func (conn *Connection) shutdown(forever bool) error {
        // Forbid state changes.
        conn.mutex.Lock()
        defer conn.mutex.Unlock()

        if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
                if forever {
                        err := ClientError{ErrConnectionClosed, "connection closed by client"}
                        return conn.closeConnection(err, true)
                }
                return nil
        }

        if forever {
                // We don't want to reconnect any more.
                conn.opts.Reconnect = 0
                conn.opts.MaxReconnects = 0
        }

        conn.cond.Broadcast()
        conn.notify(Shutdown)

        c := conn.c
        for {
                if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
                        return nil
                }
                if atomic.LoadInt64(&conn.requestCnt) == 0 {
                        break
                }
                // Use a cond var on conn.mutex since request execution may
                // call reconnect(). It is ok if the state changes as part of
                // a reconnect since the Tarantool server won't allow
                // reconnecting in the middle of shutting down.
                conn.cond.Wait()
        }

        if forever {
                err := ClientError{ErrConnectionClosed, "connection closed by client"}
                return conn.closeConnection(err, true)
        } else {
                // Start to reconnect based on common rules, same as in net.box.
                // Reconnect also closes the connection: the server waits until
                // all subscribed connections are terminated.
                // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
                // step 3.
                conn.reconnectImpl(ClientError{
                        ErrConnectionClosed,
                        "connection closed after server shutdown",
                }, conn.c)
                return nil
        }
}
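
// An illustrative sketch (not part of the original connection.go): observing
// connection state changes, including the Shutdown event emitted above,
// through the channel the caller passed as Opts.Notify. The handling here is
// only an example.
func exampleHandleConnEvents(events <-chan ConnEvent) {
        for event := range events {
                switch event.Kind {
                case Shutdown:
                        log.Println("server is shutting down gracefully")
                case Disconnected:
                        log.Println("connection lost")
                case Connected:
                        log.Println("connection established or reestablished")
                case Closed:
                        log.Println("connection closed")
                        return
                }
        }
}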