• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 19481643557

18 Nov 2025 09:54PM UTC coverage: 75.195% (+0.05%) from 75.146%
19481643557

Pull #498

github

babyTsakhes
crud: refactor optional types to use go-option

- Remove: OptUint, OptInt, OptFloat64, OptString, OptBool, OptTuple.
- Remove: MakeOptUint, MakeOptInt, MakeOptFloat64, MakeOptString, MakeOptBool, MakeOptTuple.
- Add: OptAny = option.Generic[interface{}] .
- Add: MakeOptAny constructor.
- Update: All option structs to use option.* types.
- Fix: Test type inconsistencies.

Changed #492
Pull Request #498: crud: refactor optional types to use go-option

3083 of 4100 relevant lines covered (75.2%)

9795.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/connection.go
1
// Package tarantool implements methods and structures for working with
// a Tarantool instance.
package tarantool
4

5
import (
6
        "context"
7
        "encoding/binary"
8
        "errors"
9
        "fmt"
10
        "io"
11
        "log"
12
        "math"
13
        "net"
14
        "runtime"
15
        "sync"
16
        "sync/atomic"
17
        "time"
18

19
        "github.com/tarantool/go-iproto"
20
        "github.com/vmihailenco/msgpack/v5"
21
)
22

23
// requestsMap is the number of future lists kept per shard; request ids are
// mapped onto these lists.
const requestsMap = 128

// ignoreStreamId marks a request that does not belong to any stream.
const ignoreStreamId = 0

// Connection states stored atomically in Connection.state.
const (
	connDisconnected = 0
	connConnected    = 1
	connShutdown     = 2
	connClosed       = 3
)

// shutdownEventKey is the built-in event key watched to support server
// graceful shutdown.
const shutdownEventKey = "box.shutdown"
33

34
// ConnEventKind is a kind of a connection event (see Connected, Disconnected,
// ReconnectFailed, Shutdown, Closed).
type ConnEventKind int

// ConnLogKind is a kind of a log event reported through the Logger interface.
type ConnLogKind int
36

37
var (
	// errUnknownRequest is returned when a request object passed by the user
	// is not registered with this connection.
	errUnknownRequest = errors.New("the passed connected request doesn't belong " +
		"to the current connection or connection pool")
)
41

42
const (
	// Connected signals that connection is established or reestablished.
	Connected ConnEventKind = iota + 1
	// Disconnected signals that connection is broken.
	Disconnected
	// ReconnectFailed signals that attempt to reconnect has failed.
	ReconnectFailed
	// Shutdown signals that shutdown callback is processing.
	Shutdown
	// Closed signals that either reconnect attempts are exhausted, or an
	// explicit Close is called.
	Closed

	// LogReconnectFailed is logged when reconnect attempt failed.
	LogReconnectFailed ConnLogKind = iota + 1
	// LogLastReconnectFailed is logged when last reconnect attempt failed,
	// connection will be closed after that.
	LogLastReconnectFailed
	// LogUnexpectedResultId is logged when response with unknown id was received.
	// Most probably it is due to request timeout.
	LogUnexpectedResultId
	// LogWatchEventReadFailed is logged when failed to read a watch event.
	LogWatchEventReadFailed
	// LogBoxSessionPushUnsupported is logged when response type turned IPROTO_CHUNK.
	LogBoxSessionPushUnsupported
)
67

68
// ConnEvent is sent through the Notify channel specified in Opts.
type ConnEvent struct {
	// Conn is the connection the event belongs to.
	Conn *Connection
	// Kind describes what happened.
	Kind ConnEventKind
	// When is the time the event was produced.
	When time.Time
}
74

75
// connWatchEvent is a raw watch event as received from the server: the
// watched key and its current value.
type connWatchEvent struct {
	key   string
	value interface{}
}
80

81
// epoch is the process start reference point used for monotonic time deltas.
var epoch = time.Now()
82

83
// Logger is the logger type expected to be passed in options.
type Logger interface {
	// Report is called with the event kind, the connection the event
	// happened on, and event-specific arguments.
	Report(event ConnLogKind, conn *Connection, v ...interface{})
}
87

88
// defaultLogger is the fallback Logger that writes via the standard log package.
type defaultLogger struct{}
89

90
// Report implements the Logger interface by formatting each known event kind
// into the standard log package. The v... layout per event kind is fixed by
// the callers: see each case for the expected argument types.
func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interface{}) {
	switch event {
	case LogReconnectFailed:
		// v[0]: attempt counter, v[1]: dial error.
		reconnects := v[0].(uint)
		err := v[1].(error)
		addr := conn.Addr()
		// addr is nil before the first successful dial, so report this as
		// a "connect" rather than a "reconnect" failure.
		if addr == nil {
			log.Printf("tarantool: connect (%d/%d) failed: %s",
				reconnects, conn.opts.MaxReconnects, err)
		} else {
			log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
				reconnects, conn.opts.MaxReconnects, addr, err)
		}
	case LogLastReconnectFailed:
		// v[0]: the final dial error before giving up.
		err := v[0].(error)
		addr := conn.Addr()
		if addr == nil {
			log.Printf("tarantool: last connect failed: %s, giving it up",
				err)
		} else {
			log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
				addr, err)
		}
	case LogUnexpectedResultId:
		// v[0]: the response header carrying the unknown request id.
		header := v[0].(Header)
		log.Printf("tarantool: connection %s got unexpected request ID (%d) in response "+
			"(probably cancelled request)",
			conn.Addr(), header.RequestId)
	case LogWatchEventReadFailed:
		// v[0]: the parse error.
		err := v[0].(error)
		log.Printf("tarantool: unable to parse watch event: %s", err)
	case LogBoxSessionPushUnsupported:
		// v[0]: the header of the request that received an IPROTO_CHUNK.
		header := v[0].(Header)
		log.Printf("tarantool: unsupported box.session.push() for request %d", header.RequestId)
	default:
		// Unknown event kind: dump everything we were given.
		args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
		log.Print(args...)
	}
}
129

130
// Connection is a handle with a single connection to a Tarantool instance.
//
// It is created and configured with Connect function, and could not be
// reconfigured later.
//
// Connection could be in the following states:
//
// - In "Connected" state it sends queries to Tarantool.
//
// - In "Disconnected" state it rejects queries with ClientError{Code:
// ErrConnectionNotReady}
//
// - In "Shutdown" state it rejects queries with ClientError{Code:
// ErrConnectionShutdown}. The state indicates that a graceful shutdown
// is in progress. The connection waits for all active requests to
// complete.
//
// - In "Closed" state it rejects queries with ClientError{Code:
// ErrConnectionClosed}. Connection could become "Closed" when
// Connection.Close() method called, or when Tarantool disconnected and
// Reconnect pause is not specified or MaxReconnects is specified and
// MaxReconnect reconnect attempts already performed.
//
// You may perform data manipulation operations by calling its methods:
// Call*, Insert*, Replace*, Update*, Upsert*, Eval*.
//
// In any method that accepts a space you may pass either space number or
// space name (in this case it will be looked up in schema). Same is true
// for an index.
//
// ATTENTION: tuple, key, ops and args arguments for any method should be
// an array or should serialize to msgpack array.
//
// ATTENTION: result argument for *Typed methods should deserialize from
// msgpack array, cause Tarantool always returns result as an array.
// For all space related methods and Call16* (but not Call17*) methods Tarantool
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not force
// array of arrays.
//
// If connected to Tarantool 2.10 or newer, connection supports server graceful
// shutdown. In this case, server will wait until all client requests will be
// finished and client disconnects before going down (server also may go down
// by timeout). Client reconnect will happen if connection options enable
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
//
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
	addr   net.Addr
	dialer Dialer
	c      Conn
	mutex  sync.Mutex
	cond   *sync.Cond
	// schemaResolver contains a SchemaResolver implementation.
	schemaResolver SchemaResolver
	// requestId contains the last request ID for requests with nil context.
	requestId uint32
	// contextRequestId contains the last request ID for requests with context.
	contextRequestId uint32
	// Greeting contains first message sent by Tarantool.
	Greeting *Greeting

	// shard holds per-shard request queues and write buffers; dirtyShard
	// carries indices of shards that have buffered data to flush.
	shard      []connShard
	dirtyShard chan uint32

	// control is closed exactly once when the connection becomes Closed.
	control chan struct{}
	// rlimit throttles in-fly requests when Opts.RateLimit is set.
	rlimit chan struct{}
	opts   Opts
	// state is one of the conn* constants, accessed atomically.
	state  uint32
	dec    *msgpack.Decoder
	lenbuf [packetLengthBytes]byte

	lastStreamId uint64

	serverProtocolInfo ProtocolInfo
	// watchMap is a map of key -> chan watchState.
	watchMap sync.Map

	// shutdownWatcher is the "box.shutdown" event watcher.
	shutdownWatcher Watcher
	// requestCnt is a counter of active requests.
	requestCnt int64
}
213

214
// Compile-time check: *Connection must satisfy the Connector interface.
var _ = Connector(&Connection{})
215

216
// futureList is an intrusive singly linked list of pending futures.
// last points at the next-pointer to fill on append (i.e. &first while the
// list is empty), which makes appends O(1).
type futureList struct {
	first *Future
	last  **Future
}
220

221
// findFuture scans the list for the future with the given request id and
// returns it, or nil when absent. With fetch=true a found future is also
// unlinked from the list, keeping list.last consistent when the removed
// element was the tail.
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
	// root always points at the link (first, or some fut.next) that leads
	// to the node currently being inspected.
	root := &list.first
	for {
		fut := *root
		if fut == nil {
			// End of list: no future with this request id.
			return nil
		}
		if fut.requestId == reqid {
			if fetch {
				// Unlink fut: bypass it via the incoming link.
				*root = fut.next
				if fut.next == nil {
					// Removed the tail, so the append point moves back.
					list.last = root
				} else {
					fut.next = nil
				}
			}
			return fut
		}
		root = &fut.next
	}
}
242

243
// addFuture appends fut to the tail of the list in O(1) via the cached
// last-link pointer.
func (list *futureList) addFuture(fut *Future) {
	*list.last = fut
	list.last = &fut.next
}
1,864✔
247

248
// clear fails every pending future in the list with err and resets the list
// to its empty state. conn.markDone is called for each future so the
// connection's request accounting stays correct.
func (list *futureList) clear(err error, conn *Connection) {
	fut := list.first
	list.first = nil
	list.last = &list.first
	for fut != nil {
		fut.SetError(err)
		conn.markDone(fut)
		// Advance and detach the node in one step.
		fut, fut.next = fut.next, nil
	}
}
258

259
// connShard is one slice of the connection's request bookkeeping; sharding
// spreads lock contention across Opts.Concurrency independent mutexes.
type connShard struct {
	// rmut guards the two request maps below.
	rmut            sync.Mutex
	requests        [requestsMap]futureList
	requestsWithCtx [requestsMap]futureList
	// bufmut guards the shard's write buffer and encoder.
	bufmut sync.Mutex
	buf    smallWBuf
	enc    *msgpack.Encoder
}
267

268
// RLimitAction is an enumeration type for an action to do when a rate limit
// is reached.
type RLimitAction int

const (
	// RLimitDrop immediately aborts the request.
	RLimitDrop RLimitAction = iota
	// RLimitWait waits during timeout period for some request to be answered.
	// If no request answered during timeout period, this request is aborted.
	// If no timeout period is set, it will wait forever.
	RLimitWait
)
280

281
// Opts is a way to configure Connection.
type Opts struct {
	// Timeout for response to a particular request. The timeout is reset when
	// push messages are received. If Timeout is zero, any request can be
	// blocked infinitely.
	// Also used to setup net.TCPConn.Set(Read|Write)Deadline.
	//
	// Pay attention, when using contexts with request objects,
	// the timeout option for Connection does not affect the lifetime
	// of the request. For those purposes use context.WithTimeout() as
	// the root context.
	Timeout time.Duration
	// Reconnect is the pause between reconnect attempts. If Reconnect is
	// zero, no reconnect attempts will be made.
	// If specified, then when Tarantool is not reachable or disconnected,
	// new connect attempt is performed after the pause.
	// By default, no reconnection attempts are performed,
	// so once disconnected, connection becomes Closed.
	Reconnect time.Duration
	// MaxReconnects is the maximum number of reconnect failures; after
	// that we give up. If MaxReconnects is zero, the client will try to
	// reconnect endlessly.
	// After MaxReconnects attempts Connection becomes closed.
	MaxReconnects uint
	// RateLimit limits the number of 'in-fly' requests, i.e. already put
	// into the requests queue, but not yet answered by the server or
	// timed out.
	// It is disabled by default.
	// See RLimitAction for possible actions when RateLimit is reached.
	RateLimit uint
	// RLimitAction tells what to do when RateLimit is reached.
	// It is required if RateLimit is specified.
	RLimitAction RLimitAction
	// Concurrency is the amount of separate mutexes for request
	// queues and buffers inside of connection.
	// It is rounded up to the nearest power of 2.
	// By default it is runtime.GOMAXPROCS(-1) * 4.
	Concurrency uint32
	// SkipSchema disables schema loading. Without disabling schema loading,
	// there is no way to create Connection for currently not accessible Tarantool.
	SkipSchema bool
	// Notify is a channel which receives notifications about Connection status
	// changes.
	Notify chan<- ConnEvent
	// Handle is a user specified value that could be retrieved with
	// the Handle() method.
	Handle interface{}
	// Logger is a user specified logger used for error messages.
	Logger Logger
}
330

331
// Connect creates and configures a new Connection.
332
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
349✔
333
        conn = &Connection{
349✔
334
                dialer:           dialer,
349✔
335
                requestId:        0,
349✔
336
                contextRequestId: 1,
349✔
337
                Greeting:         &Greeting{},
349✔
338
                control:          make(chan struct{}),
349✔
339
                opts:             opts,
349✔
340
                dec:              msgpack.NewDecoder(&smallBuf{}),
349✔
341
        }
349✔
342
        maxprocs := uint32(runtime.GOMAXPROCS(-1))
349✔
343
        if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
696✔
344
                conn.opts.Concurrency = maxprocs * 4
347✔
345
        }
347✔
346
        if c := conn.opts.Concurrency; c&(c-1) != 0 {
349✔
347
                for i := uint(1); i < 32; i *= 2 {
×
348
                        c |= c >> i
×
349
                }
×
350
                conn.opts.Concurrency = c + 1
×
351
        }
352
        conn.dirtyShard = make(chan uint32, conn.opts.Concurrency*2)
349✔
353
        conn.shard = make([]connShard, conn.opts.Concurrency)
349✔
354
        for i := range conn.shard {
5,965✔
355
                shard := &conn.shard[i]
5,616✔
356
                requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
5,616✔
357
                for _, requests := range requestsLists {
16,848✔
358
                        for j := range requests {
1,448,928✔
359
                                requests[j].last = &requests[j].first
1,437,696✔
360
                        }
1,437,696✔
361
                }
362
        }
363

364
        if conn.opts.RateLimit > 0 {
349✔
365
                conn.rlimit = make(chan struct{}, conn.opts.RateLimit)
×
366
                if conn.opts.RLimitAction != RLimitDrop && conn.opts.RLimitAction != RLimitWait {
×
367
                        return nil, errors.New("RLimitAction should be specified to RLimitDone nor RLimitWait")
×
368
                }
×
369
        }
370

371
        if conn.opts.Logger == nil {
698✔
372
                conn.opts.Logger = defaultLogger{}
349✔
373
        }
349✔
374

375
        conn.cond = sync.NewCond(&conn.mutex)
349✔
376

349✔
377
        if conn.opts.Reconnect > 0 {
464✔
378
                // We don't need these mutex.Lock()/mutex.Unlock() here, but
115✔
379
                // runReconnects() expects mutex.Lock() to be set, so it's
115✔
380
                // easier to add them instead of reworking runReconnects().
115✔
381
                conn.mutex.Lock()
115✔
382
                err = conn.runReconnects(ctx)
115✔
383
                conn.mutex.Unlock()
115✔
384
                if err != nil {
118✔
385
                        return nil, err
3✔
386
                }
3✔
387
        } else {
234✔
388
                if err = conn.connect(ctx); err != nil {
257✔
389
                        return nil, err
23✔
390
                }
23✔
391
        }
392

393
        go conn.pinger()
323✔
394
        if conn.opts.Timeout > 0 {
645✔
395
                go conn.timeouts()
322✔
396
        }
322✔
397

398
        // TODO: reload schema after reconnect.
399
        if !conn.opts.SkipSchema {
624✔
400
                schema, err := GetSchema(conn)
301✔
401
                if err != nil {
346✔
402
                        conn.mutex.Lock()
45✔
403
                        defer conn.mutex.Unlock()
45✔
404
                        conn.closeConnection(err, true)
45✔
405
                        return nil, err
45✔
406
                }
45✔
407
                conn.SetSchema(schema)
256✔
408
        }
409

410
        return conn, err
278✔
411
}
412

413
// ConnectedNow reports if connection is established at the moment.
// It is safe to call concurrently: the state is read atomically.
func (conn *Connection) ConnectedNow() bool {
	return atomic.LoadUint32(&conn.state) == connConnected
}
6✔
417

418
// ClosedNow reports if connection is closed by user or after reconnect.
// It is safe to call concurrently: the state is read atomically.
func (conn *Connection) ClosedNow() bool {
	return atomic.LoadUint32(&conn.state) == connClosed
}
327✔
422

423
// Close closes Connection.
// After this method called, there is no way to reopen this Connection.
// Pending requests are failed with ErrConnectionClosed.
func (conn *Connection) Close() error {
	err := ClientError{ErrConnectionClosed, "connection closed by client"}
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	return conn.closeConnection(err, true)
}
279✔
431

432
// CloseGraceful closes Connection gracefully. It waits for all requests to
// complete.
// After this method called, there is no way to reopen this Connection.
func (conn *Connection) CloseGraceful() error {
	return conn.shutdown(true)
}
2✔
438

439
// Addr returns the address of the Tarantool socket as reported by the last
// successful dial, or nil if the connection was never established.
func (conn *Connection) Addr() net.Addr {
	return conn.addr
}
83✔
443

444
// Handle returns a user-specified handle from Opts.
func (conn *Connection) Handle() interface{} {
	return conn.opts.Handle
}
×
448

449
func (conn *Connection) cancelFuture(fut *Future, err error) {
7✔
450
        if fut = conn.fetchFuture(fut.requestId); fut != nil {
7✔
451
                fut.SetError(err)
×
452
                conn.markDone(fut)
×
453
        }
×
454
}
455

456
func (conn *Connection) dial(ctx context.Context) error {
431✔
457
        opts := conn.opts
431✔
458

431✔
459
        var c Conn
431✔
460
        c, err := conn.dialer.Dial(ctx, DialOpts{
431✔
461
                IoTimeout: opts.Timeout,
431✔
462
        })
431✔
463
        if err != nil {
536✔
464
                return err
105✔
465
        }
105✔
466

467
        conn.addr = c.Addr()
326✔
468
        connGreeting := c.Greeting()
326✔
469
        conn.Greeting.Version = connGreeting.Version
326✔
470
        conn.Greeting.Salt = connGreeting.Salt
326✔
471
        conn.serverProtocolInfo = c.ProtocolInfo()
326✔
472

326✔
473
        if conn.schemaResolver == nil {
649✔
474
                namesSupported := isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
323✔
475
                        conn.serverProtocolInfo.Features)
323✔
476

323✔
477
                conn.schemaResolver = &noSchemaResolver{
323✔
478
                        SpaceAndIndexNamesSupported: namesSupported,
323✔
479
                }
323✔
480
        }
323✔
481

482
        // Watchers.
483
        conn.watchMap.Range(func(key, value interface{}) bool {
330✔
484
                st := value.(chan watchState)
4✔
485
                state := <-st
4✔
486
                if state.unready != nil {
4✔
487
                        st <- state
×
488
                        return true
×
489
                }
×
490

491
                req := newWatchRequest(key.(string))
4✔
492
                if err = writeRequest(ctx, c, req); err != nil {
4✔
493
                        st <- state
×
494
                        return false
×
495
                }
×
496
                state.ack = true
4✔
497

4✔
498
                st <- state
4✔
499
                return true
4✔
500
        })
501

502
        if err != nil {
326✔
503
                c.Close()
×
504
                return fmt.Errorf("unable to register watch: %w", err)
×
505
        }
×
506

507
        // Only if connected and fully initialized.
508
        conn.lockShards()
326✔
509
        conn.c = c
326✔
510
        atomic.StoreUint32(&conn.state, connConnected)
326✔
511
        conn.cond.Broadcast()
326✔
512
        conn.unlockShards()
326✔
513
        go conn.writer(c, c)
326✔
514
        go conn.reader(c, c)
326✔
515

326✔
516
        // Subscribe shutdown event to process graceful shutdown.
326✔
517
        if conn.shutdownWatcher == nil &&
326✔
518
                isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
326✔
519
                        conn.serverProtocolInfo.Features) {
644✔
520
                watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
318✔
521
                if werr != nil {
318✔
522
                        return werr
×
523
                }
×
524
                conn.shutdownWatcher = watcher
318✔
525
        }
526

527
        return nil
326✔
528
}
529

530
// pack serializes one request into h: a 5-byte big-endian msgpack uint32
// length placeholder, a fixed-layout msgpack header map (request type, sync
// id and, when streamId is non-zero, the stream id) and then the request
// body encoded through enc using the schema resolver res. The length
// placeholder is patched in once the body size is known.
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
	req Request, streamId uint64, res SchemaResolver) (err error) {
	// Msgpack type tags emitted by hand below.
	const uint32Code = 0xce
	const uint64Code = 0xcf
	// Encoded sizes of the IPROTO_STREAM_ID key + value pair.
	const streamBytesLenUint64 = 10
	const streamBytesLenUint32 = 6

	// Offset of this request within h; needed to patch the length later.
	hl := h.Len()

	var streamBytesLen = 0
	var streamBytes [streamBytesLenUint64]byte
	hMapLen := byte(0x82) // 2 element map.
	if streamId != ignoreStreamId {
		hMapLen = byte(0x83) // 3 element map.
		streamBytes[0] = byte(iproto.IPROTO_STREAM_ID)
		// Pick the narrowest msgpack integer encoding that fits the id.
		if streamId > math.MaxUint32 {
			streamBytesLen = streamBytesLenUint64
			streamBytes[1] = uint64Code
			binary.BigEndian.PutUint64(streamBytes[2:], streamId)
		} else {
			streamBytesLen = streamBytesLenUint32
			streamBytes[1] = uint32Code
			binary.BigEndian.PutUint32(streamBytes[2:], uint32(streamId))
		}
	}

	hBytes := append([]byte{
		uint32Code, 0, 0, 0, 0, // Length.
		hMapLen,
		byte(iproto.IPROTO_REQUEST_TYPE), byte(req.Type()), // Request type.
		byte(iproto.IPROTO_SYNC), uint32Code,
		byte(reqid >> 24), byte(reqid >> 16),
		byte(reqid >> 8), byte(reqid),
	}, streamBytes[:streamBytesLen]...)

	h.Write(hBytes)

	if err = req.Body(res, enc); err != nil {
		return
	}

	// Patch the length placeholder: payload size excludes the 5 prefix bytes.
	l := uint32(h.Len() - 5 - hl)
	h.b[hl+1] = byte(l >> 24)
	h.b[hl+2] = byte(l >> 16)
	h.b[hl+3] = byte(l >> 8)
	h.b[hl+4] = byte(l)

	return
}
579

580
func (conn *Connection) connect(ctx context.Context) error {
485✔
581
        var err error
485✔
582
        if conn.c == nil && conn.state == connDisconnected {
916✔
583
                if err = conn.dial(ctx); err == nil {
757✔
584
                        conn.notify(Connected)
326✔
585
                        return nil
326✔
586
                }
326✔
587
        }
588
        if conn.state == connClosed {
213✔
589
                err = ClientError{ErrConnectionClosed, "using closed connection"}
54✔
590
        }
54✔
591
        return err
159✔
592
}
593

594
func (conn *Connection) closeConnection(neterr error, forever bool) (err error) {
648✔
595
        conn.lockShards()
648✔
596
        defer conn.unlockShards()
648✔
597
        if forever {
1,239✔
598
                if conn.state != connClosed {
912✔
599
                        close(conn.control)
321✔
600
                        atomic.StoreUint32(&conn.state, connClosed)
321✔
601
                        conn.cond.Broadcast()
321✔
602
                        // Free the resources.
321✔
603
                        if conn.shutdownWatcher != nil {
637✔
604
                                go conn.shutdownWatcher.Unregister()
316✔
605
                                conn.shutdownWatcher = nil
316✔
606
                        }
316✔
607
                        conn.notify(Closed)
321✔
608
                }
609
        } else {
57✔
610
                atomic.StoreUint32(&conn.state, connDisconnected)
57✔
611
                conn.cond.Broadcast()
57✔
612
                conn.notify(Disconnected)
57✔
613
        }
57✔
614
        if conn.c != nil {
972✔
615
                err = conn.c.Close()
324✔
616
                conn.c = nil
324✔
617
        }
324✔
618
        for i := range conn.shard {
11,064✔
619
                conn.shard[i].buf.Reset()
10,416✔
620
                requestsLists := []*[requestsMap]futureList{
10,416✔
621
                        &conn.shard[i].requests,
10,416✔
622
                        &conn.shard[i].requestsWithCtx,
10,416✔
623
                }
10,416✔
624
                for _, requests := range requestsLists {
31,248✔
625
                        for pos := range requests {
2,687,328✔
626
                                requests[pos].clear(neterr, conn)
2,666,496✔
627
                        }
2,666,496✔
628
                }
629
        }
630
        return
648✔
631
}
632

633
func (conn *Connection) getDialTimeout() time.Duration {
172✔
634
        dialTimeout := conn.opts.Reconnect / 2
172✔
635
        if dialTimeout == 0 {
172✔
636
                dialTimeout = 500 * time.Millisecond
×
637
        } else if dialTimeout > 5*time.Second {
172✔
638
                dialTimeout = 5 * time.Second
×
639
        }
×
640
        return dialTimeout
172✔
641
}
642

643
func (conn *Connection) runReconnects(ctx context.Context) error {
172✔
644
        dialTimeout := conn.getDialTimeout()
172✔
645
        var reconnects uint
172✔
646
        var err error
172✔
647

172✔
648
        t := time.NewTicker(conn.opts.Reconnect)
172✔
649
        defer t.Stop()
172✔
650
        for conn.opts.MaxReconnects == 0 || reconnects <= conn.opts.MaxReconnects {
423✔
651
                localCtx, cancel := context.WithTimeout(ctx, dialTimeout)
251✔
652
                err = conn.connect(localCtx)
251✔
653
                cancel()
251✔
654

251✔
655
                if err != nil {
387✔
656
                        // The error will most likely be the one that Dialer
136✔
657
                        // returns to us due to the context being cancelled.
136✔
658
                        // Although this is not guaranteed. For example,
136✔
659
                        // if the dialer may throw another error before checking
136✔
660
                        // the context, and the context has already been
136✔
661
                        // canceled. Or the context was not canceled after
136✔
662
                        // the error was thrown, but before the context was
136✔
663
                        // checked here.
136✔
664
                        if ctx.Err() != nil {
138✔
665
                                return err
2✔
666
                        }
2✔
667
                        if clientErr, ok := err.(ClientError); ok &&
134✔
668
                                clientErr.Code == ErrConnectionClosed {
188✔
669
                                return err
54✔
670
                        }
54✔
671
                } else {
115✔
672
                        return nil
115✔
673
                }
115✔
674

675
                conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
80✔
676
                conn.notify(ReconnectFailed)
80✔
677
                reconnects++
80✔
678
                conn.mutex.Unlock()
80✔
679

80✔
680
                select {
80✔
681
                case <-ctx.Done():
×
682
                        // Since the context is cancelled, we don't need to do anything.
683
                        // Conn.connect() will return the correct error.
684
                case <-t.C:
80✔
685
                }
686

687
                conn.mutex.Lock()
80✔
688
        }
689

690
        conn.opts.Logger.Report(LogLastReconnectFailed, conn, err)
1✔
691
        // mark connection as closed to avoid reopening by another goroutine
1✔
692
        return ClientError{ErrConnectionClosed, "last reconnect failed"}
1✔
693
}
694

695
func (conn *Connection) reconnectImpl(neterr error, c Conn) {
333✔
696
        if conn.opts.Reconnect > 0 {
454✔
697
                if c == conn.c {
178✔
698
                        conn.closeConnection(neterr, false)
57✔
699
                        if err := conn.runReconnects(context.Background()); err != nil {
111✔
700
                                conn.closeConnection(err, true)
54✔
701
                        }
54✔
702
                }
703
        } else {
212✔
704
                conn.closeConnection(neterr, true)
212✔
705
        }
212✔
706
}
707

708
func (conn *Connection) reconnect(neterr error, c Conn) {
281✔
709
        conn.mutex.Lock()
281✔
710
        defer conn.mutex.Unlock()
281✔
711
        conn.reconnectImpl(neterr, c)
281✔
712
        conn.cond.Broadcast()
281✔
713
}
281✔
714

715
func (conn *Connection) lockShards() {
1,231✔
716
        for i := range conn.shard {
21,039✔
717
                conn.shard[i].rmut.Lock()
19,808✔
718
                conn.shard[i].bufmut.Lock()
19,808✔
719
        }
19,808✔
720
}
721

722
func (conn *Connection) unlockShards() {
1,231✔
723
        for i := range conn.shard {
21,039✔
724
                conn.shard[i].rmut.Unlock()
19,808✔
725
                conn.shard[i].bufmut.Unlock()
19,808✔
726
        }
19,808✔
727
}
728

729
func (conn *Connection) pinger() {
323✔
730
        to := conn.opts.Timeout
323✔
731
        if to == 0 {
324✔
732
                to = 3 * time.Second
1✔
733
        }
1✔
734
        t := time.NewTicker(to / 3)
323✔
735
        defer t.Stop()
323✔
736
        for {
647✔
737
                select {
324✔
738
                case <-conn.control:
321✔
739
                        return
321✔
740
                case <-t.C:
1✔
741
                }
742
                conn.Ping()
1✔
743
        }
744
}
745

746
func (conn *Connection) notify(kind ConnEventKind) {
840✔
747
        if conn.opts.Notify != nil {
848✔
748
                select {
8✔
749
                case conn.opts.Notify <- ConnEvent{Kind: kind, Conn: conn, When: time.Now()}:
×
750
                default:
8✔
751
                }
752
        }
753
}
754

755
// writer is the per-connection goroutine that drains the shard write
// buffers into the connection. Shards announce pending data via the
// dirtyShard channel; the writer flushes w when no shard is dirty.
// On any I/O error it triggers reconnect handling and exits.
func (conn *Connection) writer(w writeFlusher, c Conn) {
	var shardn uint32
	var packet smallWBuf
	for atomic.LoadUint32(&conn.state) != connClosed {
		select {
		case shardn = <-conn.dirtyShard:
		default:
			// No dirty shard right now: yield, then flush buffered
			// bytes if still nothing is pending.
			runtime.Gosched()
			if len(conn.dirtyShard) == 0 {
				if err := w.Flush(); err != nil {
					err = ClientError{
						ErrIoError,
						fmt.Sprintf("failed to flush data to the connection: %s", err),
					}
					conn.reconnect(err, c)
					return
				}
			}
			// Block until a shard becomes dirty or the connection is
			// being torn down.
			select {
			case shardn = <-conn.dirtyShard:
			case <-conn.control:
				return
			}
		}
		shard := &conn.shard[shardn]
		shard.bufmut.Lock()
		if conn.c != c {
			// The connection was replaced under us; hand the dirty
			// marker back so the new writer picks the shard up.
			conn.dirtyShard <- shardn
			shard.bufmut.Unlock()
			return
		}
		// Swap buffers so the shard can keep accumulating while we write.
		packet, shard.buf = shard.buf, packet
		shard.bufmut.Unlock()
		if packet.Len() == 0 {
			continue
		}
		if _, err := w.Write(packet.b); err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to write data to the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		packet.Reset()
	}
}
802

803
// readWatchEvent decodes the body of an IPROTO_EVENT packet: a msgpack
// map with IPROTO_EVENT_KEY (string, mandatory) and IPROTO_EVENT_DATA
// (arbitrary value, optional); unknown keys are skipped.
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
	keyExist := false
	event := connWatchEvent{}

	d := getDecoder(reader)
	defer putDecoder(d)

	l, err := d.DecodeMapLen()
	if err != nil {
		return event, err
	}

	for ; l > 0; l-- {
		cd, err := d.DecodeInt()
		if err != nil {
			return event, err
		}

		switch iproto.Key(cd) {
		case iproto.IPROTO_EVENT_KEY:
			if event.key, err = d.DecodeString(); err != nil {
				return event, err
			}
			keyExist = true
		case iproto.IPROTO_EVENT_DATA:
			if event.value, err = d.DecodeInterface(); err != nil {
				return event, err
			}
		default:
			// Forward compatibility: ignore keys we don't know.
			if err = d.Skip(); err != nil {
				return event, err
			}
		}
	}

	if !keyExist {
		return event, errors.New("watch event does not have a key")
	}

	return event, nil
}
844

845
// reader is the per-connection goroutine that reads IPROTO packets from
// the connection and dispatches them: watch events go to the eventer
// goroutine, regular responses complete their pending futures. On a
// read/decode failure it triggers reconnect handling and exits.
func (conn *Connection) reader(r io.Reader, c Conn) {
	events := make(chan connWatchEvent, 1024)
	// Closing events stops the eventer goroutine when the reader exits.
	defer close(events)

	go conn.eventer(events)

	for atomic.LoadUint32(&conn.state) != connClosed {
		respBytes, err := read(r, conn.lenbuf[:])
		if err != nil {
			err = ClientError{
				ErrIoError,
				fmt.Sprintf("failed to read data from the connection: %s", err),
			}
			conn.reconnect(err, c)
			return
		}
		buf := smallBuf{b: respBytes}
		header, code, err := decodeHeader(conn.dec, &buf)
		if err != nil {
			err = ClientError{
				ErrProtocolError,
				fmt.Sprintf("failed to decode IPROTO header: %s", err),
			}
			conn.reconnect(err, c)
			return
		}

		var fut *Future = nil
		if code == iproto.IPROTO_EVENT {
			// Watch events carry no request id; route them to eventer.
			if event, err := readWatchEvent(&buf); err == nil {
				events <- event
			} else {
				err = ClientError{
					ErrProtocolError,
					fmt.Sprintf("failed to decode IPROTO_EVENT: %s", err),
				}
				conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
			}
			continue
		} else if code == iproto.IPROTO_CHUNK {
			// NOTE(review): a chunk leaves fut == nil, so it is also
			// reported as LogUnexpectedResultId below — looks deliberate,
			// but worth confirming.
			conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
		} else {
			if fut = conn.fetchFuture(header.RequestId); fut != nil {
				if err := fut.SetResponse(header, &buf); err != nil {
					fut.SetError(fmt.Errorf("failed to set response: %w", err))
				}
				conn.markDone(fut)
			}
		}

		if fut == nil {
			conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
		}
	}
}
900

901
// eventer goroutine gets watch events and updates values for watchers.
// For each event it bumps the state version (wrapping past math.MaxUint
// back to just above the initial version), stores the new value, resets
// the acknowledge flag and closes the "changed" channel to broadcast the
// update to all watcher goroutines of the key.
func (conn *Connection) eventer(events <-chan connWatchEvent) {
	for event := range events {
		if value, ok := conn.watchMap.Load(event.key); ok {
			st := value.(chan watchState)
			// The 1-element channel acts as a mutex around the state.
			state := <-st
			state.value = event.value
			if state.version == math.MaxUint {
				// Skip the reserved initial version on wraparound.
				state.version = initWatchEventVersion + 1
			} else {
				state.version += 1
			}
			state.ack = false
			if state.changed != nil {
				close(state.changed)
				state.changed = nil
			}
			st <- state
		}
		// It is possible to get IPROTO_EVENT after we already send
		// IPROTO_UNWATCH due to processing on a Tarantool side or slow
		// read from the network, so it looks like an expected behavior.
	}
}
925

926
// newFuture allocates a Future for req, assigns it a request id and
// registers it in the proper shard list. On rate limiting (RLimitDrop),
// a closed/disconnected/shutdown connection or an already-cancelled
// context, the returned future carries the error and has ready/done set
// to nil so callers can detect the failure cheaply.
func (conn *Connection) newFuture(req Request) (fut *Future) {
	ctx := req.Ctx()
	fut = NewFuture(req)
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
		// Drop policy: fail immediately if the limiter is full.
		select {
		case conn.rlimit <- struct{}{}:
		default:
			fut.err = ClientError{
				ErrRateLimited,
				"Request is rate limited on client",
			}
			fut.ready = nil
			fut.done = nil
			return
		}
	}
	fut.requestId = conn.nextRequestId(ctx != nil)
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.rmut.Lock()
	// Check the connection state under the shard lock so the future is
	// never registered on a dying connection.
	switch atomic.LoadUint32(&conn.state) {
	case connClosed:
		fut.err = ClientError{
			ErrConnectionClosed,
			"using closed connection",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connDisconnected:
		fut.err = ClientError{
			ErrConnectionNotReady,
			"client connection is not ready",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	case connShutdown:
		fut.err = ClientError{
			ErrConnectionShutdown,
			"server shutdown in progress",
		}
		fut.ready = nil
		fut.done = nil
		shard.rmut.Unlock()
		return
	}
	pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
	if ctx != nil {
		select {
		case <-ctx.Done():
			fut.SetError(fmt.Errorf("context is done (request ID %d): %w",
				fut.requestId, context.Cause(ctx)))
			shard.rmut.Unlock()
			return
		default:
		}
		shard.requestsWithCtx[pos].addFuture(fut)
	} else {
		shard.requests[pos].addFuture(fut)
		if conn.opts.Timeout > 0 {
			// Deadline is stored relative to the package epoch; the
			// timeouts() goroutine compares against time.Since(epoch).
			fut.timeout = time.Since(epoch) + conn.opts.Timeout
		}
	}
	shard.rmut.Unlock()
	if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
		// Wait policy: try once, yield, then block until either a slot
		// frees up or the future completes (e.g. by timeout).
		select {
		case conn.rlimit <- struct{}{}:
		default:
			runtime.Gosched()
			select {
			case conn.rlimit <- struct{}{}:
			case <-fut.done:
				if fut.err == nil {
					panic("fut.done is closed, but err is nil")
				}
			}
		}
	}
	return
}
1009

1010
// This method removes a future from the internal queue if the context
1011
// is "done" before the response is come.
1012
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
×
1013
        select {
×
1014
        case <-fut.done:
×
1015
        case <-ctx.Done():
×
1016
        }
1017

1018
        select {
×
1019
        case <-fut.done:
×
1020
                return
×
1021
        default:
×
1022
                conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d): %w",
×
1023
                        fut.requestId, context.Cause(ctx)))
×
1024
        }
1025
}
1026

1027
// incrementRequestCnt atomically bumps the in-flight request counter.
func (conn *Connection) incrementRequestCnt() {
	atomic.AddInt64(&conn.requestCnt, int64(1))
}
1030

1031
// decrementRequestCnt atomically decrements the in-flight request
// counter and broadcasts on conn.cond when it reaches zero, so waiters
// (e.g. graceful shutdown) can proceed.
func (conn *Connection) decrementRequestCnt() {
	if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
		conn.cond.Broadcast()
	}
}
1036

1037
// send registers a future for req and enqueues the encoded request into
// a shard buffer for the writer goroutine. For context-bound requests it
// starts a watchdog goroutine that cancels the future when the context
// is done first. Always returns a non-nil future (possibly already
// failed).
func (conn *Connection) send(req Request, streamId uint64) *Future {
	conn.incrementRequestCnt()

	fut := conn.newFuture(req)
	if fut.ready == nil {
		// newFuture failed (rate limit / connection state / dead ctx);
		// the error is already set on the future.
		conn.decrementRequestCnt()
		return fut
	}

	if req.Ctx() != nil {
		select {
		case <-req.Ctx().Done():
			conn.cancelFuture(fut, fmt.Errorf("context is done (request ID %d)", fut.requestId))
			return fut
		default:
		}
		go conn.contextWatchdog(fut, req.Ctx())
	}
	conn.putFuture(fut, req, streamId)

	return fut
}
1059

1060
// putFuture encodes req into its shard's write buffer and notifies the
// writer goroutine via dirtyShard when the buffer transitions from empty
// to non-empty. On a packing failure the future is unregistered and
// failed with the packing error. Async requests are completed
// immediately after being buffered.
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
	shardn := fut.requestId & (conn.opts.Concurrency - 1)
	shard := &conn.shard[shardn]
	shard.bufmut.Lock()
	select {
	case <-fut.done:
		// Already cancelled (e.g. by the context watchdog): don't encode.
		shard.bufmut.Unlock()
		return
	default:
	}
	firstWritten := shard.buf.Len() == 0
	if shard.buf.Cap() == 0 {
		// Lazily create the shard buffer and its msgpack encoder.
		shard.buf.b = make([]byte, 0, 128)
		shard.enc = msgpack.NewEncoder(&shard.buf)
	}
	blen := shard.buf.Len()
	reqid := fut.requestId
	if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.schemaResolver); err != nil {
		// Roll the buffer back to drop the partially encoded request.
		shard.buf.Trunc(blen)
		shard.bufmut.Unlock()
		if f := conn.fetchFuture(reqid); f == fut {
			fut.SetError(err)
			conn.markDone(fut)
		} else if f != nil {
			/* in theory, it is possible. In practice, you have
			 * to have race condition that lasts hours */
			panic("Unknown future")
		} else {
			// Someone else (timeout/cancel) already removed the future;
			// wait for it to settle.
			fut.wait()
			if fut.err == nil {
				panic("Future removed from queue without error")
			}
			if _, ok := fut.err.(ClientError); ok {
				// packing error is more important than connection
				// error, because it is indication of programmer's
				// mistake.
				fut.SetError(err)
			}
		}
		return
	}
	shard.bufmut.Unlock()

	if firstWritten {
		conn.dirtyShard <- shardn
	}

	if req.Async() {
		// Async requests get no server response: resolve them right away
		// with a synthetic success header.
		if fut = conn.fetchFuture(reqid); fut != nil {
			header := Header{
				RequestId: reqid,
				Error:     ErrorNo,
			}
			fut.SetResponse(header, nil)
			conn.markDone(fut)
		}
	}
}
1118

1119
// markDone releases a rate-limiter slot (if limiting is enabled) and
// decrements the in-flight request counter for a completed future.
func (conn *Connection) markDone(fut *Future) {
	if conn.rlimit != nil {
		<-conn.rlimit
	}
	conn.decrementRequestCnt()
}
1125

1126
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
1,868✔
1127
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,868✔
1128
        shard.rmut.Lock()
1,868✔
1129
        fut = conn.getFutureImp(reqid, true)
1,868✔
1130
        shard.rmut.Unlock()
1,868✔
1131
        return fut
1,868✔
1132
}
1,868✔
1133

1134
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
1,868✔
1135
        shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
1,868✔
1136
        pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
1,868✔
1137
        // futures with even requests id belong to requests list with nil context
1,868✔
1138
        if reqid%2 == 0 {
3,729✔
1139
                return shard.requests[pos].findFuture(reqid, fetch)
1,861✔
1140
        } else {
1,868✔
1141
                return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
7✔
1142
        }
7✔
1143
}
1144

1145
// timeouts is the goroutine that expires futures whose deadline (set in
// newFuture for context-less requests) has passed, failing them with
// ErrTimeouted. The timer is re-armed to fire at the earliest remaining
// deadline. It runs only when Opts.Timeout > 0 and stops when the
// control channel is closed.
func (conn *Connection) timeouts() {
	timeout := conn.opts.Timeout
	t := time.NewTimer(timeout)
	for {
		var nowepoch time.Duration
		select {
		case <-conn.control:
			t.Stop()
			return
		case <-t.C:
		}
		minNext := time.Since(epoch) + timeout
		for i := range conn.shard {
			nowepoch = time.Since(epoch)
			shard := &conn.shard[i]
			for pos := range shard.requests {
				shard.rmut.Lock()
				pair := &shard.requests[pos]
				// Lists are ordered by registration time, so expired
				// futures sit at the head.
				for pair.first != nil && pair.first.timeout < nowepoch {
					shard.bufmut.Lock()
					fut := pair.first
					pair.first = fut.next
					if fut.next == nil {
						pair.last = &pair.first
					} else {
						fut.next = nil
					}
					fut.SetError(ClientError{
						Code: ErrTimeouted,
						Msg:  fmt.Sprintf("client timeout for request %d", fut.requestId),
					})
					conn.markDone(fut)
					shard.bufmut.Unlock()
				}
				// Track the earliest still-pending deadline.
				if pair.first != nil && pair.first.timeout < minNext {
					minNext = pair.first.timeout
				}
				shard.rmut.Unlock()
			}
		}
		nowepoch = time.Since(epoch)
		if nowepoch+time.Microsecond < minNext {
			t.Reset(minNext - nowepoch)
		} else {
			t.Reset(time.Microsecond)
		}
	}
}
1193

1194
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
2,516✔
1195
        var length uint64
2,516✔
1196

2,516✔
1197
        if _, err = io.ReadFull(r, lenbuf); err != nil {
2,794✔
1198
                return
278✔
1199
        }
278✔
1200
        if lenbuf[0] != 0xce {
2,236✔
1201
                err = errors.New("wrong response header")
×
1202
                return
×
1203
        }
×
1204
        length = (uint64(lenbuf[1]) << 24) +
2,236✔
1205
                (uint64(lenbuf[2]) << 16) +
2,236✔
1206
                (uint64(lenbuf[3]) << 8) +
2,236✔
1207
                uint64(lenbuf[4])
2,236✔
1208

2,236✔
1209
        switch {
2,236✔
1210
        case length == 0:
×
1211
                err = errors.New("response should not be 0 length")
×
1212
                return
×
1213
        case length > math.MaxUint32:
×
1214
                err = errors.New("response is too big")
×
1215
                return
×
1216
        }
1217

1218
        response = make([]byte, length)
2,236✔
1219
        _, err = io.ReadFull(r, response)
2,236✔
1220

2,236✔
1221
        return
2,236✔
1222
}
1223

1224
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
1,880✔
1225
        if context {
1,887✔
1226
                return atomic.AddUint32(&conn.contextRequestId, 2)
7✔
1227
        } else {
1,880✔
1228
                return atomic.AddUint32(&conn.requestId, 2)
1,873✔
1229
        }
1,873✔
1230
}
1231

1232
// Do performs a request asynchronously on the connection.
1233
//
1234
// An error is returned if the request was formed incorrectly, or failed to
1235
// create the future.
1236
func (conn *Connection) Do(req Request) *Future {
1,833✔
1237
        if connectedReq, ok := req.(ConnectedRequest); ok {
1,839✔
1238
                if connectedReq.Conn() != conn {
7✔
1239
                        fut := NewFuture(req)
1✔
1240
                        fut.SetError(errUnknownRequest)
1✔
1241
                        return fut
1✔
1242
                }
1✔
1243
        }
1244
        return conn.send(req, ignoreStreamId)
1,832✔
1245
}
1246

1247
// ConfiguredTimeout returns a timeout from connection config
// (Opts.Timeout).
func (conn *Connection) ConfiguredTimeout() time.Duration {
	return conn.opts.Timeout
}
1251

1252
// SetSchema sets Schema for the connection. The schema is copied and
// installed as the connection's schema resolver while the connection
// mutex and all shard mutexes are held, so no request is encoded with a
// half-replaced schema.
func (conn *Connection) SetSchema(s Schema) {
	sCopy := s.copy()
	spaceAndIndexNamesSupported :=
		isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
			conn.serverProtocolInfo.Features)

	// Lock order: connection mutex first, then every shard.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	conn.lockShards()
	defer conn.unlockShards()

	conn.schemaResolver = &loadedSchemaResolver{
		Schema:                      sCopy,
		SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
	}
}
1269

1270
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1271
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
1✔
1272
        req := NewPrepareRequest(expr)
1✔
1273
        resp, err := conn.Do(req).GetResponse()
1✔
1274
        if err != nil {
1✔
1275
                return nil, err
×
1276
        }
×
1277
        return NewPreparedFromResponse(conn, resp)
1✔
1278
}
1279

1280
// NewStream creates new Stream object for connection.
1281
//
1282
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1283
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1284
// Since 1.7.0
1285
func (conn *Connection) NewStream() (*Stream, error) {
9✔
1286
        next := atomic.AddUint64(&conn.lastStreamId, 1)
9✔
1287
        return &Stream{
9✔
1288
                Id:   next,
9✔
1289
                Conn: conn,
9✔
1290
        }, nil
9✔
1291
}
9✔
1292

1293
// watchState is the current state of the watcher. See the idea at p. 70, 105:
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
//
// The state is kept in a 1-element channel (see subscribeWatchChannel /
// eventer) that doubles as its mutex: receive to lock, send back to unlock.
type watchState struct {
	// value is a current value.
	value interface{}
	// version is a current version of the value. Starts at
	// initWatchEventVersion and is bumped by eventer on every event.
	version uint
	// ack true if the acknowledge is already sent.
	ack bool
	// cnt is a count of active watchers for the key.
	cnt int
	// changed is a channel for broadcast the value changes: closed (and
	// replaced with nil) when a new value arrives.
	changed chan struct{}
	// unready channel exists if a state is not ready to work (subscription
	// or unsubscription in progress).
	unready chan struct{}
}
1310

1311
// initWatchEventVersion is the version a watchState starts with, before
// any event has arrived from Tarantool.
const initWatchEventVersion uint = 0

1314
// connWatcher is an internal implementation of the Watcher interface.
type connWatcher struct {
	// unregister guarantees done is closed at most once even if
	// Unregister is called concurrently or repeatedly.
	unregister sync.Once
	// done is closed when the watcher is unregistered, but the watcher
	// goroutine is not yet finished.
	done chan struct{}
	// finished is closed when the watcher is unregistered and the watcher
	// goroutine is finished.
	finished chan struct{}
}
1324

1325
// Unregister unregisters the connection watcher.
//
// Safe to call multiple times: done is closed exactly once, and every
// call blocks until the watcher goroutine has fully stopped.
func (w *connWatcher) Unregister() {
	w.unregister.Do(func() {
		close(w.done)
	})
	<-w.finished
}
1332

1333
// subscribeWatchChannel returns an existing one or a new watch state channel
// for the key. It also increases a counter of active watchers for the channel.
//
// The first subscriber for a key creates the state channel, stores it in
// conn.watchMap (racing creators lose via LoadOrStore) and sends the
// IPROTO_WATCH request; later subscribers only bump the counter, waiting
// on state.unready while a subscription is still in flight.
func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error) {
	var st chan watchState

	for st == nil {
		if val, ok := conn.watchMap.Load(key); !ok {
			// No channel for the key yet: create one in the "unready"
			// state and try to publish it.
			st = make(chan watchState, 1)
			state := watchState{
				value:   nil,
				version: initWatchEventVersion,
				ack:     false,
				cnt:     0,
				changed: nil,
				unready: make(chan struct{}),
			}
			st <- state

			if val, loaded := conn.watchMap.LoadOrStore(key, st); !loaded {
				if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
					conn.watchMap.Delete(key)
					close(state.unready)
					return nil, err
				}
				// It is a successful subscription to a watch events by itself.
				state = <-st
				state.cnt = 1
				close(state.unready)
				state.unready = nil
				st <- state
				continue
			} else {
				// Lost the race: discard ours, use the winner's channel.
				close(state.unready)
				close(st)
				st = val.(chan watchState)
			}
		} else {
			st = val.(chan watchState)
		}

		// It is an existing channel created outside. It may be in the
		// unready state.
		state := <-st
		if state.unready == nil {
			state.cnt += 1
		}
		st <- state

		if state.unready != nil {
			// Wait for an update and retry.
			<-state.unready
			st = nil
		}
	}

	return st, nil
}
1390

1391
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
1,969✔
1392
        for _, actual := range actualSlice {
10,957✔
1393
                if expected == actual {
10,947✔
1394
                        return true
1,959✔
1395
                }
1,959✔
1396
        }
1397
        return false
10✔
1398
}
1399

1400
// NewWatcher creates a new Watcher object for the connection.
1401
//
1402
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
1403
//
1404
// After watcher creation, the watcher callback is invoked for the first time.
1405
// In this case, the callback is triggered whether or not the key has already
1406
// been broadcast. All subsequent invocations are triggered with
1407
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1408
// key that has not been broadcast yet, the callback is triggered only once,
1409
// after the registration of the watcher.
1410
//
1411
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1412
// callback is never executed in parallel with itself, but they can be executed
1413
// in parallel to other watchers.
1414
//
1415
// If the key is updated while the watcher callback is running, the callback
1416
// will be invoked again with the latest value as soon as it returns.
1417
//
1418
// Watchers survive reconnection. All registered watchers are automatically
1419
// resubscribed when the connection is reestablished.
1420
//
1421
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1422
// watcher’s destruction. In this case, the watcher remains registered. You
1423
// need to call Unregister() directly.
1424
//
1425
// Unregister() guarantees that there will be no the watcher's callback calls
1426
// after it, but Unregister() call from the callback leads to a deadlock.
1427
//
1428
// See:
1429
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1430
//
1431
// Since 1.10.0
1432
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1,066✔
1433
        // We need to check the feature because the IPROTO_WATCH request is
1,066✔
1434
        // asynchronous. We do not expect any response from a Tarantool instance
1,066✔
1435
        // That's why we can't just check the Tarantool response for an unsupported
1,066✔
1436
        // request error.
1,066✔
1437
        if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
1,066✔
1438
                conn.serverProtocolInfo.Features) {
1,066✔
1439
                err := fmt.Errorf("the feature %s must be supported by connection "+
×
1440
                        "to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
×
1441
                return nil, err
×
1442
        }
×
1443

1444
        return conn.newWatcherImpl(key, callback)
1,066✔
1445
}
1446

1447
// newWatcherImpl subscribes the connection to the key and starts a goroutine
// that delivers watch events to the callback. It returns a connWatcher handle
// whose done channel stops the goroutine and whose finished channel is closed
// once the goroutine has fully exited.
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
	// st acts as a single-slot lock around the shared per-key watcher
	// state: receive to take ownership, send the (possibly updated)
	// state back to release it.
	st, err := subscribeWatchChannel(conn, key)
	if err != nil {
		return nil, err
	}

	// Start the watcher goroutine.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		version := initWatchEventVersion
		for {
			// Take the state and lazily create the change-notification
			// channel used to wake this goroutine up on new events.
			state := <-st
			if state.changed == nil {
				state.changed = make(chan struct{})
			}
			st <- state

			// A version bump means a new value arrived since the last
			// delivery; invoke the callback outside the state lock so a
			// slow callback cannot block other watchers of the key.
			if state.version != version {
				callback(WatchEvent{
					Conn:  conn,
					Key:   key,
					Value: state.value,
				})
				version = state.version

				// Do we need to acknowledge the notification?
				state = <-st
				sendAck := !state.ack && version == state.version
				if sendAck {
					state.ack = true
				}
				st <- state

				if sendAck {
					// We expect a reconnect and re-subscribe if it fails to
					// send the watch request. So it looks ok do not check a
					// result. But we need to make sure that the re-watch
					// request will not be finished by a small per-request
					// timeout.
					req := newWatchRequest(key).Context(context.Background())
					conn.Do(req).Get()
				}
			}

			select {
			case <-done:
				// Unregister() was called: drop this watcher from the
				// per-key reference count under the state lock.
				state := <-st
				state.cnt -= 1
				if state.cnt == 0 {
					state.unready = make(chan struct{})
				}
				st <- state

				if state.cnt == 0 {
					// The last one sends IPROTO_UNWATCH.
					if !conn.ClosedNow() {
						// conn.ClosedNow() check is a workaround for calling
						// Unregister from connectionClose().
						//
						// We need to make sure that the unwatch request will
						// not be finished by a small per-request timeout to
						// avoid lost of the request.
						req := newUnwatchRequest(key).Context(context.Background())
						conn.Do(req).Get()
					}
					conn.watchMap.Delete(key)
					close(state.unready)
				}

				close(finished)
				return
			case <-state.changed:
				// A new event was broadcast; loop around to deliver it.
			}
		}
	}()

	return &connWatcher{
		done:     done,
		finished: finished,
	}, nil
}
1530

1531
// ProtocolInfo returns protocol version and protocol features
1532
// supported by connected Tarantool server. Beware that values might be
1533
// outdated if connection is in a disconnected state.
1534
// Since 2.0.0
1535
func (conn *Connection) ProtocolInfo() ProtocolInfo {
5✔
1536
        return conn.serverProtocolInfo.Clone()
5✔
1537
}
5✔
1538

1539
func shutdownEventCallback(event WatchEvent) {
375✔
1540
        // Receives "true" on server shutdown.
375✔
1541
        // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
375✔
1542
        // step 2.
375✔
1543
        val, ok := event.Value.(bool)
375✔
1544
        if ok && val {
429✔
1545
                go event.Conn.shutdown(false)
54✔
1546
        }
54✔
1547
}
1548

1549
// shutdown gracefully terminates the connection: it moves the state from
// connConnected to connShutdown, waits until all in-flight requests are
// finished, and then either closes the connection permanently
// (forever == true, client-initiated close) or triggers a reconnect
// (forever == false, server-initiated graceful shutdown).
func (conn *Connection) shutdown(forever bool) error {
	// Forbid state changes.
	conn.mutex.Lock()
	defer conn.mutex.Unlock()

	// Only a connected connection can enter the shutdown state; otherwise
	// there is nothing to drain.
	if !atomic.CompareAndSwapUint32(&conn.state, connConnected, connShutdown) {
		if forever {
			err := ClientError{ErrConnectionClosed, "connection closed by client"}
			return conn.closeConnection(err, true)
		}
		return nil
	}

	if forever {
		// We don't want to reconnect any more.
		conn.opts.Reconnect = 0
		conn.opts.MaxReconnects = 0
	}

	// Wake up anyone waiting on the state and report the event.
	conn.cond.Broadcast()
	conn.notify(Shutdown)

	// Remember the current transport so we can detect a reconnect that
	// happened while we were waiting below.
	c := conn.c
	for {
		if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
			return nil
		}
		if atomic.LoadInt64(&conn.requestCnt) == 0 {
			break
		}
		// Use cond var on conn.mutex since request execution may
		// call reconnect(). It is ok if state changes as part of
		// reconnect since Tarantool server won't allow to reconnect
		// in the middle of shutting down.
		conn.cond.Wait()
	}

	if forever {
		err := ClientError{ErrConnectionClosed, "connection closed by client"}
		return conn.closeConnection(err, true)
	} else {
		// Start to reconnect based on common rules, same as in net.box.
		// Reconnect also closes the connection: server waits until all
		// subscribed connections are terminated.
		// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
		// step 3.
		conn.reconnectImpl(ClientError{
			ErrConnectionClosed,
			"connection closed after server shutdown",
		}, conn.c)
		return nil
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc