• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 6933294594

20 Nov 2023 04:56PM UTC coverage: 79.76% (+0.08%) from 79.68%
6933294594

Pull #349

github

askalt
docs: update according to the changes
Pull Request #349: connection: support connection via an existing socket fd

255 of 309 new or added lines in 7 files covered. (82.52%)

24 existing lines in 3 files now uncovered.

5647 of 7080 relevant lines covered (79.76%)

11113.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.47
/pool/connection_pool.go
1
// Package pool provides methods to work with a Tarantool cluster
2
// considering master discovery.
3
//
4
// Main features:
5
//
6
// - Return available connection from pool according to round-robin strategy.
7
//
8
// - Automatic master discovery by mode parameter.
9
//
10
// Since: 1.6.0
11
package pool
12

13
import (
14
        "context"
15
        "errors"
16
        "log"
17
        "sync"
18
        "time"
19

20
        "github.com/tarantool/go-iproto"
21

22
        "github.com/tarantool/go-tarantool/v2"
23
)
24

25
var (
26
        ErrEmptyDialers      = errors.New("dialers (second argument) should not be empty")
27
        ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
28
        ErrNoConnection      = errors.New("no active connections")
29
        ErrTooManyArgs       = errors.New("too many arguments")
30
        ErrIncorrectResponse = errors.New("incorrect response format")
31
        ErrIncorrectStatus   = errors.New("incorrect instance status: status should be `running`")
32
        ErrNoRwInstance      = errors.New("can't find rw instance in pool")
33
        ErrNoRoInstance      = errors.New("can't find ro instance in pool")
34
        ErrNoHealthyInstance = errors.New("can't find healthy instance in pool")
35
        ErrExists            = errors.New("endpoint exists")
36
        ErrClosed            = errors.New("pool is closed")
37
        ErrUnknownRequest    = errors.New("the passed connected request doesn't belong to " +
38
                "the current connection pool")
39
        ErrContextCanceled = errors.New("operation was canceled")
40
)
41

42
// ConnectionHandler provides callbacks for components interested in handling
43
// changes of connections in a ConnectionPool.
44
type ConnectionHandler interface {
45
        // Discovered is called when a connection with a role has been detected
46
        // (for the first time or when a role of a connection has been changed),
47
        // but is not yet available to send requests. It allows for a client to
48
        // initialize the connection before using it in a pool.
49
        //
50
        // The client code may cancel adding a connection to the pool. The client
51
        // needs to return an error from the Discovered call for that. In this case
52
        // the pool will close connection and will try to reopen it later.
53
        Discovered(id string, conn *tarantool.Connection, role Role) error
54
        // Deactivated is called when a connection with a role has become
55
        // unavailable to send requests. It happens if the connection is closed or
56
        // the connection role is switched.
57
        //
58
        // So if a connection switches a role, a pool calls:
59
        // Deactivated() + Discovered().
60
        //
61
        // Deactivated will not be called if a previous Discovered() call returns
62
        // an error. Because in this case, the connection does not become available
63
        // for sending requests.
64
        Deactivated(id string, conn *tarantool.Connection, role Role) error
65
}
66

67
// Opts provides additional options (configurable via ConnectWithOpts).
68
type Opts struct {
69
        // Timeout for timer to reopen connections that have been closed by some
70
        // events and to relocate connection between subpools if ro/rw role has
71
        // been updated.
72
        CheckTimeout time.Duration
73
        // ConnectionHandler provides an ability to handle connection updates.
74
        ConnectionHandler ConnectionHandler
75
}
76

77
/*
78
ConnectionInfo structure for information about connection statuses:
79

80
- ConnectedNow reports if connection is established at the moment.
81

82
- ConnRole reports master/replica role of instance.
83
*/
84
type ConnectionInfo struct {
85
        ConnectedNow bool
86
        ConnRole     Role
87
}
88

89
/*
90
ConnectionPool is a pool of connections to Tarantool instances. Main features:
91

92
- Return available connection from pool according to round-robin strategy.
93

94
- Automatic master discovery by mode parameter.
95
*/
96
type ConnectionPool struct {
97
        ends      map[string]*endpoint
98
        endsMutex sync.RWMutex
99

100
        connOpts tarantool.Opts
101
        opts     Opts
102

103
        state            state
104
        done             chan struct{}
105
        roPool           *roundRobinStrategy
106
        rwPool           *roundRobinStrategy
107
        anyPool          *roundRobinStrategy
108
        poolsMutex       sync.RWMutex
109
        watcherContainer watcherContainer
110
}
111

112
var _ Pooler = (*ConnectionPool)(nil)
113

114
type endpoint struct {
115
        id     string
116
        dialer tarantool.Dialer
117
        notify chan tarantool.ConnEvent
118
        conn   *tarantool.Connection
119
        role   Role
120
        // These channels are used to switch connection states.
121
        shutdown chan struct{}
122
        close    chan struct{}
123
        closed   chan struct{}
124
        cancel   context.CancelFunc
125
        closeErr error
126
}
127

128
func newEndpoint(id string, dialer tarantool.Dialer) *endpoint {
432✔
129
        return &endpoint{
432✔
130
                id:       id,
432✔
131
                dialer:   dialer,
432✔
132
                notify:   make(chan tarantool.ConnEvent, 100),
432✔
133
                conn:     nil,
432✔
134
                role:     UnknownRole,
432✔
135
                shutdown: make(chan struct{}),
432✔
136
                close:    make(chan struct{}),
432✔
137
                closed:   make(chan struct{}),
432✔
138
                cancel:   nil,
432✔
139
        }
432✔
140
}
432✔
141

142
// ConnectWithOpts creates pool for instances with specified dialers and options opts.
143
// Each dialer corresponds to a certain id by which they will be distinguished.
144
func ConnectWithOpts(ctx context.Context, dialers map[string]tarantool.Dialer,
145
        connOpts tarantool.Opts, opts Opts) (*ConnectionPool, error) {
118✔
146
        if len(dialers) == 0 {
120✔
147
                return nil, ErrEmptyDialers
2✔
148
        }
2✔
149
        if opts.CheckTimeout <= 0 {
118✔
150
                return nil, ErrWrongCheckTimeout
2✔
151
        }
2✔
152

153
        size := len(dialers)
114✔
154
        rwPool := newRoundRobinStrategy(size)
114✔
155
        roPool := newRoundRobinStrategy(size)
114✔
156
        anyPool := newRoundRobinStrategy(size)
114✔
157

114✔
158
        connPool := &ConnectionPool{
114✔
159
                ends:     make(map[string]*endpoint),
114✔
160
                connOpts: connOpts,
114✔
161
                opts:     opts,
114✔
162
                state:    unknownState,
114✔
163
                done:     make(chan struct{}),
114✔
164
                rwPool:   rwPool,
114✔
165
                roPool:   roPool,
114✔
166
                anyPool:  anyPool,
114✔
167
        }
114✔
168

114✔
169
        somebodyAlive, ctxCanceled := connPool.fillPools(ctx, dialers)
114✔
170
        if !somebodyAlive {
122✔
171
                connPool.state.set(closedState)
8✔
172
                if ctxCanceled {
12✔
173
                        return nil, ErrContextCanceled
4✔
174
                }
4✔
175
                return nil, ErrNoConnection
4✔
176
        }
177

178
        connPool.state.set(connectedState)
106✔
179

106✔
180
        for _, s := range connPool.ends {
506✔
181
                endpointCtx, cancel := context.WithCancel(context.Background())
400✔
182
                s.cancel = cancel
400✔
183
                go connPool.controller(endpointCtx, s)
400✔
184
        }
400✔
185

186
        return connPool, nil
106✔
187
}
188

189
// Connect creates pool for instances with specified dialers.
190
// Each dialer corresponds to a certain id by which they will be distinguished.
191
//
192
// It is useless to set up tarantool.Opts.Reconnect value for a connection.
193
// The connection pool has its own reconnection logic. See
194
// Opts.CheckTimeout description.
195
func Connect(ctx context.Context, dialers map[string]tarantool.Dialer,
196
        connOpts tarantool.Opts) (*ConnectionPool, error) {
108✔
197
        opts := Opts{
108✔
198
                CheckTimeout: 1 * time.Second,
108✔
199
        }
108✔
200
        return ConnectWithOpts(ctx, dialers, connOpts, opts)
108✔
201
}
108✔
202

203
// ConnectedNow gets connected status of pool.
204
func (p *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
68✔
205
        p.poolsMutex.RLock()
68✔
206
        defer p.poolsMutex.RUnlock()
68✔
207

68✔
208
        if p.state.get() != connectedState {
76✔
209
                return false, nil
8✔
210
        }
8✔
211
        switch mode {
60✔
212
        case ANY:
60✔
213
                return !p.anyPool.IsEmpty(), nil
60✔
214
        case RW:
×
215
                return !p.rwPool.IsEmpty(), nil
×
216
        case RO:
×
217
                return !p.roPool.IsEmpty(), nil
×
218
        case PreferRW:
×
219
                fallthrough
×
220
        case PreferRO:
×
221
                return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil
×
222
        default:
×
223
                return false, ErrNoHealthyInstance
×
224
        }
225
}
226

227
// ConfiguredTimeout gets the configured timeout of the next connection chosen for the given mode.
228
func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
×
229
        conn, err := p.getNextConnection(mode)
×
230
        if err != nil {
×
231
                return 0, err
×
232
        }
×
233

234
        return conn.ConfiguredTimeout(), nil
×
235
}
236

237
// Add adds a new endpoint with the id into the pool. This function
238
// adds the endpoint only after successful connection.
239
func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Dialer) error {
18✔
240
        e := newEndpoint(id, dialer)
18✔
241

18✔
242
        p.endsMutex.Lock()
18✔
243
        // Ensure that Close()/CloseGraceful() is not in progress/done.
18✔
244
        if p.state.get() != connectedState {
24✔
245
                p.endsMutex.Unlock()
6✔
246
                return ErrClosed
6✔
247
        }
6✔
248
        if _, ok := p.ends[id]; ok {
14✔
249
                p.endsMutex.Unlock()
2✔
250
                return ErrExists
2✔
251
        }
2✔
252

253
        endpointCtx, cancel := context.WithCancel(context.Background())
10✔
254
        e.cancel = cancel
10✔
255

10✔
256
        p.ends[id] = e
10✔
257
        p.endsMutex.Unlock()
10✔
258

10✔
259
        if err := p.tryConnect(ctx, e); err != nil {
12✔
260
                p.endsMutex.Lock()
2✔
261
                delete(p.ends, id)
2✔
262
                p.endsMutex.Unlock()
2✔
263
                e.cancel()
2✔
264
                close(e.closed)
2✔
265
                return err
2✔
266
        }
2✔
267

268
        go p.controller(endpointCtx, e)
8✔
269
        return nil
8✔
270
}
271

272
// Remove removes an endpoint with the id from the pool. The call
273
// closes an active connection gracefully.
274
func (p *ConnectionPool) Remove(id string) error {
38✔
275
        p.endsMutex.Lock()
38✔
276
        endpoint, ok := p.ends[id]
38✔
277
        if !ok {
60✔
278
                p.endsMutex.Unlock()
22✔
279
                return errors.New("endpoint not exist")
22✔
280
        }
22✔
281

282
        select {
16✔
283
        case <-endpoint.close:
2✔
284
                // Close() in progress/done.
285
        case <-endpoint.shutdown:
2✔
286
                // CloseGraceful()/Remove() in progress/done.
287
        default:
12✔
288
                endpoint.cancel()
12✔
289
                close(endpoint.shutdown)
12✔
290
        }
291

292
        delete(p.ends, id)
16✔
293
        p.endsMutex.Unlock()
16✔
294

16✔
295
        <-endpoint.closed
16✔
296
        return nil
16✔
297
}
298

299
func (p *ConnectionPool) waitClose() []error {
110✔
300
        p.endsMutex.RLock()
110✔
301
        endpoints := make([]*endpoint, 0, len(p.ends))
110✔
302
        for _, e := range p.ends {
520✔
303
                endpoints = append(endpoints, e)
410✔
304
        }
410✔
305
        p.endsMutex.RUnlock()
110✔
306

110✔
307
        errs := make([]error, 0, len(endpoints))
110✔
308
        for _, e := range endpoints {
520✔
309
                <-e.closed
410✔
310
                if e.closeErr != nil {
410✔
311
                        errs = append(errs, e.closeErr)
×
312
                }
×
313
        }
314
        return errs
110✔
315
}
316

317
// Close closes connections in the ConnectionPool.
318
func (p *ConnectionPool) Close() []error {
102✔
319
        if p.state.cas(connectedState, closedState) ||
102✔
320
                p.state.cas(shutdownState, closedState) {
202✔
321
                p.endsMutex.RLock()
100✔
322
                for _, s := range p.ends {
486✔
323
                        s.cancel()
386✔
324
                        close(s.close)
386✔
325
                }
386✔
326
                p.endsMutex.RUnlock()
100✔
327
        }
328

329
        return p.waitClose()
102✔
330
}
331

332
// CloseGraceful closes connections in the ConnectionPool gracefully. It waits
333
// for all requests to complete.
334
func (p *ConnectionPool) CloseGraceful() []error {
8✔
335
        if p.state.cas(connectedState, shutdownState) {
16✔
336
                p.endsMutex.RLock()
8✔
337
                for _, s := range p.ends {
28✔
338
                        s.cancel()
20✔
339
                        close(s.shutdown)
20✔
340
                }
20✔
341
                p.endsMutex.RUnlock()
8✔
342
        }
343

344
        return p.waitClose()
8✔
345
}
346

347
// GetInfo gets information of connections (connected status, ro/rw role).
348
func (p *ConnectionPool) GetInfo() map[string]ConnectionInfo {
44✔
349
        info := make(map[string]ConnectionInfo)
44✔
350

44✔
351
        p.endsMutex.RLock()
44✔
352
        defer p.endsMutex.RUnlock()
44✔
353
        p.poolsMutex.RLock()
44✔
354
        defer p.poolsMutex.RUnlock()
44✔
355

44✔
356
        if p.state.get() != connectedState {
48✔
357
                return info
4✔
358
        }
4✔
359

360
        for id := range p.ends {
140✔
361
                conn, role := p.getConnectionFromPool(id)
100✔
362
                if conn != nil {
182✔
363
                        info[id] = ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role}
82✔
364
                } else {
100✔
365
                        info[id] = ConnectionInfo{}
18✔
366
                }
18✔
367
        }
368

369
        return info
40✔
370
}
371

372
// Ping sends empty request to Tarantool to check connection.
373
//
374
// Deprecated: the method will be removed in the next major version,
375
// use a PingRequest object + Do() instead.
376
func (p *ConnectionPool) Ping(userMode Mode) (*tarantool.Response, error) {
12✔
377
        conn, err := p.getNextConnection(userMode)
12✔
378
        if err != nil {
14✔
379
                return nil, err
2✔
380
        }
2✔
381

382
        return conn.Ping()
10✔
383
}
384

385
// Select performs select to box space.
386
//
387
// Deprecated: the method will be removed in the next major version,
388
// use a SelectRequest object + Do() instead.
389
func (p *ConnectionPool) Select(space, index interface{},
390
        offset, limit uint32,
391
        iterator tarantool.Iter, key interface{}, userMode ...Mode) (*tarantool.Response, error) {
10✔
392
        conn, err := p.getConnByMode(ANY, userMode)
10✔
393
        if err != nil {
10✔
394
                return nil, err
×
395
        }
×
396

397
        return conn.Select(space, index, offset, limit, iterator, key)
10✔
398
}
399

400
// Insert performs insertion to box space.
401
// Tarantool will reject Insert when tuple with same primary key exists.
402
//
403
// Deprecated: the method will be removed in the next major version,
404
// use an InsertRequest object + Do() instead.
405
func (p *ConnectionPool) Insert(space interface{}, tuple interface{},
406
        userMode ...Mode) (*tarantool.Response, error) {
4✔
407
        conn, err := p.getConnByMode(RW, userMode)
4✔
408
        if err != nil {
4✔
409
                return nil, err
×
410
        }
×
411

412
        return conn.Insert(space, tuple)
4✔
413
}
414

415
// Replace performs "insert or replace" action to box space.
416
// If tuple with same primary key exists, it will be replaced.
417
//
418
// Deprecated: the method will be removed in the next major version,
419
// use a ReplaceRequest object + Do() instead.
420
func (p *ConnectionPool) Replace(space interface{}, tuple interface{},
421
        userMode ...Mode) (*tarantool.Response, error) {
4✔
422
        conn, err := p.getConnByMode(RW, userMode)
4✔
423
        if err != nil {
4✔
424
                return nil, err
×
425
        }
×
426

427
        return conn.Replace(space, tuple)
4✔
428
}
429

430
// Delete performs deletion of a tuple by key.
431
// Result will contain array with deleted tuple.
432
//
433
// Deprecated: the method will be removed in the next major version,
434
// use a DeleteRequest object + Do() instead.
435
func (p *ConnectionPool) Delete(space, index interface{}, key interface{},
436
        userMode ...Mode) (*tarantool.Response, error) {
2✔
437
        conn, err := p.getConnByMode(RW, userMode)
2✔
438
        if err != nil {
2✔
439
                return nil, err
×
440
        }
×
441

442
        return conn.Delete(space, index, key)
2✔
443
}
444

445
// Update performs update of a tuple by key.
446
// Result will contain array with updated tuple.
447
//
448
// Deprecated: the method will be removed in the next major version,
449
// use an UpdateRequest object + Do() instead.
450
func (p *ConnectionPool) Update(space, index interface{}, key interface{},
451
        ops *tarantool.Operations, userMode ...Mode) (*tarantool.Response, error) {
4✔
452
        conn, err := p.getConnByMode(RW, userMode)
4✔
453
        if err != nil {
4✔
454
                return nil, err
×
455
        }
×
456

457
        return conn.Update(space, index, key, ops)
4✔
458
}
459

460
// Upsert performs "update or insert" action of a tuple by key.
461
// Result will not contain any tuple.
462
//
463
// Deprecated: the method will be removed in the next major version,
464
// use an UpsertRequest object + Do() instead.
465
func (p *ConnectionPool) Upsert(space interface{}, tuple interface{},
466
        ops *tarantool.Operations, userMode ...Mode) (*tarantool.Response, error) {
4✔
467
        conn, err := p.getConnByMode(RW, userMode)
4✔
468
        if err != nil {
4✔
469
                return nil, err
×
470
        }
×
471

472
        return conn.Upsert(space, tuple, ops)
4✔
473
}
474

475
// Call calls registered Tarantool function.
476
// It uses request code for Tarantool >= 1.7, result is an array.
477
//
478
// Deprecated: the method will be removed in the next major version,
479
// use a CallRequest object + Do() instead.
480
func (p *ConnectionPool) Call(functionName string, args interface{},
481
        userMode Mode) (*tarantool.Response, error) {
8✔
482
        conn, err := p.getNextConnection(userMode)
8✔
483
        if err != nil {
8✔
484
                return nil, err
×
485
        }
×
486

487
        return conn.Call(functionName, args)
8✔
488
}
489

490
// Call16 calls registered Tarantool function.
491
// It uses request code for Tarantool 1.6, result is an array of arrays.
492
// Deprecated since Tarantool 1.7.2.
493
//
494
// Deprecated: the method will be removed in the next major version,
495
// use a Call16Request object + Do() instead.
496
func (p *ConnectionPool) Call16(functionName string, args interface{},
497
        userMode Mode) (*tarantool.Response, error) {
8✔
498
        conn, err := p.getNextConnection(userMode)
8✔
499
        if err != nil {
8✔
500
                return nil, err
×
501
        }
×
502

503
        return conn.Call16(functionName, args)
8✔
504
}
505

506
// Call17 calls registered Tarantool function.
507
// It uses request code for Tarantool >= 1.7, result is an array.
508
//
509
// Deprecated: the method will be removed in the next major version,
510
// use a Call17Request object + Do() instead.
511
func (p *ConnectionPool) Call17(functionName string, args interface{},
512
        userMode Mode) (*tarantool.Response, error) {
14✔
513
        conn, err := p.getNextConnection(userMode)
14✔
514
        if err != nil {
14✔
515
                return nil, err
×
516
        }
×
517

518
        return conn.Call17(functionName, args)
14✔
519
}
520

521
// Eval passes lua expression for evaluation.
522
//
523
// Deprecated: the method will be removed in the next major version,
524
// use an EvalRequest object + Do() instead.
525
func (p *ConnectionPool) Eval(expr string, args interface{},
526
        userMode Mode) (*tarantool.Response, error) {
12✔
527
        conn, err := p.getNextConnection(userMode)
12✔
528
        if err != nil {
16✔
529
                return nil, err
4✔
530
        }
4✔
531

532
        return conn.Eval(expr, args)
8✔
533
}
534

535
// Execute passes sql expression to Tarantool for execution.
536
//
537
// Deprecated: the method will be removed in the next major version,
538
// use an ExecuteRequest object + Do() instead.
539
func (p *ConnectionPool) Execute(expr string, args interface{},
540
        userMode Mode) (*tarantool.Response, error) {
2✔
541
        conn, err := p.getNextConnection(userMode)
2✔
542
        if err != nil {
2✔
543
                return nil, err
×
544
        }
×
545

546
        return conn.Execute(expr, args)
2✔
547
}
548

549
// GetTyped performs select (with limit = 1 and offset = 0)
550
// to box space and fills typed result.
551
//
552
// Deprecated: the method will be removed in the next major version,
553
// use a SelectRequest object + Do() instead.
554
func (p *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{},
555
        userMode ...Mode) error {
×
556
        conn, err := p.getConnByMode(ANY, userMode)
×
557
        if err != nil {
×
558
                return err
×
559
        }
×
560

561
        return conn.GetTyped(space, index, key, result)
×
562
}
563

564
// SelectTyped performs select to box space and fills typed result.
565
//
566
// Deprecated: the method will be removed in the next major version,
567
// use a SelectRequest object + Do() instead.
568
func (p *ConnectionPool) SelectTyped(space, index interface{},
569
        offset, limit uint32,
570
        iterator tarantool.Iter, key interface{}, result interface{}, userMode ...Mode) error {
×
571
        conn, err := p.getConnByMode(ANY, userMode)
×
572
        if err != nil {
×
573
                return err
×
574
        }
×
575

576
        return conn.SelectTyped(space, index, offset, limit, iterator, key, result)
×
577
}
578

579
// InsertTyped performs insertion to box space.
580
// Tarantool will reject Insert when tuple with same primary key exists.
581
//
582
// Deprecated: the method will be removed in the next major version,
583
// use an InsertRequest object + Do() instead.
584
func (p *ConnectionPool) InsertTyped(space interface{}, tuple interface{}, result interface{},
585
        userMode ...Mode) error {
×
586
        conn, err := p.getConnByMode(RW, userMode)
×
587
        if err != nil {
×
588
                return err
×
589
        }
×
590

591
        return conn.InsertTyped(space, tuple, result)
×
592
}
593

594
// ReplaceTyped performs "insert or replace" action to box space.
595
// If tuple with same primary key exists, it will be replaced.
596
//
597
// Deprecated: the method will be removed in the next major version,
598
// use a ReplaceRequest object + Do() instead.
599
func (p *ConnectionPool) ReplaceTyped(space interface{}, tuple interface{}, result interface{},
600
        userMode ...Mode) error {
×
601
        conn, err := p.getConnByMode(RW, userMode)
×
602
        if err != nil {
×
603
                return err
×
604
        }
×
605

606
        return conn.ReplaceTyped(space, tuple, result)
×
607
}
608

609
// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple.
610
//
611
// Deprecated: the method will be removed in the next major version,
612
// use a DeleteRequest object + Do() instead.
613
func (p *ConnectionPool) DeleteTyped(space, index interface{}, key interface{}, result interface{},
614
        userMode ...Mode) error {
×
615
        conn, err := p.getConnByMode(RW, userMode)
×
616
        if err != nil {
×
617
                return err
×
618
        }
×
619

620
        return conn.DeleteTyped(space, index, key, result)
×
621
}
622

623
// UpdateTyped performs update of a tuple by key and fills result with updated tuple.
624
//
625
// Deprecated: the method will be removed in the next major version,
626
// use an UpdateRequest object + Do() instead.
627
func (p *ConnectionPool) UpdateTyped(space, index interface{}, key interface{},
628
        ops *tarantool.Operations, result interface{}, userMode ...Mode) error {
×
629
        conn, err := p.getConnByMode(RW, userMode)
×
630
        if err != nil {
×
631
                return err
×
632
        }
×
633

634
        return conn.UpdateTyped(space, index, key, ops, result)
×
635
}
636

637
// CallTyped calls registered function.
638
// It uses request code for Tarantool >= 1.7, result is an array.
639
//
640
// Deprecated: the method will be removed in the next major version,
641
// use a CallRequest object + Do() instead.
642
func (p *ConnectionPool) CallTyped(functionName string, args interface{}, result interface{},
643
        userMode Mode) error {
×
644
        conn, err := p.getNextConnection(userMode)
×
645
        if err != nil {
×
646
                return err
×
647
        }
×
648

649
        return conn.CallTyped(functionName, args, result)
×
650
}
651

652
// Call16Typed calls registered function.
653
// It uses request code for Tarantool 1.6, result is an array of arrays.
654
// Deprecated since Tarantool 1.7.2.
655
//
656
// Deprecated: the method will be removed in the next major version,
657
// use a Call16Request object + Do() instead.
658
func (p *ConnectionPool) Call16Typed(functionName string, args interface{}, result interface{},
659
        userMode Mode) error {
×
660
        conn, err := p.getNextConnection(userMode)
×
661
        if err != nil {
×
662
                return err
×
663
        }
×
664

665
        return conn.Call16Typed(functionName, args, result)
×
666
}
667

668
// Call17Typed calls registered function.
669
// It uses request code for Tarantool >= 1.7, result is an array.
670
//
671
// Deprecated: the method will be removed in the next major version,
672
// use a Call17Request object + Do() instead.
673
func (p *ConnectionPool) Call17Typed(functionName string, args interface{}, result interface{},
674
        userMode Mode) error {
×
675
        conn, err := p.getNextConnection(userMode)
×
676
        if err != nil {
×
677
                return err
×
678
        }
×
679

680
        return conn.Call17Typed(functionName, args, result)
×
681
}
682

683
// EvalTyped passes lua expression for evaluation.
684
//
685
// Deprecated: the method will be removed in the next major version,
686
// use an EvalRequest object + Do() instead.
687
func (p *ConnectionPool) EvalTyped(expr string, args interface{}, result interface{},
688
        userMode Mode) error {
×
689
        conn, err := p.getNextConnection(userMode)
×
690
        if err != nil {
×
691
                return err
×
692
        }
×
693

694
        return conn.EvalTyped(expr, args, result)
×
695
}
696

697
// ExecuteTyped passes sql expression to Tarantool for execution.
698
//
699
// Deprecated: the method will be removed in the next major version,
700
// use an ExecuteRequest object + Do() instead.
701
func (p *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{},
702
        userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
2✔
703
        conn, err := p.getNextConnection(userMode)
2✔
704
        if err != nil {
2✔
705
                return tarantool.SQLInfo{}, nil, err
×
706
        }
×
707

708
        return conn.ExecuteTyped(expr, args, result)
2✔
709
}
710

711
// SelectAsync sends select request to Tarantool and returns Future.
712
//
713
// Deprecated: the method will be removed in the next major version,
714
// use a SelectRequest object + Do() instead.
715
func (p *ConnectionPool) SelectAsync(space, index interface{},
716
        offset, limit uint32,
717
        iterator tarantool.Iter, key interface{}, userMode ...Mode) *tarantool.Future {
×
718
        conn, err := p.getConnByMode(ANY, userMode)
×
719
        if err != nil {
×
720
                return newErrorFuture(err)
×
721
        }
×
722

723
        return conn.SelectAsync(space, index, offset, limit, iterator, key)
×
724
}
725

726
// InsertAsync sends insert action to Tarantool and returns Future.
727
// Tarantool will reject Insert when tuple with same primary key exists.
728
//
729
// Deprecated: the method will be removed in the next major version,
730
// use an InsertRequest object + Do() instead.
731
func (p *ConnectionPool) InsertAsync(space interface{}, tuple interface{},
732
        userMode ...Mode) *tarantool.Future {
×
733
        conn, err := p.getConnByMode(RW, userMode)
×
734
        if err != nil {
×
735
                return newErrorFuture(err)
×
736
        }
×
737

738
        return conn.InsertAsync(space, tuple)
×
739
}
740

741
// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future.
742
// If tuple with same primary key exists, it will be replaced.
743
//
744
// Deprecated: the method will be removed in the next major version,
745
// use a ReplaceRequest object + Do() instead.
746
func (p *ConnectionPool) ReplaceAsync(space interface{}, tuple interface{},
747
        userMode ...Mode) *tarantool.Future {
×
748
        conn, err := p.getConnByMode(RW, userMode)
×
749
        if err != nil {
×
750
                return newErrorFuture(err)
×
751
        }
×
752

753
        return conn.ReplaceAsync(space, tuple)
×
754
}
755

756
// DeleteAsync sends deletion action to Tarantool and returns Future.
757
// Future's result will contain array with deleted tuple.
758
//
759
// Deprecated: the method will be removed in the next major version,
760
// use a DeleteRequest object + Do() instead.
761
func (p *ConnectionPool) DeleteAsync(space, index interface{}, key interface{},
762
        userMode ...Mode) *tarantool.Future {
×
763
        conn, err := p.getConnByMode(RW, userMode)
×
764
        if err != nil {
×
765
                return newErrorFuture(err)
×
766
        }
×
767

768
        return conn.DeleteAsync(space, index, key)
×
769
}
770

771
// UpdateAsync sends an update of a tuple by key and returns Future.
772
// Future's result will contain array with updated tuple.
773
//
774
// Deprecated: the method will be removed in the next major version,
775
// use an UpdateRequest object + Do() instead.
776
func (p *ConnectionPool) UpdateAsync(space, index interface{}, key interface{},
777
        ops *tarantool.Operations, userMode ...Mode) *tarantool.Future {
×
778
        conn, err := p.getConnByMode(RW, userMode)
×
779
        if err != nil {
×
780
                return newErrorFuture(err)
×
781
        }
×
782

783
        return conn.UpdateAsync(space, index, key, ops)
×
784
}
785

786
// UpsertAsync sends "update or insert" action to Tarantool and returns Future.
787
// Future's sesult will not contain any tuple.
788
//
789
// Deprecated: the method will be removed in the next major version,
790
// use a UpsertRequest object + Do() instead.
791
func (p *ConnectionPool) UpsertAsync(space interface{}, tuple interface{},
792
        ops *tarantool.Operations, userMode ...Mode) *tarantool.Future {
×
793
        conn, err := p.getConnByMode(RW, userMode)
×
794
        if err != nil {
×
795
                return newErrorFuture(err)
×
796
        }
×
797

798
        return conn.UpsertAsync(space, tuple, ops)
×
799
}
800

801
// CallAsync sends a call to registered Tarantool function and returns Future.
802
// It uses request code for Tarantool >= 1.7, future's result is an array.
803
//
804
// Deprecated: the method will be removed in the next major version,
805
// use a CallRequest object + Do() instead.
806
func (p *ConnectionPool) CallAsync(functionName string, args interface{},
807
        userMode Mode) *tarantool.Future {
×
808
        conn, err := p.getNextConnection(userMode)
×
809
        if err != nil {
×
810
                return newErrorFuture(err)
×
811
        }
×
812

813
        return conn.CallAsync(functionName, args)
×
814
}
815

816
// Call16Async sends a call to registered Tarantool function and returns Future.
817
// It uses request code for Tarantool 1.6, so future's result is an array of arrays.
818
// Deprecated since Tarantool 1.7.2.
819
//
820
// Deprecated: the method will be removed in the next major version,
821
// use a Call16Request object + Do() instead.
822
func (p *ConnectionPool) Call16Async(functionName string, args interface{},
823
        userMode Mode) *tarantool.Future {
×
824
        conn, err := p.getNextConnection(userMode)
×
825
        if err != nil {
×
826
                return newErrorFuture(err)
×
827
        }
×
828

829
        return conn.Call16Async(functionName, args)
×
830
}
831

832
// Call17Async sends a call to registered Tarantool function and returns Future.
833
// It uses request code for Tarantool >= 1.7, future's result is an array.
834
//
835
// Deprecated: the method will be removed in the next major version,
836
// use a Call17Request object + Do() instead.
837
func (p *ConnectionPool) Call17Async(functionName string, args interface{},
838
        userMode Mode) *tarantool.Future {
×
839
        conn, err := p.getNextConnection(userMode)
×
840
        if err != nil {
×
841
                return newErrorFuture(err)
×
842
        }
×
843

844
        return conn.Call17Async(functionName, args)
×
845
}
846

847
// EvalAsync sends a lua expression for evaluation and returns Future.
848
//
849
// Deprecated: the method will be removed in the next major version,
850
// use an EvalRequest object + Do() instead.
851
func (p *ConnectionPool) EvalAsync(expr string, args interface{},
852
        userMode Mode) *tarantool.Future {
×
853
        conn, err := p.getNextConnection(userMode)
×
854
        if err != nil {
×
855
                return newErrorFuture(err)
×
856
        }
×
857

858
        return conn.EvalAsync(expr, args)
×
859
}
860

861
// ExecuteAsync sends sql expression to Tarantool for execution and returns
862
// Future.
863
//
864
// Deprecated: the method will be removed in the next major version,
865
// use an ExecuteRequest object + Do() instead.
866
func (p *ConnectionPool) ExecuteAsync(expr string, args interface{},
867
        userMode Mode) *tarantool.Future {
2✔
868
        conn, err := p.getNextConnection(userMode)
2✔
869
        if err != nil {
2✔
870
                return newErrorFuture(err)
×
871
        }
×
872

873
        return conn.ExecuteAsync(expr, args)
2✔
874
}
875

876
// NewStream creates new Stream object for connection selected
877
// by userMode from pool.
878
//
879
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
880
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
881
// Since 1.7.0
882
func (p *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
6✔
883
        conn, err := p.getNextConnection(userMode)
6✔
884
        if err != nil {
6✔
885
                return nil, err
×
886
        }
×
887
        return conn.NewStream()
6✔
888
}
889

890
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
891
func (p *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) {
2✔
892
        conn, err := p.getNextConnection(userMode)
2✔
893
        if err != nil {
2✔
894
                return nil, err
×
895
        }
×
896
        return conn.NewPrepared(expr)
2✔
897
}
898

899
// NewWatcher creates a new Watcher object for the connection pool.
900
// A watcher could be created only for instances with the support.
901
//
902
// The behavior is same as if Connection.NewWatcher() called for each
903
// connection with a suitable mode.
904
//
905
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
906
// watcher’s destruction. In this case, the watcher remains registered. You
907
// need to call Unregister() directly.
908
//
909
// Unregister() guarantees that there will be no the watcher's callback calls
910
// after it, but Unregister() call from the callback leads to a deadlock.
911
//
912
// See:
913
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
914
//
915
// Since 1.10.0
916
func (p *ConnectionPool) NewWatcher(key string,
917
        callback tarantool.WatchCallback, mode Mode) (tarantool.Watcher, error) {
2,016✔
918

2,016✔
919
        watcher := &poolWatcher{
2,016✔
920
                container:    &p.watcherContainer,
2,016✔
921
                mode:         mode,
2,016✔
922
                key:          key,
2,016✔
923
                callback:     callback,
2,016✔
924
                watchers:     make(map[*tarantool.Connection]tarantool.Watcher),
2,016✔
925
                unregistered: false,
2,016✔
926
        }
2,016✔
927

2,016✔
928
        watcher.container.add(watcher)
2,016✔
929

2,016✔
930
        rr := p.anyPool
2,016✔
931
        if mode == RW {
2,022✔
932
                rr = p.rwPool
6✔
933
        } else if mode == RO {
2,018✔
934
                rr = p.roPool
2✔
935
        }
2✔
936

937
        conns := rr.GetConnections()
2,016✔
938
        for _, conn := range conns {
12,074✔
939
                // Check that connection supports watchers.
10,058✔
940
                if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) {
10,058✔
NEW
941
                        continue
×
942
                }
943
                if err := watcher.watch(conn); err != nil {
10,058✔
944
                        conn.Close()
×
945
                }
×
946
        }
947

948
        return watcher, nil
2,016✔
949
}
950

951
// Do sends the request and returns a future.
952
// For requests that belong to the only one connection (e.g. Unprepare or ExecutePrepared)
953
// the argument of type Mode is unused.
954
func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Future {
496✔
955
        if connectedReq, ok := req.(tarantool.ConnectedRequest); ok {
506✔
956
                conns := p.anyPool.GetConnections()
10✔
957
                isOurConnection := false
10✔
958
                for _, conn := range conns {
44✔
959
                        // Compare raw pointers.
34✔
960
                        if conn == connectedReq.Conn() {
42✔
961
                                isOurConnection = true
8✔
962
                                break
8✔
963
                        }
964
                }
965
                if !isOurConnection {
12✔
966
                        return newErrorFuture(ErrUnknownRequest)
2✔
967
                }
2✔
968
                return connectedReq.Conn().Do(req)
8✔
969
        }
970
        conn, err := p.getNextConnection(userMode)
486✔
971
        if err != nil {
488✔
972
                return newErrorFuture(err)
2✔
973
        }
2✔
974

975
        return conn.Do(req)
484✔
976
}
977

978
//
979
// private
980
//
981

982
func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, error) {
473✔
983
        resp, err := conn.Do(tarantool.NewCallRequest("box.info")).Get()
473✔
984
        if err != nil {
475✔
985
                return UnknownRole, err
2✔
986
        }
2✔
987
        if resp == nil {
471✔
988
                return UnknownRole, ErrIncorrectResponse
×
989
        }
×
990
        if len(resp.Data) < 1 {
471✔
991
                return UnknownRole, ErrIncorrectResponse
×
992
        }
×
993

994
        instanceStatus, ok := resp.Data[0].(map[interface{}]interface{})["status"]
471✔
995
        if !ok {
471✔
996
                return UnknownRole, ErrIncorrectResponse
×
997
        }
×
998
        if instanceStatus != "running" {
471✔
999
                return UnknownRole, ErrIncorrectStatus
×
1000
        }
×
1001

1002
        replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"]
471✔
1003
        if !ok {
471✔
1004
                return UnknownRole, ErrIncorrectResponse
×
1005
        }
×
1006

1007
        switch replicaRole {
471✔
1008
        case false:
239✔
1009
                return MasterRole, nil
239✔
1010
        case true:
232✔
1011
                return ReplicaRole, nil
232✔
1012
        }
1013

1014
        return UnknownRole, nil
×
1015
}
1016

1017
func (p *ConnectionPool) getConnectionFromPool(id string) (*tarantool.Connection, Role) {
100✔
1018
        if conn := p.rwPool.GetConnById(id); conn != nil {
182✔
1019
                return conn, MasterRole
82✔
1020
        }
82✔
1021

1022
        if conn := p.roPool.GetConnById(id); conn != nil {
18✔
UNCOV
1023
                return conn, ReplicaRole
×
UNCOV
1024
        }
×
1025

1026
        return p.anyPool.GetConnById(id), UnknownRole
18✔
1027
}
1028

1029
func (p *ConnectionPool) deleteConnection(id string) {
438✔
1030
        if conn := p.anyPool.DeleteConnById(id); conn != nil {
874✔
1031
                if conn := p.rwPool.DeleteConnById(id); conn == nil {
644✔
1032
                        p.roPool.DeleteConnById(id)
208✔
1033
                }
208✔
1034
                // The internal connection deinitialization.
1035
                p.watcherContainer.mutex.RLock()
436✔
1036
                defer p.watcherContainer.mutex.RUnlock()
436✔
1037

436✔
1038
                p.watcherContainer.foreach(func(watcher *poolWatcher) error {
445✔
1039
                        watcher.unwatch(conn)
9✔
1040
                        return nil
9✔
1041
                })
9✔
1042
        }
1043
}
1044

1045
func (p *ConnectionPool) addConnection(id string,
1046
        conn *tarantool.Connection, role Role) error {
436✔
1047
        // The internal connection initialization.
436✔
1048
        p.watcherContainer.mutex.RLock()
436✔
1049
        defer p.watcherContainer.mutex.RUnlock()
436✔
1050

436✔
1051
        if isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS, conn.ProtocolInfo().Features) {
872✔
1052
                watched := []*poolWatcher{}
436✔
1053
                err := p.watcherContainer.foreach(func(watcher *poolWatcher) error {
445✔
1054
                        watch := false
9✔
1055
                        switch watcher.mode {
9✔
1056
                        case RW:
9✔
1057
                                watch = role == MasterRole
9✔
NEW
1058
                        case RO:
×
NEW
1059
                                watch = role == ReplicaRole
×
NEW
1060
                        default:
×
NEW
1061
                                watch = true
×
1062
                        }
1063
                        if watch {
15✔
1064
                                if err := watcher.watch(conn); err != nil {
6✔
NEW
1065
                                        return err
×
NEW
1066
                                }
×
1067
                                watched = append(watched, watcher)
6✔
1068
                        }
1069
                        return nil
9✔
1070
                })
1071
                if err != nil {
436✔
NEW
1072
                        for _, watcher := range watched {
×
NEW
1073
                                watcher.unwatch(conn)
×
NEW
1074
                        }
×
NEW
1075
                        log.Printf("tarantool: failed initialize watchers for %s: %s", id, err)
×
NEW
1076
                        return err
×
1077
                }
1078
        }
1079

1080
        p.anyPool.AddConn(id, conn)
436✔
1081

436✔
1082
        switch role {
436✔
1083
        case MasterRole:
228✔
1084
                p.rwPool.AddConn(id, conn)
228✔
1085
        case ReplicaRole:
208✔
1086
                p.roPool.AddConn(id, conn)
208✔
1087
        }
1088
        return nil
436✔
1089
}
1090

1091
func (p *ConnectionPool) handlerDiscovered(id string, conn *tarantool.Connection,
1092
        role Role) bool {
447✔
1093
        var err error
447✔
1094
        if p.opts.ConnectionHandler != nil {
466✔
1095
                err = p.opts.ConnectionHandler.Discovered(id, conn, role)
19✔
1096
        }
19✔
1097

1098
        if err != nil {
456✔
1099
                log.Printf("tarantool: storing connection to %s canceled: %s\n", id, err)
9✔
1100
                return false
9✔
1101
        }
9✔
1102
        return true
438✔
1103
}
1104

1105
func (p *ConnectionPool) handlerDeactivated(id string, conn *tarantool.Connection,
1106
        role Role) {
406✔
1107
        var err error
406✔
1108
        if p.opts.ConnectionHandler != nil {
416✔
1109
                err = p.opts.ConnectionHandler.Deactivated(id, conn, role)
10✔
1110
        }
10✔
1111

1112
        if err != nil {
406✔
NEW
1113
                log.Printf("tarantool: deactivating connection to %s by user failed: %s\n", id, err)
×
1114
        }
×
1115
}
1116

1117
// deactivateConnection removes the connection from the pools, closes it and
// then reports the deactivation to the connection handler.
func (p *ConnectionPool) deactivateConnection(id string, conn *tarantool.Connection, role Role) {
        p.deleteConnection(id)
        conn.Close()
        p.handlerDeactivated(id, conn, role)
}
2✔
1122

1123
func (p *ConnectionPool) deactivateConnections() {
4✔
1124
        for id, endpoint := range p.ends {
10✔
1125
                if endpoint != nil && endpoint.conn != nil {
8✔
1126
                        p.deactivateConnection(id, endpoint.conn, endpoint.role)
2✔
1127
                }
2✔
1128
        }
1129
}
1130

1131
func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
1132
        id string, end *endpoint) bool {
404✔
1133
        role, err := p.getConnectionRole(conn)
404✔
1134
        if err != nil {
404✔
1135
                conn.Close()
×
NEW
1136
                log.Printf("tarantool: storing connection to %s failed: %s\n", id, err)
×
1137
                return false
×
1138
        }
×
1139

1140
        if !p.handlerDiscovered(id, conn, role) {
408✔
1141
                conn.Close()
4✔
1142
                return false
4✔
1143
        }
4✔
1144
        if p.addConnection(id, conn, role) != nil {
400✔
1145
                conn.Close()
×
NEW
1146
                p.handlerDeactivated(id, conn, role)
×
1147
                return false
×
1148
        }
×
1149

1150
        end.conn = conn
400✔
1151
        end.role = role
400✔
1152
        return true
400✔
1153
}
1154

1155
func (p *ConnectionPool) fillPools(
1156
        ctx context.Context,
1157
        dialers map[string]tarantool.Dialer) (bool, bool) {
114✔
1158
        somebodyAlive := false
114✔
1159
        ctxCanceled := false
114✔
1160

114✔
1161
        // It is called before controller() goroutines, so we don't expect
114✔
1162
        // concurrency issues here.
114✔
1163
        for id, dialer := range dialers {
528✔
1164
                end := newEndpoint(id, dialer)
414✔
1165
                p.ends[id] = end
414✔
1166
                connOpts := p.connOpts
414✔
1167
                connOpts.Notify = end.notify
414✔
1168
                conn, err := tarantool.Connect(ctx, dialer, connOpts)
414✔
1169
                if err != nil {
424✔
1170
                        log.Printf("tarantool: connect to %s failed: %s\n", id, err.Error())
10✔
1171
                        select {
10✔
1172
                        case <-ctx.Done():
4✔
1173
                                ctxCanceled = true
4✔
1174

4✔
1175
                                p.ends[id] = nil
4✔
1176
                                log.Printf("tarantool: operation was canceled")
4✔
1177

4✔
1178
                                p.deactivateConnections()
4✔
1179

4✔
1180
                                return false, ctxCanceled
4✔
1181
                        default:
6✔
1182
                        }
1183
                } else if p.processConnection(conn, id, end) {
804✔
1184
                        somebodyAlive = true
400✔
1185
                }
400✔
1186
        }
1187

1188
        return somebodyAlive, ctxCanceled
110✔
1189
}
1190

1191
// updateConnection re-reads the role of the endpoint's live connection and,
// if the role changed, moves the connection between the subpools.
//
// The pools mutex is released while the connection handler callbacks run and
// is re-acquired afterwards, so the pool state is checked again before the
// connection is added back. On any failure the connection is closed and the
// endpoint is reset to (nil, UnknownRole).
func (p *ConnectionPool) updateConnection(e *endpoint) {
        p.poolsMutex.Lock()

        // Nothing to update if the pool is not in the connected state.
        if p.state.get() != connectedState {
                p.poolsMutex.Unlock()
                return
        }

        if role, err := p.getConnectionRole(e.conn); err == nil {
                if e.role != role {
                        // The role changed: drop the connection from the old subpool.
                        p.deleteConnection(e.id)
                        p.poolsMutex.Unlock()

                        // Handler callbacks are invoked without holding the pools mutex.
                        p.handlerDeactivated(e.id, e.conn, e.role)
                        opened := p.handlerDiscovered(e.id, e.conn, role)
                        if !opened {
                                // The handler rejected the connection under the new role.
                                e.conn.Close()
                                e.conn = nil
                                e.role = UnknownRole
                                return
                        }

                        p.poolsMutex.Lock()
                        // The state may have changed while the mutex was released.
                        if p.state.get() != connectedState {
                                p.poolsMutex.Unlock()

                                e.conn.Close()
                                p.handlerDeactivated(e.id, e.conn, role)
                                e.conn = nil
                                e.role = UnknownRole
                                return
                        }

                        if p.addConnection(e.id, e.conn, role) != nil {
                                p.poolsMutex.Unlock()

                                e.conn.Close()
                                p.handlerDeactivated(e.id, e.conn, role)
                                e.conn = nil
                                e.role = UnknownRole
                                return
                        }
                        e.role = role
                }
                p.poolsMutex.Unlock()
                return
        } else {
                // The role could not be determined: drop and close the connection.
                p.deleteConnection(e.id)
                p.poolsMutex.Unlock()

                e.conn.Close()
                p.handlerDeactivated(e.id, e.conn, e.role)
                e.conn = nil
                e.role = UnknownRole
                return
        }
}
1248

1249
// tryConnect opens a new connection for the endpoint, detects its role,
// offers it to the connection handler and stores it in the pools.
//
// It returns ErrClosed if the pool is not in the connected state, the
// connect/role/add error if one of these steps fails, a dedicated error if
// the handler rejects the connection, and nil on success. The endpoint's
// conn and role are reset first and are set again only on success.
func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error {
        p.poolsMutex.Lock()

        if p.state.get() != connectedState {
                p.poolsMutex.Unlock()
                return ErrClosed
        }

        e.conn = nil
        e.role = UnknownRole

        connOpts := p.connOpts
        connOpts.Notify = e.notify
        // NOTE(review): the pools mutex is still held while connecting and
        // while requesting the role below.
        conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
        if err == nil {
                // This err shadows the outer one inside the block; the outer err
                // stays nil here and is what the final return yields on success.
                role, err := p.getConnectionRole(conn)
                p.poolsMutex.Unlock()

                if err != nil {
                        conn.Close()
                        log.Printf("tarantool: storing connection to %s failed: %s\n",
                                e.id, err)
                        return err
                }

                // The handler callback is invoked without holding the pools mutex.
                opened := p.handlerDiscovered(e.id, conn, role)
                if !opened {
                        conn.Close()
                        return errors.New("storing connection canceled")
                }

                p.poolsMutex.Lock()
                // The state may have changed while the mutex was released.
                if p.state.get() != connectedState {
                        p.poolsMutex.Unlock()
                        conn.Close()
                        p.handlerDeactivated(e.id, conn, role)
                        return ErrClosed
                }

                if err = p.addConnection(e.id, conn, role); err != nil {
                        p.poolsMutex.Unlock()
                        conn.Close()
                        p.handlerDeactivated(e.id, conn, role)
                        return err
                }
                e.conn = conn
                e.role = role
        }

        // Reached with the mutex held: either right after a failed connect or
        // after the connection was stored.
        p.poolsMutex.Unlock()
        return err
}
1301

1302
// reconnect drops the endpoint's current connection from the pools, reports
// the deactivation to the connection handler and tries to connect again.
// It does nothing if the pool is not in the connected state.
func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) {
        p.poolsMutex.Lock()

        if p.state.get() != connectedState {
                p.poolsMutex.Unlock()
                return
        }

        p.deleteConnection(e.id)
        p.poolsMutex.Unlock()

        // The handler callback is invoked without holding the pools mutex.
        p.handlerDeactivated(e.id, e.conn, e.role)
        e.conn = nil
        e.role = UnknownRole

        // The result is intentionally not checked: on failure the endpoint is
        // left without a connection.
        p.tryConnect(ctx, e)
}
1319

1320
// controller is the per-endpoint loop. It reacts to the endpoint's close and
// shutdown signals, to connection notifications and to a ticker that fires
// every p.opts.CheckTimeout, and returns once e.closed is closed.
//
// On every tick it connects an endpoint without a connection, refreshes the
// role of a live connection, or reconnects a closed one.
func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) {
        timer := time.NewTicker(p.opts.CheckTimeout)
        defer timer.Stop()

        shutdown := false
        for {
                if shutdown {
                        // Graceful shutdown in progress. We need to wait for a finish or
                        // to force close.
                        select {
                        case <-e.closed:
                        case <-e.close:
                        }
                }

                // Leave the loop as soon as the endpoint is marked as closed.
                select {
                case <-e.closed:
                        return
                default:
                }

                select {
                // e.close has priority to avoid concurrency with e.shutdown.
                case <-e.close:
                        if e.conn != nil {
                                p.poolsMutex.Lock()
                                p.deleteConnection(e.id)
                                p.poolsMutex.Unlock()

                                if !shutdown {
                                        e.closeErr = e.conn.Close()
                                        p.handlerDeactivated(e.id, e.conn, e.role)
                                        close(e.closed)
                                } else {
                                        // Force close the connection.
                                        e.conn.Close()
                                        // And wait for a finish.
                                        <-e.closed
                                }
                        } else {
                                close(e.closed)
                        }
                default:
                        select {
                        case <-e.shutdown:
                                shutdown = true
                                if e.conn != nil {
                                        p.poolsMutex.Lock()
                                        p.deleteConnection(e.id)
                                        p.poolsMutex.Unlock()

                                        // We need to catch e.close in the current goroutine, so
                                        // we need to start another one for the shutdown.
                                        go func() {
                                                e.closeErr = e.conn.CloseGraceful()
                                                close(e.closed)
                                        }()
                                } else {
                                        close(e.closed)
                                }
                        default:
                                // Nothing is pending: block until any event arrives.
                                select {
                                case <-e.close:
                                        // Will be processed at an upper level.
                                case <-e.shutdown:
                                        // Will be processed at an upper level.
                                case <-e.notify:
                                        // Drop a connection that turned out to be closed.
                                        if e.conn != nil && e.conn.ClosedNow() {
                                                p.poolsMutex.Lock()
                                                if p.state.get() == connectedState {
                                                        p.deleteConnection(e.id)
                                                        p.poolsMutex.Unlock()
                                                        p.handlerDeactivated(e.id, e.conn, e.role)
                                                        e.conn = nil
                                                        e.role = UnknownRole
                                                } else {
                                                        p.poolsMutex.Unlock()
                                                }
                                        }
                                case <-timer.C:
                                        // Reopen connection.
                                        // Relocate connection between subpools
                                        // if ro/rw was updated.
                                        if e.conn == nil {
                                                p.tryConnect(ctx, e)
                                        } else if !e.conn.ClosedNow() {
                                                p.updateConnection(e)
                                        } else {
                                                p.reconnect(ctx, e)
                                        }
                                }
                        }
                }
        }
}
1415

1416
func (p *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) {
582✔
1417

582✔
1418
        switch mode {
582✔
1419
        case ANY:
272✔
1420
                if next := p.anyPool.GetNextConnection(); next != nil {
540✔
1421
                        return next, nil
268✔
1422
                }
268✔
1423
        case RW:
106✔
1424
                if next := p.rwPool.GetNextConnection(); next != nil {
210✔
1425
                        return next, nil
104✔
1426
                }
104✔
1427
                return nil, ErrNoRwInstance
2✔
1428
        case RO:
60✔
1429
                if next := p.roPool.GetNextConnection(); next != nil {
118✔
1430
                        return next, nil
58✔
1431
                }
58✔
1432
                return nil, ErrNoRoInstance
2✔
1433
        case PreferRW:
78✔
1434
                if next := p.rwPool.GetNextConnection(); next != nil {
146✔
1435
                        return next, nil
68✔
1436
                }
68✔
1437
                if next := p.roPool.GetNextConnection(); next != nil {
20✔
1438
                        return next, nil
10✔
1439
                }
10✔
1440
        case PreferRO:
66✔
1441
                if next := p.roPool.GetNextConnection(); next != nil {
122✔
1442
                        return next, nil
56✔
1443
                }
56✔
1444
                if next := p.rwPool.GetNextConnection(); next != nil {
20✔
1445
                        return next, nil
10✔
1446
                }
10✔
1447
        }
1448
        return nil, ErrNoHealthyInstance
4✔
1449
}
1450

1451
func (p *ConnectionPool) getConnByMode(defaultMode Mode,
1452
        userMode []Mode) (*tarantool.Connection, error) {
28✔
1453
        if len(userMode) > 1 {
28✔
1454
                return nil, ErrTooManyArgs
×
1455
        }
×
1456

1457
        mode := defaultMode
28✔
1458
        if len(userMode) > 0 {
42✔
1459
                mode = userMode[0]
14✔
1460
        }
14✔
1461

1462
        return p.getNextConnection(mode)
28✔
1463
}
1464

1465
// newErrorFuture creates a new Future and sets err on it, so that callers
// can report a failure through the regular Future-returning API.
func newErrorFuture(err error) *tarantool.Future {
        fut := tarantool.NewFuture()
        fut.SetError(err)
        return fut
}
4✔
1470

1471
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
10,494✔
1472
        for _, actual := range actualSlice {
52,470✔
1473
                if expected == actual {
52,470✔
1474
                        return true
10,494✔
1475
                }
10,494✔
1476
        }
NEW
1477
        return false
×
1478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc