• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / go-tarantool / 24194770165

09 Apr 2026 02:09PM UTC coverage: 74.485% (-0.7%) from 75.138%
24194770165

Pull #559

github

laines-it
internal: add ring buffer for `reader` goroutine

Added an SPSC (single-producer, single-consumer) ring buffer for transferring response data.
It allows the `reader` goroutine to be split efficiently.

Closes #549
Pull Request #559: internal: add ring-buffer to reader goroutine

65 of 91 new or added lines in 2 files covered. (71.43%)

161 existing lines in 4 files now uncovered.

3144 of 4221 relevant lines covered (74.48%)

9816.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.11
/response_queue.go
1
package tarantool
2

3
import (
4
        "sync"
5
        "sync/atomic"
6
)
7

8
// CacheLineSize is the assumed size of a CPU cache line in bytes. It is used
// to size the padding arrays that separate groups of fields in responseQueue.
const CacheLineSize = 64
9

10
type resp struct {
11
        header Header
12
        buf    smallBuf
13
}
14

15
// responseQueue is a ring-buffer for responses.
16
type responseQueue struct {
17
        buf  []resp
18
        cap  uint64
19
        mask uint64
20

21
        _ [CacheLineSize - 40]byte
22

23
        decodeIdx      uint64
24
        fetchIdxCached uint64
25

26
        _ [CacheLineSize - 16]byte
27

28
        fetchIdx        uint64
29
        decodeIdxCached uint64
30

31
        _ [CacheLineSize - 16]byte
32

33
        mu   sync.Mutex
34
        cond *sync.Cond
35

36
        decodeWaiting int32
37
        fetchWaiting  int32
38
}
39

40
func newResponseQueue(size uint) *responseQueue {
334✔
41
        cap := uint64(1)
334✔
42
        for cap < uint64(size) {
3,674✔
43
                cap <<= 1
3,340✔
44
        }
3,340✔
45

46
        q := &responseQueue{
334✔
47
                buf:  make([]resp, cap),
334✔
48
                cap:  cap,
334✔
49
                mask: cap - 1,
334✔
50
        }
334✔
51
        q.cond = sync.NewCond(&q.mu)
334✔
52

334✔
53
        return q
334✔
54
}
55

56
func (q *responseQueue) push(r resp) {
1,163✔
57
        for {
2,326✔
58
                decodeIdx := atomic.LoadUint64(&q.decodeIdx)
1,163✔
59
                decodeIdxNext := (decodeIdx + 1) & q.mask
1,163✔
60

1,163✔
61
                if decodeIdxNext == q.fetchIdxCached {
1,163✔
NEW
62
                        q.fetchIdxCached = atomic.LoadUint64(&q.fetchIdx)
×
NEW
63
                        if decodeIdxNext == q.fetchIdxCached {
×
NEW
64
                                q.waitForSpace()
×
NEW
65
                                continue
×
66
                        }
67
                }
68

69
                q.buf[decodeIdx] = r
1,163✔
70

1,163✔
71
                atomic.StoreUint64(&q.decodeIdx, decodeIdxNext)
1,163✔
72

1,163✔
73
                if atomic.LoadInt32(&q.fetchWaiting) > 0 {
2,297✔
74
                        q.mu.Lock()
1,134✔
75
                        q.cond.Signal()
1,134✔
76
                        q.mu.Unlock()
1,134✔
77
                }
1,134✔
78

79
                return
1,163✔
80
        }
81
}
82

NEW
83
func (q *responseQueue) waitForSpace() {
×
NEW
84
        atomic.AddInt32(&q.decodeWaiting, 1)
×
NEW
85
        defer atomic.AddInt32(&q.decodeWaiting, -1)
×
NEW
86

×
NEW
87
        q.mu.Lock()
×
NEW
88
        defer q.mu.Unlock()
×
NEW
89

×
NEW
90
        for {
×
NEW
91
                decodeIdx := atomic.LoadUint64(&q.decodeIdx)
×
NEW
92
                fetchIdx := atomic.LoadUint64(&q.fetchIdx)
×
NEW
93

×
NEW
94
                decodeIdxNext := (decodeIdx + 1) & q.mask
×
NEW
95

×
NEW
96
                if decodeIdxNext != fetchIdx {
×
NEW
97
                        q.fetchIdxCached = fetchIdx
×
NEW
98
                        return
×
NEW
99
                }
×
100

NEW
101
                q.cond.Wait()
×
102
        }
103
}
104

105
func (q *responseQueue) pop() resp {
1,493✔
106
        for {
4,110✔
107
                fetchIdx := atomic.LoadUint64(&q.fetchIdx)
2,617✔
108

2,617✔
109
                if fetchIdx == q.decodeIdxCached {
4,100✔
110
                        q.decodeIdxCached = atomic.LoadUint64(&q.decodeIdx)
1,483✔
111
                        if fetchIdx == q.decodeIdxCached {
2,937✔
112
                                q.waitForData()
1,454✔
113
                                continue
1,454✔
114
                        }
115
                }
116

117
                val := q.buf[fetchIdx]
1,163✔
118

1,163✔
119
                fetchIdxNext := (fetchIdx + 1) & q.mask
1,163✔
120
                atomic.StoreUint64(&q.fetchIdx, fetchIdxNext)
1,163✔
121

1,163✔
122
                if atomic.LoadInt32(&q.decodeWaiting) > 0 {
1,163✔
NEW
123
                        q.mu.Lock()
×
NEW
124
                        q.cond.Signal()
×
NEW
125
                        q.mu.Unlock()
×
NEW
126
                }
×
127

128
                return val
1,163✔
129
        }
130
}
131

132
func (q *responseQueue) waitForData() {
1,454✔
133
        atomic.AddInt32(&q.fetchWaiting, 1)
1,454✔
134
        defer atomic.AddInt32(&q.fetchWaiting, -1)
1,454✔
135

1,454✔
136
        q.mu.Lock()
1,454✔
137
        defer q.mu.Unlock()
1,454✔
138

1,454✔
139
        for {
4,032✔
140
                fetchIdx := atomic.LoadUint64(&q.fetchIdx)
2,578✔
141
                decodeIdx := atomic.LoadUint64(&q.decodeIdx)
2,578✔
142

2,578✔
143
                if fetchIdx != decodeIdx {
3,702✔
144
                        q.decodeIdxCached = decodeIdx
1,124✔
145
                        return
1,124✔
146
                }
1,124✔
147

148
                q.cond.Wait()
1,454✔
149
        }
150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc