• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 5112083395

29 May 2023 12:53PM UTC coverage: 79.052% (-0.07%) from 79.125%
5112083395

push

github

oleg-jukovec
api: ConnectionPool.Add()/ConnectionPool.Remove()

* ConnectionPool.Add() allows to add a new endpoint into a pool.
* ConnectionPool.Remove() allows to remove an endpoint from a pool.

Closes #290

167 of 167 new or added lines in 1 file covered. (100.0%)

5472 of 6922 relevant lines covered (79.05%)

7727.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.47
/connection_pool/connection_pool.go
1
// Package with methods to work with a Tarantool cluster
2
// considering master discovery.
3
//
4
// Main features:
5
//
6
// - Return available connection from pool according to round-robin strategy.
7
//
8
// - Automatic master discovery by mode parameter.
9
//
10
// Since: 1.6.0
11
package connection_pool
12

13
import (
14
        "errors"
15
        "fmt"
16
        "log"
17
        "sync"
18
        "time"
19

20
        "github.com/tarantool/go-tarantool"
21
)
22

23
var (
24
        ErrEmptyAddrs        = errors.New("addrs (first argument) should not be empty")
25
        ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
26
        ErrNoConnection      = errors.New("no active connections")
27
        ErrTooManyArgs       = errors.New("too many arguments")
28
        ErrIncorrectResponse = errors.New("incorrect response format")
29
        ErrIncorrectStatus   = errors.New("incorrect instance status: status should be `running`")
30
        ErrNoRwInstance      = errors.New("can't find rw instance in pool")
31
        ErrNoRoInstance      = errors.New("can't find ro instance in pool")
32
        ErrNoHealthyInstance = errors.New("can't find healthy instance in pool")
33
        ErrClosed            = errors.New("pool is closed")
34
)
35

36
// ConnectionHandler provides callbacks for components interested in handling
37
// changes of connections in a ConnectionPool.
38
type ConnectionHandler interface {
39
        // Discovered is called when a connection with a role has been detected
40
        // (for the first time or when a role of a connection has been changed),
41
        // but is not yet available to send requests. It allows for a client to
42
        // initialize the connection before using it in a pool.
43
        //
44
        // The client code may cancel adding a connection to the pool. The client
45
        // need to return an error from the Discovered call for that. In this case
46
        // the pool will close connection and will try to reopen it later.
47
        Discovered(conn *tarantool.Connection, role Role) error
48
        // Deactivated is called when a connection with a role has become
49
        // unavaileble to send requests. It happens if the connection is closed or
50
        // the connection role is switched.
51
        //
52
        // So if a connection switches a role, a pool calls:
53
        // Deactivated() + Discovered().
54
        //
55
        // Deactivated will not be called if a previous Discovered() call returns
56
        // an error. Because in this case, the connection does not become available
57
        // for sending requests.
58
        Deactivated(conn *tarantool.Connection, role Role) error
59
}
60

61
// OptsPool provides additional options (configurable via ConnectWithOpts).
62
type OptsPool struct {
63
        // Timeout for timer to reopen connections that have been closed by some
64
        // events and to relocate connection between subpools if ro/rw role has
65
        // been updated.
66
        CheckTimeout time.Duration
67
        // ConnectionHandler provides an ability to handle connection updates.
68
        ConnectionHandler ConnectionHandler
69
}
70

71
/*
72
ConnectionInfo structure for information about connection statuses:
73

74
- ConnectedNow reports if connection is established at the moment.
75

76
- ConnRole reports master/replica role of instance.
77
*/
78
type ConnectionInfo struct {
79
        ConnectedNow bool
80
        ConnRole     Role
81
}
82

83
/*
84
Main features:
85

86
- Return available connection from pool according to round-robin strategy.
87

88
- Automatic master discovery by mode parameter.
89
*/
90
type ConnectionPool struct {
91
        addrs      map[string]*endpoint
92
        addrsMutex sync.RWMutex
93

94
        connOpts tarantool.Opts
95
        opts     OptsPool
96

97
        state            state
98
        done             chan struct{}
99
        roPool           *RoundRobinStrategy
100
        rwPool           *RoundRobinStrategy
101
        anyPool          *RoundRobinStrategy
102
        poolsMutex       sync.RWMutex
103
        watcherContainer watcherContainer
104
}
105

106
var _ Pooler = (*ConnectionPool)(nil)
107

108
type endpoint struct {
109
        addr   string
110
        notify chan tarantool.ConnEvent
111
        conn   *tarantool.Connection
112
        role   Role
113
        // This is used to switch a connection states.
114
        shutdown chan struct{}
115
        close    chan struct{}
116
        closed   chan struct{}
117
        closeErr error
118
}
119

120
func newEndpoint(addr string) *endpoint {
534✔
121
        return &endpoint{
534✔
122
                addr:     addr,
534✔
123
                notify:   make(chan tarantool.ConnEvent, 100),
534✔
124
                conn:     nil,
534✔
125
                role:     UnknownRole,
534✔
126
                shutdown: make(chan struct{}),
534✔
127
                close:    make(chan struct{}),
534✔
128
                closed:   make(chan struct{}),
534✔
129
        }
534✔
130
}
534✔
131

132
// ConnectWithOpts creates pool for instances with addresses addrs
133
// with options opts.
134
func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (connPool *ConnectionPool, err error) {
138✔
135
        if len(addrs) == 0 {
140✔
136
                return nil, ErrEmptyAddrs
2✔
137
        }
2✔
138
        if opts.CheckTimeout <= 0 {
138✔
139
                return nil, ErrWrongCheckTimeout
2✔
140
        }
2✔
141

142
        size := len(addrs)
134✔
143
        rwPool := NewEmptyRoundRobin(size)
134✔
144
        roPool := NewEmptyRoundRobin(size)
134✔
145
        anyPool := NewEmptyRoundRobin(size)
134✔
146

134✔
147
        connPool = &ConnectionPool{
134✔
148
                addrs:    make(map[string]*endpoint),
134✔
149
                connOpts: connOpts.Clone(),
134✔
150
                opts:     opts,
134✔
151
                state:    unknownState,
134✔
152
                done:     make(chan struct{}),
134✔
153
                rwPool:   rwPool,
134✔
154
                roPool:   roPool,
134✔
155
                anyPool:  anyPool,
134✔
156
        }
134✔
157

134✔
158
        for _, addr := range addrs {
656✔
159
                connPool.addrs[addr] = nil
522✔
160
        }
522✔
161

162
        somebodyAlive := connPool.fillPools()
134✔
163
        if !somebodyAlive {
138✔
164
                connPool.state.set(closedState)
4✔
165
                return nil, ErrNoConnection
4✔
166
        }
4✔
167

168
        connPool.state.set(connectedState)
130✔
169

130✔
170
        for _, s := range connPool.addrs {
638✔
171
                go connPool.controller(s)
508✔
172
        }
508✔
173

174
        return connPool, nil
130✔
175
}
176

177
// ConnectWithOpts creates pool for instances with addresses addrs.
178
//
179
// It is useless to set up tarantool.Opts.Reconnect value for a connection.
180
// The connection pool has its own reconnection logic. See
181
// OptsPool.CheckTimeout description.
182
func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool, err error) {
128✔
183
        opts := OptsPool{
128✔
184
                CheckTimeout: 1 * time.Second,
128✔
185
        }
128✔
186
        return ConnectWithOpts(addrs, connOpts, opts)
128✔
187
}
128✔
188

189
// ConnectedNow gets connected status of pool.
190
func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
66✔
191
        connPool.poolsMutex.RLock()
66✔
192
        defer connPool.poolsMutex.RUnlock()
66✔
193

66✔
194
        if connPool.state.get() != connectedState {
74✔
195
                return false, nil
8✔
196
        }
8✔
197
        switch mode {
58✔
198
        case ANY:
58✔
199
                return !connPool.anyPool.IsEmpty(), nil
58✔
200
        case RW:
×
201
                return !connPool.rwPool.IsEmpty(), nil
×
202
        case RO:
×
203
                return !connPool.roPool.IsEmpty(), nil
×
204
        case PreferRW:
×
205
                fallthrough
×
206
        case PreferRO:
×
207
                return !connPool.rwPool.IsEmpty() || !connPool.roPool.IsEmpty(), nil
×
208
        default:
×
209
                return false, ErrNoHealthyInstance
×
210
        }
211
}
212

213
// ConfiguredTimeout gets timeout of current connection.
214
func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
×
215
        conn, err := connPool.getNextConnection(mode)
×
216
        if err != nil {
×
217
                return 0, err
×
218
        }
×
219

220
        return conn.ConfiguredTimeout(), nil
×
221
}
222

223
// Add adds a new endpoint with the address into the pool. This function
224
// the endpoint only after successful connection.
225
func (pool *ConnectionPool) Add(addr string) error {
18✔
226
        e := newEndpoint(addr)
18✔
227

18✔
228
        pool.addrsMutex.Lock()
18✔
229
        // Ensure that Close()/CloseGraceful() not in progress/done.
18✔
230
        if pool.state.get() != connectedState {
24✔
231
                pool.addrsMutex.Unlock()
6✔
232
                return ErrClosed
6✔
233
        }
6✔
234
        if _, ok := pool.addrs[addr]; ok {
14✔
235
                pool.addrsMutex.Unlock()
2✔
236
                return errors.New("endpoint exist")
2✔
237
        }
2✔
238
        pool.addrs[addr] = e
10✔
239
        pool.addrsMutex.Unlock()
10✔
240

10✔
241
        if err := pool.tryConnect(e); err != nil {
12✔
242
                pool.addrsMutex.Lock()
2✔
243
                delete(pool.addrs, addr)
2✔
244
                pool.addrsMutex.Unlock()
2✔
245
                close(e.closed)
2✔
246
                return err
2✔
247
        }
2✔
248

249
        go pool.controller(e)
8✔
250
        return nil
8✔
251
}
252

253
// Remove removes an endpoint with the address from the pool. The call
254
// closes an active connection gracefully.
255
func (pool *ConnectionPool) Remove(addr string) error {
38✔
256
        pool.addrsMutex.Lock()
38✔
257
        endpoint, ok := pool.addrs[addr]
38✔
258
        if !ok {
60✔
259
                pool.addrsMutex.Unlock()
22✔
260
                return errors.New("endpoint not exist")
22✔
261
        }
22✔
262

263
        select {
16✔
264
        case <-endpoint.close:
2✔
265
                // Close() in progress/done.
266
        case <-endpoint.shutdown:
2✔
267
                // CloseGraceful()/Remove() in progress/done.
268
        default:
12✔
269
                close(endpoint.shutdown)
12✔
270
        }
271

272
        delete(pool.addrs, addr)
16✔
273
        pool.addrsMutex.Unlock()
16✔
274

16✔
275
        <-endpoint.closed
16✔
276
        return nil
16✔
277
}
278

279
func (pool *ConnectionPool) waitClose() []error {
134✔
280
        pool.addrsMutex.RLock()
134✔
281
        endpoints := make([]*endpoint, 0, len(pool.addrs))
134✔
282
        for _, e := range pool.addrs {
652✔
283
                endpoints = append(endpoints, e)
518✔
284
        }
518✔
285
        pool.addrsMutex.RUnlock()
134✔
286

134✔
287
        errs := make([]error, 0, len(endpoints))
134✔
288
        for _, e := range endpoints {
652✔
289
                <-e.closed
518✔
290
                if e.closeErr != nil {
518✔
291
                        errs = append(errs, e.closeErr)
×
292
                }
×
293
        }
294
        return errs
134✔
295
}
296

297
// Close closes connections in the ConnectionPool.
298
func (pool *ConnectionPool) Close() []error {
126✔
299
        if pool.state.cas(connectedState, closedState) ||
126✔
300
                pool.state.cas(shutdownState, closedState) {
250✔
301
                pool.addrsMutex.RLock()
124✔
302
                for _, s := range pool.addrs {
618✔
303
                        close(s.close)
494✔
304
                }
494✔
305
                pool.addrsMutex.RUnlock()
124✔
306
        }
307

308
        return pool.waitClose()
126✔
309
}
310

311
// CloseGraceful closes connections in the ConnectionPool gracefully. It waits
312
// for all requests to complete.
313
func (pool *ConnectionPool) CloseGraceful() []error {
8✔
314
        if pool.state.cas(connectedState, shutdownState) {
16✔
315
                pool.addrsMutex.RLock()
8✔
316
                for _, s := range pool.addrs {
28✔
317
                        close(s.shutdown)
20✔
318
                }
20✔
319
                pool.addrsMutex.RUnlock()
8✔
320
        }
321

322
        return pool.waitClose()
8✔
323
}
324

325
// GetAddrs gets addresses of connections in pool.
326
func (pool *ConnectionPool) GetAddrs() []string {
6✔
327
        pool.addrsMutex.RLock()
6✔
328
        defer pool.addrsMutex.RUnlock()
6✔
329

6✔
330
        cpy := make([]string, len(pool.addrs))
6✔
331

6✔
332
        i := 0
6✔
333
        for addr := range pool.addrs {
16✔
334
                cpy[i] = addr
10✔
335
                i++
10✔
336
        }
10✔
337

338
        return cpy
6✔
339
}
340

341
// GetPoolInfo gets information of connections (connected status, ro/rw role).
342
func (pool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
45✔
343
        info := make(map[string]*ConnectionInfo)
45✔
344

45✔
345
        pool.addrsMutex.RLock()
45✔
346
        defer pool.addrsMutex.RUnlock()
45✔
347
        pool.poolsMutex.RLock()
45✔
348
        defer pool.poolsMutex.RUnlock()
45✔
349

45✔
350
        if pool.state.get() != connectedState {
49✔
351
                return info
4✔
352
        }
4✔
353

354
        for addr := range pool.addrs {
140✔
355
                conn, role := pool.getConnectionFromPool(addr)
99✔
356
                if conn != nil {
182✔
357
                        info[addr] = &ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role}
83✔
358
                }
83✔
359
        }
360

361
        return info
41✔
362
}
363

364
// Ping sends empty request to Tarantool to check connection.
365
func (connPool *ConnectionPool) Ping(userMode Mode) (*tarantool.Response, error) {
16✔
366
        conn, err := connPool.getNextConnection(userMode)
16✔
367
        if err != nil {
18✔
368
                return nil, err
2✔
369
        }
2✔
370

371
        return conn.Ping()
14✔
372
}
373

374
// Select performs select to box space.
375
func (connPool *ConnectionPool) Select(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
14✔
376
        conn, err := connPool.getConnByMode(ANY, userMode)
14✔
377
        if err != nil {
14✔
378
                return nil, err
×
379
        }
×
380

381
        return conn.Select(space, index, offset, limit, iterator, key)
14✔
382
}
383

384
// Insert performs insertion to box space.
385
// Tarantool will reject Insert when tuple with same primary key exists.
386
func (connPool *ConnectionPool) Insert(space interface{}, tuple interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
8✔
387
        conn, err := connPool.getConnByMode(RW, userMode)
8✔
388
        if err != nil {
8✔
389
                return nil, err
×
390
        }
×
391

392
        return conn.Insert(space, tuple)
8✔
393
}
394

395
// Replace performs "insert or replace" action to box space.
396
// If tuple with same primary key exists, it will be replaced.
397
func (connPool *ConnectionPool) Replace(space interface{}, tuple interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
12✔
398
        conn, err := connPool.getConnByMode(RW, userMode)
12✔
399
        if err != nil {
12✔
400
                return nil, err
×
401
        }
×
402

403
        return conn.Replace(space, tuple)
12✔
404
}
405

406
// Delete performs deletion of a tuple by key.
407
// Result will contain array with deleted tuple.
408
func (connPool *ConnectionPool) Delete(space, index interface{}, key interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
6✔
409
        conn, err := connPool.getConnByMode(RW, userMode)
6✔
410
        if err != nil {
6✔
411
                return nil, err
×
412
        }
×
413

414
        return conn.Delete(space, index, key)
6✔
415
}
416

417
// Update performs update of a tuple by key.
418
// Result will contain array with updated tuple.
419
func (connPool *ConnectionPool) Update(space, index interface{}, key, ops interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
6✔
420
        conn, err := connPool.getConnByMode(RW, userMode)
6✔
421
        if err != nil {
6✔
422
                return nil, err
×
423
        }
×
424

425
        return conn.Update(space, index, key, ops)
6✔
426
}
427

428
// Upsert performs "update or insert" action of a tuple by key.
429
// Result will not contain any tuple.
430
func (connPool *ConnectionPool) Upsert(space interface{}, tuple, ops interface{}, userMode ...Mode) (resp *tarantool.Response, err error) {
4✔
431
        conn, err := connPool.getConnByMode(RW, userMode)
4✔
432
        if err != nil {
4✔
433
                return nil, err
×
434
        }
×
435

436
        return conn.Upsert(space, tuple, ops)
4✔
437
}
438

439
// Call16 calls registered Tarantool function.
440
// It uses request code for Tarantool >= 1.7 if go-tarantool
441
// was build with go_tarantool_call_17 tag.
442
// Otherwise, uses request code for Tarantool 1.6.
443
func (connPool *ConnectionPool) Call(functionName string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
8✔
444
        conn, err := connPool.getNextConnection(userMode)
8✔
445
        if err != nil {
8✔
446
                return nil, err
×
447
        }
×
448

449
        return conn.Call(functionName, args)
8✔
450
}
451

452
// Call16 calls registered Tarantool function.
453
// It uses request code for Tarantool 1.6, so result is converted to array of arrays.
454
// Deprecated since Tarantool 1.7.2.
455
func (connPool *ConnectionPool) Call16(functionName string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
×
456
        conn, err := connPool.getNextConnection(userMode)
×
457
        if err != nil {
×
458
                return nil, err
×
459
        }
×
460

461
        return conn.Call16(functionName, args)
×
462
}
463

464
// Call17 calls registered Tarantool function.
465
// It uses request code for Tarantool >= 1.7, so result is not converted
466
// (though, keep in mind, result is always array).
467
func (connPool *ConnectionPool) Call17(functionName string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
16✔
468
        conn, err := connPool.getNextConnection(userMode)
16✔
469
        if err != nil {
16✔
470
                return nil, err
×
471
        }
×
472

473
        return conn.Call17(functionName, args)
16✔
474
}
475

476
// Eval passes lua expression for evaluation.
477
func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
264✔
478
        conn, err := connPool.getNextConnection(userMode)
264✔
479
        if err != nil {
268✔
480
                return nil, err
4✔
481
        }
4✔
482

483
        return conn.Eval(expr, args)
260✔
484
}
485

486
// Execute passes sql expression to Tarantool for execution.
487
func (connPool *ConnectionPool) Execute(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
2✔
488
        conn, err := connPool.getNextConnection(userMode)
2✔
489
        if err != nil {
2✔
490
                return nil, err
×
491
        }
×
492

493
        return conn.Execute(expr, args)
2✔
494
}
495

496
// GetTyped performs select (with limit = 1 and offset = 0)
497
// to box space and fills typed result.
498
func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) {
×
499
        conn, err := connPool.getConnByMode(ANY, userMode)
×
500
        if err != nil {
×
501
                return err
×
502
        }
×
503

504
        return conn.GetTyped(space, index, key, result)
×
505
}
506

507
// SelectTyped performs select to box space and fills typed result.
508
func (connPool *ConnectionPool) SelectTyped(space, index interface{}, offset, limit, iterator uint32, key interface{}, result interface{}, userMode ...Mode) (err error) {
4✔
509
        conn, err := connPool.getConnByMode(ANY, userMode)
4✔
510
        if err != nil {
4✔
511
                return err
×
512
        }
×
513

514
        return conn.SelectTyped(space, index, offset, limit, iterator, key, result)
4✔
515
}
516

517
// InsertTyped performs insertion to box space.
518
// Tarantool will reject Insert when tuple with same primary key exists.
519
func (connPool *ConnectionPool) InsertTyped(space interface{}, tuple interface{}, result interface{}, userMode ...Mode) (err error) {
×
520
        conn, err := connPool.getConnByMode(RW, userMode)
×
521
        if err != nil {
×
522
                return err
×
523
        }
×
524

525
        return conn.InsertTyped(space, tuple, result)
×
526
}
527

528
// ReplaceTyped performs "insert or replace" action to box space.
529
// If tuple with same primary key exists, it will be replaced.
530
func (connPool *ConnectionPool) ReplaceTyped(space interface{}, tuple interface{}, result interface{}, userMode ...Mode) (err error) {
×
531
        conn, err := connPool.getConnByMode(RW, userMode)
×
532
        if err != nil {
×
533
                return err
×
534
        }
×
535

536
        return conn.ReplaceTyped(space, tuple, result)
×
537
}
538

539
// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple.
540
func (connPool *ConnectionPool) DeleteTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) {
×
541
        conn, err := connPool.getConnByMode(RW, userMode)
×
542
        if err != nil {
×
543
                return err
×
544
        }
×
545

546
        return conn.DeleteTyped(space, index, key, result)
×
547
}
548

549
// UpdateTyped performs update of a tuple by key and fills result with updated tuple.
550
func (connPool *ConnectionPool) UpdateTyped(space, index interface{}, key, ops interface{}, result interface{}, userMode ...Mode) (err error) {
×
551
        conn, err := connPool.getConnByMode(RW, userMode)
×
552
        if err != nil {
×
553
                return err
×
554
        }
×
555

556
        return conn.UpdateTyped(space, index, key, ops, result)
×
557
}
558

559
// CallTyped calls registered function.
560
// It uses request code for Tarantool >= 1.7 if go-tarantool
561
// was build with go_tarantool_call_17 tag.
562
// Otherwise, uses request code for Tarantool 1.6.
563
func (connPool *ConnectionPool) CallTyped(functionName string, args interface{}, result interface{}, userMode Mode) (err error) {
×
564
        conn, err := connPool.getNextConnection(userMode)
×
565
        if err != nil {
×
566
                return err
×
567
        }
×
568

569
        return conn.CallTyped(functionName, args, result)
×
570
}
571

572
// Call16Typed calls registered function.
573
// It uses request code for Tarantool 1.6, so result is converted to array of arrays.
574
// Deprecated since Tarantool 1.7.2.
575
func (connPool *ConnectionPool) Call16Typed(functionName string, args interface{}, result interface{}, userMode Mode) (err error) {
×
576
        conn, err := connPool.getNextConnection(userMode)
×
577
        if err != nil {
×
578
                return err
×
579
        }
×
580

581
        return conn.Call16Typed(functionName, args, result)
×
582
}
583

584
// Call17Typed calls registered function.
585
// It uses request code for Tarantool >= 1.7, so result is not converted
586
// (though, keep in mind, result is always array).
587
func (connPool *ConnectionPool) Call17Typed(functionName string, args interface{}, result interface{}, userMode Mode) (err error) {
×
588
        conn, err := connPool.getNextConnection(userMode)
×
589
        if err != nil {
×
590
                return err
×
591
        }
×
592

593
        return conn.Call17Typed(functionName, args, result)
×
594
}
595

596
// EvalTyped passes lua expression for evaluation.
597
func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result interface{}, userMode Mode) (err error) {
×
598
        conn, err := connPool.getNextConnection(userMode)
×
599
        if err != nil {
×
600
                return err
×
601
        }
×
602

603
        return conn.EvalTyped(expr, args, result)
×
604
}
605

606
// ExecuteTyped passes sql expression to Tarantool for execution.
607
func (connPool *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{}, userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
2✔
608
        conn, err := connPool.getNextConnection(userMode)
2✔
609
        if err != nil {
2✔
610
                return tarantool.SQLInfo{}, nil, err
×
611
        }
×
612

613
        return conn.ExecuteTyped(expr, args, result)
2✔
614
}
615

616
// SelectAsync sends select request to Tarantool and returns Future.
617
func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future {
8✔
618
        conn, err := connPool.getConnByMode(ANY, userMode)
8✔
619
        if err != nil {
10✔
620
                return newErrorFuture(err)
2✔
621
        }
2✔
622

623
        return conn.SelectAsync(space, index, offset, limit, iterator, key)
6✔
624
}
625

626
// InsertAsync sends insert action to Tarantool and returns Future.
627
// Tarantool will reject Insert when tuple with same primary key exists.
628
func (connPool *ConnectionPool) InsertAsync(space interface{}, tuple interface{}, userMode ...Mode) *tarantool.Future {
×
629
        conn, err := connPool.getConnByMode(RW, userMode)
×
630
        if err != nil {
×
631
                return newErrorFuture(err)
×
632
        }
×
633

634
        return conn.InsertAsync(space, tuple)
×
635
}
636

637
// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future.
638
// If tuple with same primary key exists, it will be replaced.
639
func (connPool *ConnectionPool) ReplaceAsync(space interface{}, tuple interface{}, userMode ...Mode) *tarantool.Future {
×
640
        conn, err := connPool.getConnByMode(RW, userMode)
×
641
        if err != nil {
×
642
                return newErrorFuture(err)
×
643
        }
×
644

645
        return conn.ReplaceAsync(space, tuple)
×
646
}
647

648
// DeleteAsync sends deletion action to Tarantool and returns Future.
649
// Future's result will contain array with deleted tuple.
650
func (connPool *ConnectionPool) DeleteAsync(space, index interface{}, key interface{}, userMode ...Mode) *tarantool.Future {
×
651
        conn, err := connPool.getConnByMode(RW, userMode)
×
652
        if err != nil {
×
653
                return newErrorFuture(err)
×
654
        }
×
655

656
        return conn.DeleteAsync(space, index, key)
×
657
}
658

659
// UpdateAsync sends deletion of a tuple by key and returns Future.
660
// Future's result will contain array with updated tuple.
661
func (connPool *ConnectionPool) UpdateAsync(space, index interface{}, key, ops interface{}, userMode ...Mode) *tarantool.Future {
×
662
        conn, err := connPool.getConnByMode(RW, userMode)
×
663
        if err != nil {
×
664
                return newErrorFuture(err)
×
665
        }
×
666

667
        return conn.UpdateAsync(space, index, key, ops)
×
668
}
669

670
// UpsertAsync sends "update or insert" action to Tarantool and returns Future.
671
// Future's sesult will not contain any tuple.
672
func (connPool *ConnectionPool) UpsertAsync(space interface{}, tuple interface{}, ops interface{}, userMode ...Mode) *tarantool.Future {
×
673
        conn, err := connPool.getConnByMode(RW, userMode)
×
674
        if err != nil {
×
675
                return newErrorFuture(err)
×
676
        }
×
677

678
        return conn.UpsertAsync(space, tuple, ops)
×
679
}
680

681
// CallAsync sends a call to registered Tarantool function and returns Future.
682
// It uses request code for Tarantool >= 1.7 if go-tarantool
683
// was build with go_tarantool_call_17 tag.
684
// Otherwise, uses request code for Tarantool 1.6.
685
func (connPool *ConnectionPool) CallAsync(functionName string, args interface{}, userMode Mode) *tarantool.Future {
×
686
        conn, err := connPool.getNextConnection(userMode)
×
687
        if err != nil {
×
688
                return newErrorFuture(err)
×
689
        }
×
690

691
        return conn.CallAsync(functionName, args)
×
692
}
693

694
// Call16Async sends a call to registered Tarantool function and returns Future.
695
// It uses request code for Tarantool 1.6, so future's result is always array of arrays.
696
// Deprecated since Tarantool 1.7.2.
697
func (connPool *ConnectionPool) Call16Async(functionName string, args interface{}, userMode Mode) *tarantool.Future {
×
698
        conn, err := connPool.getNextConnection(userMode)
×
699
        if err != nil {
×
700
                return newErrorFuture(err)
×
701
        }
×
702

703
        return conn.Call16Async(functionName, args)
×
704
}
705

706
// Call17Async sends a call to registered Tarantool function and returns Future.
707
// It uses request code for Tarantool >= 1.7, so future's result will not be converted
708
// (though, keep in mind, result is always array).
709
func (connPool *ConnectionPool) Call17Async(functionName string, args interface{}, userMode Mode) *tarantool.Future {
×
710
        conn, err := connPool.getNextConnection(userMode)
×
711
        if err != nil {
×
712
                return newErrorFuture(err)
×
713
        }
×
714

715
        return conn.Call17Async(functionName, args)
×
716
}
717

718
// EvalAsync sends a lua expression for evaluation and returns Future.
719
func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMode Mode) *tarantool.Future {
×
720
        conn, err := connPool.getNextConnection(userMode)
×
721
        if err != nil {
×
722
                return newErrorFuture(err)
×
723
        }
×
724

725
        return conn.EvalAsync(expr, args)
×
726
}
727

728
// ExecuteAsync sends sql expression to Tarantool for execution and returns
729
// Future.
730
func (connPool *ConnectionPool) ExecuteAsync(expr string, args interface{}, userMode Mode) *tarantool.Future {
2✔
731
        conn, err := connPool.getNextConnection(userMode)
2✔
732
        if err != nil {
2✔
733
                return newErrorFuture(err)
×
734
        }
×
735

736
        return conn.ExecuteAsync(expr, args)
2✔
737
}
738

739
// NewStream creates new Stream object for connection selected
740
// by userMode from connPool.
741
//
742
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
743
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
744
// Since 1.7.0
745
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
6✔
746
        conn, err := connPool.getNextConnection(userMode)
6✔
747
        if err != nil {
6✔
748
                return nil, err
×
749
        }
×
750
        return conn.NewStream()
6✔
751
}
752

753
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
754
func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) {
2✔
755
        conn, err := connPool.getNextConnection(userMode)
2✔
756
        if err != nil {
2✔
757
                return nil, err
×
758
        }
×
759
        return conn.NewPrepared(expr)
2✔
760
}
761

762
// NewWatcher creates a new Watcher object for the connection pool.
763
//
764
// You need to require WatchersFeature to use watchers, see examples for the
765
// function.
766
//
767
// The behavior is same as if Connection.NewWatcher() called for each
768
// connection with a suitable role.
769
//
770
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
771
// watcher’s destruction. In this case, the watcher remains registered. You
772
// need to call Unregister() directly.
773
//
774
// Unregister() guarantees that there will be no the watcher's callback calls
775
// after it, but Unregister() call from the callback leads to a deadlock.
776
//
777
// See:
778
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
779
//
780
// Since 1.10.0
781
func (pool *ConnectionPool) NewWatcher(key string,
782
        callback tarantool.WatchCallback, mode Mode) (tarantool.Watcher, error) {
2,020✔
783
        watchersRequired := false
2,020✔
784
        for _, feature := range pool.connOpts.RequiredProtocolInfo.Features {
4,036✔
785
                if tarantool.WatchersFeature == feature {
4,032✔
786
                        watchersRequired = true
2,016✔
787
                        break
2,016✔
788
                }
789
        }
790
        if !watchersRequired {
2,024✔
791
                return nil, errors.New("the feature WatchersFeature must be " +
4✔
792
                        "required by connection options to create a watcher")
4✔
793
        }
4✔
794

795
        watcher := &poolWatcher{
2,016✔
796
                container:    &pool.watcherContainer,
2,016✔
797
                mode:         mode,
2,016✔
798
                key:          key,
2,016✔
799
                callback:     callback,
2,016✔
800
                watchers:     make(map[string]tarantool.Watcher),
2,016✔
801
                unregistered: false,
2,016✔
802
        }
2,016✔
803

2,016✔
804
        watcher.container.add(watcher)
2,016✔
805

2,016✔
806
        rr := pool.anyPool
2,016✔
807
        if mode == RW {
2,022✔
808
                rr = pool.rwPool
6✔
809
        } else if mode == RO {
2,018✔
810
                rr = pool.roPool
2✔
811
        }
2✔
812

813
        conns := rr.GetConnections()
2,016✔
814
        for _, conn := range conns {
12,074✔
815
                if err := watcher.watch(conn); err != nil {
10,058✔
816
                        conn.Close()
×
817
                }
×
818
        }
819

820
        return watcher, nil
2,016✔
821
}
822

823
// Do sends the request and returns a future.
824
// For requests that belong to the only one connection (e.g. Unprepare or ExecutePrepared)
825
// the argument of type Mode is unused.
826
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Future {
36✔
827
        if connectedReq, ok := req.(tarantool.ConnectedRequest); ok {
46✔
828
                conn, _ := connPool.getConnectionFromPool(connectedReq.Conn().Addr())
10✔
829
                if conn == nil {
12✔
830
                        return newErrorFuture(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
2✔
831
                }
2✔
832
                return connectedReq.Conn().Do(req)
8✔
833
        }
834
        conn, err := connPool.getNextConnection(userMode)
26✔
835
        if err != nil {
28✔
836
                return newErrorFuture(err)
2✔
837
        }
2✔
838

839
        return conn.Do(req)
24✔
840
}
841

842
//
843
// private
844
//
845

846
func (connPool *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, error) {
583✔
847
        resp, err := conn.Call17("box.info", []interface{}{})
583✔
848
        if err != nil {
585✔
849
                return UnknownRole, err
2✔
850
        }
2✔
851
        if resp == nil {
581✔
852
                return UnknownRole, ErrIncorrectResponse
×
853
        }
×
854
        if len(resp.Data) < 1 {
581✔
855
                return UnknownRole, ErrIncorrectResponse
×
856
        }
×
857

858
        instanceStatus, ok := resp.Data[0].(map[interface{}]interface{})["status"]
581✔
859
        if !ok {
581✔
860
                return UnknownRole, ErrIncorrectResponse
×
861
        }
×
862
        if instanceStatus != "running" {
581✔
863
                return UnknownRole, ErrIncorrectStatus
×
864
        }
×
865

866
        replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"]
581✔
867
        if !ok {
581✔
868
                return UnknownRole, ErrIncorrectResponse
×
869
        }
×
870

871
        switch replicaRole {
581✔
872
        case false:
241✔
873
                return MasterRole, nil
241✔
874
        case true:
340✔
875
                return ReplicaRole, nil
340✔
876
        }
877

878
        return UnknownRole, nil
×
879
}
880

881
func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.Connection, Role) {
109✔
882
        if conn := connPool.rwPool.GetConnByAddr(addr); conn != nil {
170✔
883
                return conn, MasterRole
61✔
884
        }
61✔
885

886
        if conn := connPool.roPool.GetConnByAddr(addr); conn != nil {
78✔
887
                return conn, ReplicaRole
30✔
888
        }
30✔
889

890
        return connPool.anyPool.GetConnByAddr(addr), UnknownRole
18✔
891
}
892

893
func (pool *ConnectionPool) deleteConnection(addr string) {
547✔
894
        if conn := pool.anyPool.DeleteConnByAddr(addr); conn != nil {
1,092✔
895
                if conn := pool.rwPool.DeleteConnByAddr(addr); conn == nil {
858✔
896
                        pool.roPool.DeleteConnByAddr(addr)
313✔
897
                }
313✔
898
                // The internal connection deinitialization.
899
                pool.watcherContainer.mutex.RLock()
545✔
900
                defer pool.watcherContainer.mutex.RUnlock()
545✔
901

545✔
902
                pool.watcherContainer.foreach(func(watcher *poolWatcher) error {
555✔
903
                        watcher.unwatch(conn)
10✔
904
                        return nil
10✔
905
                })
10✔
906
        }
907
}
908

909
func (pool *ConnectionPool) addConnection(addr string,
910
        conn *tarantool.Connection, role Role) error {
545✔
911
        // The internal connection initialization.
545✔
912
        pool.watcherContainer.mutex.RLock()
545✔
913
        defer pool.watcherContainer.mutex.RUnlock()
545✔
914

545✔
915
        watched := []*poolWatcher{}
545✔
916
        err := pool.watcherContainer.foreach(func(watcher *poolWatcher) error {
555✔
917
                watch := false
10✔
918
                if watcher.mode == RW {
20✔
919
                        watch = role == MasterRole
10✔
920
                } else if watcher.mode == RO {
10✔
921
                        watch = role == ReplicaRole
×
922
                } else {
×
923
                        watch = true
×
924
                }
×
925
                if watch {
16✔
926
                        if err := watcher.watch(conn); err != nil {
6✔
927
                                return err
×
928
                        }
×
929
                        watched = append(watched, watcher)
6✔
930
                }
931
                return nil
10✔
932
        })
933
        if err != nil {
545✔
934
                for _, watcher := range watched {
×
935
                        watcher.unwatch(conn)
×
936
                }
×
937
                log.Printf("tarantool: failed initialize watchers for %s: %s", addr, err)
×
938
                return err
×
939
        }
940

941
        pool.anyPool.AddConn(addr, conn)
545✔
942

545✔
943
        switch role {
545✔
944
        case MasterRole:
232✔
945
                pool.rwPool.AddConn(addr, conn)
232✔
946
        case ReplicaRole:
313✔
947
                pool.roPool.AddConn(addr, conn)
313✔
948
        }
949
        return nil
545✔
950
}
951

952
func (connPool *ConnectionPool) handlerDiscovered(conn *tarantool.Connection,
953
        role Role) bool {
557✔
954
        var err error
557✔
955
        if connPool.opts.ConnectionHandler != nil {
578✔
956
                err = connPool.opts.ConnectionHandler.Discovered(conn, role)
21✔
957
        }
21✔
958

959
        if err != nil {
568✔
960
                addr := conn.Addr()
11✔
961
                log.Printf("tarantool: storing connection to %s canceled: %s\n", addr, err)
11✔
962
                return false
11✔
963
        }
11✔
964
        return true
546✔
965
}
966

967
func (connPool *ConnectionPool) handlerDeactivated(conn *tarantool.Connection,
968
        role Role) {
514✔
969
        var err error
514✔
970
        if connPool.opts.ConnectionHandler != nil {
524✔
971
                err = connPool.opts.ConnectionHandler.Deactivated(conn, role)
10✔
972
        }
10✔
973

974
        if err != nil {
514✔
975
                addr := conn.Addr()
×
976
                log.Printf("tarantool: deactivating connection to %s by user failed: %s\n", addr, err)
×
977
        }
×
978
}
979

980
func (connPool *ConnectionPool) fillPools() bool {
134✔
981
        somebodyAlive := false
134✔
982

134✔
983
        // It is called before controller() goroutines so we don't expect
134✔
984
        // concurrency issues here.
134✔
985
        for addr := range connPool.addrs {
650✔
986
                end := newEndpoint(addr)
516✔
987
                connPool.addrs[addr] = end
516✔
988

516✔
989
                connOpts := connPool.connOpts
516✔
990
                connOpts.Notify = end.notify
516✔
991
                conn, err := tarantool.Connect(addr, connOpts)
516✔
992
                if err != nil {
522✔
993
                        log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error())
6✔
994
                } else if conn != nil {
1,026✔
995
                        role, err := connPool.getConnectionRole(conn)
510✔
996
                        if err != nil {
510✔
997
                                conn.Close()
×
998
                                log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err)
×
999
                                continue
×
1000
                        }
1001

1002
                        if connPool.handlerDiscovered(conn, role) {
1,016✔
1003
                                if connPool.addConnection(addr, conn, role) != nil {
506✔
1004
                                        conn.Close()
×
1005
                                        connPool.handlerDeactivated(conn, role)
×
1006
                                }
×
1007

1008
                                if conn.ConnectedNow() {
1,012✔
1009
                                        end.conn = conn
506✔
1010
                                        end.role = role
506✔
1011
                                        somebodyAlive = true
506✔
1012
                                } else {
506✔
1013
                                        connPool.deleteConnection(addr)
×
1014
                                        conn.Close()
×
1015
                                        connPool.handlerDeactivated(conn, role)
×
1016
                                }
×
1017
                        } else {
4✔
1018
                                conn.Close()
4✔
1019
                        }
4✔
1020
                }
1021
        }
1022

1023
        return somebodyAlive
134✔
1024
}
1025

1026
func (pool *ConnectionPool) updateConnection(e *endpoint) {
57✔
1027
        pool.poolsMutex.Lock()
57✔
1028

57✔
1029
        if pool.state.get() != connectedState {
62✔
1030
                pool.poolsMutex.Unlock()
5✔
1031
                return
5✔
1032
        }
5✔
1033

1034
        if role, err := pool.getConnectionRole(e.conn); err == nil {
102✔
1035
                if e.role != role {
76✔
1036
                        pool.deleteConnection(e.addr)
26✔
1037
                        pool.poolsMutex.Unlock()
26✔
1038

26✔
1039
                        pool.handlerDeactivated(e.conn, e.role)
26✔
1040
                        opened := pool.handlerDiscovered(e.conn, role)
26✔
1041
                        if !opened {
30✔
1042
                                e.conn.Close()
4✔
1043
                                e.conn = nil
4✔
1044
                                e.role = UnknownRole
4✔
1045
                                return
4✔
1046
                        }
4✔
1047

1048
                        pool.poolsMutex.Lock()
22✔
1049
                        if pool.state.get() != connectedState {
23✔
1050
                                pool.poolsMutex.Unlock()
1✔
1051

1✔
1052
                                e.conn.Close()
1✔
1053
                                pool.handlerDeactivated(e.conn, role)
1✔
1054
                                e.conn = nil
1✔
1055
                                e.role = UnknownRole
1✔
1056
                                return
1✔
1057
                        }
1✔
1058

1059
                        if pool.addConnection(e.addr, e.conn, role) != nil {
21✔
1060
                                pool.poolsMutex.Unlock()
×
1061

×
1062
                                e.conn.Close()
×
1063
                                pool.handlerDeactivated(e.conn, role)
×
1064
                                e.conn = nil
×
1065
                                e.role = UnknownRole
×
1066
                                return
×
1067
                        }
×
1068
                        e.role = role
21✔
1069
                }
1070
                pool.poolsMutex.Unlock()
45✔
1071
                return
45✔
1072
        } else {
2✔
1073
                pool.deleteConnection(e.addr)
2✔
1074
                pool.poolsMutex.Unlock()
2✔
1075

2✔
1076
                e.conn.Close()
2✔
1077
                pool.handlerDeactivated(e.conn, e.role)
2✔
1078
                e.conn = nil
2✔
1079
                e.role = UnknownRole
2✔
1080
                return
2✔
1081
        }
2✔
1082
}
1083

1084
func (pool *ConnectionPool) tryConnect(e *endpoint) error {
26✔
1085
        pool.poolsMutex.Lock()
26✔
1086

26✔
1087
        if pool.state.get() != connectedState {
29✔
1088
                pool.poolsMutex.Unlock()
3✔
1089
                return ErrClosed
3✔
1090
        }
3✔
1091

1092
        e.conn = nil
23✔
1093
        e.role = UnknownRole
23✔
1094

23✔
1095
        connOpts := pool.connOpts
23✔
1096
        connOpts.Notify = e.notify
23✔
1097
        conn, err := tarantool.Connect(e.addr, connOpts)
23✔
1098
        if err == nil {
44✔
1099
                role, err := pool.getConnectionRole(conn)
21✔
1100
                pool.poolsMutex.Unlock()
21✔
1101

21✔
1102
                if err != nil {
21✔
1103
                        conn.Close()
×
1104
                        log.Printf("tarantool: storing connection to %s failed: %s\n", e.addr, err)
×
1105
                        return err
×
1106
                }
×
1107

1108
                opened := pool.handlerDiscovered(conn, role)
21✔
1109
                if !opened {
24✔
1110
                        conn.Close()
3✔
1111
                        return errors.New("storing connection canceled")
3✔
1112
                }
3✔
1113

1114
                pool.poolsMutex.Lock()
18✔
1115
                if pool.state.get() != connectedState {
18✔
1116
                        pool.poolsMutex.Unlock()
×
1117
                        conn.Close()
×
1118
                        pool.handlerDeactivated(conn, role)
×
1119
                        return ErrClosed
×
1120
                }
×
1121

1122
                if err = pool.addConnection(e.addr, conn, role); err != nil {
18✔
1123
                        pool.poolsMutex.Unlock()
×
1124
                        conn.Close()
×
1125
                        pool.handlerDeactivated(conn, role)
×
1126
                        return err
×
1127
                }
×
1128
                e.conn = conn
18✔
1129
                e.role = role
18✔
1130
        }
1131

1132
        pool.poolsMutex.Unlock()
20✔
1133
        return err
20✔
1134
}
1135

1136
func (pool *ConnectionPool) reconnect(e *endpoint) {
×
1137
        pool.poolsMutex.Lock()
×
1138

×
1139
        if pool.state.get() != connectedState {
×
1140
                pool.poolsMutex.Unlock()
×
1141
                return
×
1142
        }
×
1143

1144
        pool.deleteConnection(e.addr)
×
1145
        pool.poolsMutex.Unlock()
×
1146

×
1147
        pool.handlerDeactivated(e.conn, e.role)
×
1148
        e.conn = nil
×
1149
        e.role = UnknownRole
×
1150

×
1151
        pool.tryConnect(e)
×
1152
}
1153

1154
func (pool *ConnectionPool) controller(e *endpoint) {
516✔
1155
        timer := time.NewTicker(pool.opts.CheckTimeout)
516✔
1156
        defer timer.Stop()
516✔
1157

516✔
1158
        shutdown := false
516✔
1159
        for {
2,498✔
1160
                if shutdown {
2,016✔
1161
                        // Graceful shutdown in progress. We need to wait for a finish or
34✔
1162
                        // to force close.
34✔
1163
                        select {
34✔
1164
                        case <-e.closed:
31✔
1165
                        case <-e.close:
3✔
1166
                        }
1167
                }
1168

1169
                select {
1,982✔
1170
                case <-e.closed:
516✔
1171
                        return
516✔
1172
                default:
1,466✔
1173
                }
1174

1175
                select {
1,466✔
1176
                // e.close has priority to avoid concurrency with e.shutdown.
1177
                case <-e.close:
486✔
1178
                        if e.conn != nil {
963✔
1179
                                pool.poolsMutex.Lock()
477✔
1180
                                pool.deleteConnection(e.addr)
477✔
1181
                                pool.poolsMutex.Unlock()
477✔
1182

477✔
1183
                                if !shutdown {
952✔
1184
                                        e.closeErr = e.conn.Close()
475✔
1185
                                        pool.handlerDeactivated(e.conn, e.role)
475✔
1186
                                        close(e.closed)
475✔
1187
                                } else {
477✔
1188
                                        // Force close the connection.
2✔
1189
                                        e.conn.Close()
2✔
1190
                                        // And wait for a finish.
2✔
1191
                                        <-e.closed
2✔
1192
                                }
2✔
1193
                        } else {
9✔
1194
                                close(e.closed)
9✔
1195
                        }
9✔
1196
                default:
980✔
1197
                        select {
980✔
1198
                        case <-e.shutdown:
32✔
1199
                                shutdown = true
32✔
1200
                                if e.conn != nil {
64✔
1201
                                        pool.poolsMutex.Lock()
32✔
1202
                                        pool.deleteConnection(e.addr)
32✔
1203
                                        pool.poolsMutex.Unlock()
32✔
1204

32✔
1205
                                        // We need to catch s.close in the current goroutine, so
32✔
1206
                                        // we need to start an another one for the shutdown.
32✔
1207
                                        go func() {
64✔
1208
                                                e.closeErr = e.conn.CloseGraceful()
32✔
1209
                                                close(e.closed)
32✔
1210
                                        }()
32✔
1211
                                } else {
×
1212
                                        close(e.closed)
×
1213
                                }
×
1214
                        default:
948✔
1215
                                select {
948✔
1216
                                case <-e.close:
409✔
1217
                                        // Will be processed at an upper level.
1218
                                case <-e.shutdown:
5✔
1219
                                        // Will be processed at an upper level.
1220
                                case <-e.notify:
461✔
1221
                                        if e.conn != nil && e.conn.ClosedNow() {
471✔
1222
                                                pool.poolsMutex.Lock()
10✔
1223
                                                if pool.state.get() == connectedState {
20✔
1224
                                                        pool.deleteConnection(e.addr)
10✔
1225
                                                        pool.poolsMutex.Unlock()
10✔
1226
                                                        pool.handlerDeactivated(e.conn, e.role)
10✔
1227
                                                        e.conn = nil
10✔
1228
                                                        e.role = UnknownRole
10✔
1229
                                                } else {
10✔
1230
                                                        pool.poolsMutex.Unlock()
×
1231
                                                }
×
1232
                                        }
1233
                                case <-timer.C:
73✔
1234
                                        // Reopen connection.
73✔
1235
                                        // Relocate connection between subpools
73✔
1236
                                        // if ro/rw was updated.
73✔
1237
                                        if e.conn == nil {
89✔
1238
                                                pool.tryConnect(e)
16✔
1239
                                        } else if !e.conn.ClosedNow() {
130✔
1240
                                                pool.updateConnection(e)
57✔
1241
                                        } else {
57✔
1242
                                                pool.reconnect(e)
×
1243
                                        }
×
1244
                                }
1245
                        }
1246
                }
1247
        }
1248
}
1249

1250
func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) {
406✔
1251

406✔
1252
        switch mode {
406✔
1253
        case ANY:
74✔
1254
                if next := connPool.anyPool.GetNextConnection(); next != nil {
144✔
1255
                        return next, nil
70✔
1256
                }
70✔
1257
        case RW:
118✔
1258
                if next := connPool.rwPool.GetNextConnection(); next != nil {
232✔
1259
                        return next, nil
114✔
1260
                }
114✔
1261
                return nil, ErrNoRwInstance
4✔
1262
        case RO:
56✔
1263
                if next := connPool.roPool.GetNextConnection(); next != nil {
110✔
1264
                        return next, nil
54✔
1265
                }
54✔
1266
                return nil, ErrNoRoInstance
2✔
1267
        case PreferRW:
96✔
1268
                if next := connPool.rwPool.GetNextConnection(); next != nil {
182✔
1269
                        return next, nil
86✔
1270
                }
86✔
1271
                if next := connPool.roPool.GetNextConnection(); next != nil {
20✔
1272
                        return next, nil
10✔
1273
                }
10✔
1274
        case PreferRO:
62✔
1275
                if next := connPool.roPool.GetNextConnection(); next != nil {
114✔
1276
                        return next, nil
52✔
1277
                }
52✔
1278
                if next := connPool.rwPool.GetNextConnection(); next != nil {
20✔
1279
                        return next, nil
10✔
1280
                }
10✔
1281
        }
1282
        return nil, ErrNoHealthyInstance
4✔
1283
}
1284

1285
func (connPool *ConnectionPool) getConnByMode(defaultMode Mode, userMode []Mode) (*tarantool.Connection, error) {
62✔
1286
        if len(userMode) > 1 {
62✔
1287
                return nil, ErrTooManyArgs
×
1288
        }
×
1289

1290
        mode := defaultMode
62✔
1291
        if len(userMode) > 0 {
100✔
1292
                mode = userMode[0]
38✔
1293
        }
38✔
1294

1295
        return connPool.getNextConnection(mode)
62✔
1296
}
1297

1298
func newErrorFuture(err error) *tarantool.Future {
6✔
1299
        fut := tarantool.NewFuture()
6✔
1300
        fut.SetError(err)
6✔
1301
        return fut
6✔
1302
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc