• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 12781671724

14 Jan 2025 02:42PM UTC coverage: 85.577% (+0.1%) from 85.448%
12781671724

push

github

web-flow
Fix JS API in-flight metric (#6373)

After a drain this would have been misreporting, as we did not remove
drained entries from the `apiInflight` count.

Signed-off-by: Neil Twigg <neil@nats.io>

68669 of 80242 relevant lines covered (85.58%)

1030156.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.64
/server/ipqueue.go
1
// Copyright 2021-2023 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "errors"
18
        "sync"
19
        "sync/atomic"
20
)
21

22
// ipQueueDefaultMaxRecycleSize is the default maximum recycle size (the
// `mrs` option) given to a queue created by newIPQueue().
const ipQueueDefaultMaxRecycleSize = 4 * 1024
23

24
// This is a generic intra-process queue.
25
type ipQueue[T any] struct {
26
        inprogress int64
27
        sync.Mutex
28
        ch   chan struct{}
29
        elts []T
30
        pos  int
31
        pool *sync.Pool
32
        sz   uint64 // Calculated size (only if calc != nil)
33
        name string
34
        m    *sync.Map
35
        ipQueueOpts[T]
36
}
37

38
// ipQueueOpts groups the optional settings of an ipQueue. Each field is set
// by the matching ipQueueOpt function passed to newIPQueue().
type ipQueueOpts[T any] struct {
	mrs  int              // Max recycle size
	calc func(e T) uint64 // Calc function for tracking size
	msz  uint64           // Limit by total calculated size
	mlen int              // Limit by number of entries
}
44

45
// ipQueueOpt is a functional option: a function that modifies the
// ipQueueOpts of a queue. newIPQueue() applies them in the order given.
type ipQueueOpt[T any] func(*ipQueueOpts[T])
46

47
// This option allows to set the maximum recycle size when attempting
48
// to put back a slice to the pool.
49
func ipqMaxRecycleSize[T any](max int) ipQueueOpt[T] {
2✔
50
        return func(o *ipQueueOpts[T]) {
4✔
51
                o.mrs = max
2✔
52
        }
2✔
53
}
54

55
// This option enables total queue size counting by passing in a function
56
// that evaluates the size of each entry as it is pushed/popped. This option
57
// enables the size() function.
58
func ipqSizeCalculation[T any](calc func(e T) uint64) ipQueueOpt[T] {
5,146✔
59
        return func(o *ipQueueOpts[T]) {
10,293✔
60
                o.calc = calc
5,147✔
61
        }
5,147✔
62
}
63

64
// This option allows setting the maximum queue size. Once the limit is
65
// reached, then push() will stop returning true and no more entries will
66
// be stored until some more are popped. The ipQueue_SizeCalculation must
67
// be provided for this to work.
68
func ipqLimitBySize[T any](max uint64) ipQueueOpt[T] {
5,145✔
69
        return func(o *ipQueueOpts[T]) {
10,290✔
70
                o.msz = max
5,145✔
71
        }
5,145✔
72
}
73

74
// This option allows setting the maximum queue length. Once the limit is
75
// reached, then push() will stop returning true and no more entries will
76
// be stored until some more are popped.
77
func ipqLimitByLen[T any](max int) ipQueueOpt[T] {
5,145✔
78
        return func(o *ipQueueOpts[T]) {
10,290✔
79
                o.mlen = max
5,145✔
80
        }
5,145✔
81
}
82

83
// errIPQLenLimitReached is returned by push() when the queue already holds
// the maximum number of entries allowed by its length limit.
var errIPQLenLimitReached = errors.New("IPQ len limit reached")
84
// errIPQSizeLimitReached is returned by push() when adding the entry would
// bring the calculated queue size above its size limit.
var errIPQSizeLimitReached = errors.New("IPQ size limit reached")
85

86
func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T] {
139,852✔
87
        q := &ipQueue[T]{
139,852✔
88
                ch: make(chan struct{}, 1),
139,852✔
89
                pool: &sync.Pool{
139,852✔
90
                        New: func() any {
327,478✔
91
                                // Reason we use pointer to slice instead of slice is explained
187,626✔
92
                                // here: https://staticcheck.io/docs/checks#SA6002
187,626✔
93
                                res := make([]T, 0, 32)
187,626✔
94
                                return &res
187,626✔
95
                        },
187,626✔
96
                },
97
                name: name,
98
                m:    &s.ipQueues,
99
                ipQueueOpts: ipQueueOpts[T]{
100
                        mrs: ipQueueDefaultMaxRecycleSize,
101
                },
102
        }
103
        for _, o := range opts {
155,291✔
104
                o(&q.ipQueueOpts)
15,439✔
105
        }
15,439✔
106
        s.ipQueues.Store(name, q)
139,852✔
107
        return q
139,852✔
108
}
109

110
// Add the element `e` to the queue, notifying the queue channel's `ch` if the
111
// entry is the first to be added, and returns the length of the queue after
112
// this element is added.
113
func (q *ipQueue[T]) push(e T) (int, error) {
39,886,383✔
114
        q.Lock()
39,886,383✔
115
        l := len(q.elts) - q.pos
39,886,383✔
116
        if q.mlen > 0 && l == q.mlen {
39,887,456✔
117
                q.Unlock()
1,073✔
118
                return l, errIPQLenLimitReached
1,073✔
119
        }
1,073✔
120
        if q.calc != nil {
45,559,337✔
121
                sz := q.calc(e)
5,674,027✔
122
                if q.msz > 0 && q.sz+sz > q.msz {
5,674,118✔
123
                        q.Unlock()
91✔
124
                        return l, errIPQSizeLimitReached
91✔
125
                }
91✔
126
                q.sz += sz
5,673,936✔
127
        }
128
        if q.elts == nil {
56,245,015✔
129
                // What comes out of the pool is already of size 0, so no need for [:0].
16,359,796✔
130
                q.elts = *(q.pool.Get().(*[]T))
16,359,796✔
131
        }
16,359,796✔
132
        q.elts = append(q.elts, e)
39,885,219✔
133
        q.Unlock()
39,885,219✔
134
        if l == 0 {
56,286,789✔
135
                select {
16,401,570✔
136
                case q.ch <- struct{}{}:
15,639,341✔
137
                default:
762,229✔
138
                }
139
        }
140
        return l + 1, nil
39,885,219✔
141
}
142

143
// Returns the whole list of elements currently present in the queue,
144
// emptying the queue. This should be called after receiving a notification
145
// from the queue's `ch` notification channel that indicates that there
146
// is something in the queue.
147
// However, in cases where `drain()` may be called from another go
148
// routine, it is possible that a routine is notified that there is
149
// something, but by the time it calls `pop()`, the drain() would have
150
// emptied the queue. So the caller should never assume that pop() will
151
// return a slice of 1 or more, it could return `nil`.
152
func (q *ipQueue[T]) pop() []T {
16,580,320✔
153
        if q == nil {
16,580,321✔
154
                return nil
1✔
155
        }
1✔
156
        q.Lock()
16,580,319✔
157
        if len(q.elts)-q.pos == 0 {
16,819,917✔
158
                q.Unlock()
239,598✔
159
                return nil
239,598✔
160
        }
239,598✔
161
        var elts []T
16,340,721✔
162
        if q.pos == 0 {
32,681,440✔
163
                elts = q.elts
16,340,719✔
164
        } else {
16,340,721✔
165
                elts = q.elts[q.pos:]
2✔
166
        }
2✔
167
        q.elts, q.pos, q.sz = nil, 0, 0
16,340,721✔
168
        atomic.AddInt64(&q.inprogress, int64(len(elts)))
16,340,721✔
169
        q.Unlock()
16,340,721✔
170
        return elts
16,340,721✔
171
}
172

173
// Returns the first element from the queue, if any. See comment above
174
// regarding calling after being notified that there is something and
175
// the use of drain(). In short, the caller should always check the
176
// boolean return value to ensure that the value is genuine and not a
177
// default empty value.
178
func (q *ipQueue[T]) popOne() (T, bool) {
141,212✔
179
        q.Lock()
141,212✔
180
        l := len(q.elts) - q.pos
141,212✔
181
        if l == 0 {
186,792✔
182
                q.Unlock()
45,580✔
183
                var empty T
45,580✔
184
                return empty, false
45,580✔
185
        }
45,580✔
186
        e := q.elts[q.pos]
95,632✔
187
        if l--; l > 0 {
133,993✔
188
                q.pos++
38,361✔
189
                if q.calc != nil {
38,366✔
190
                        q.sz -= q.calc(e)
5✔
191
                }
5✔
192
                // We need to re-signal
193
                select {
38,361✔
194
                case q.ch <- struct{}{}:
15,576✔
195
                default:
22,785✔
196
                }
197
        } else {
57,271✔
198
                // We have just emptied the queue, so we can reuse unless it is too big.
57,271✔
199
                if cap(q.elts) <= q.mrs {
114,539✔
200
                        q.elts = q.elts[:0]
57,268✔
201
                } else {
57,271✔
202
                        q.elts = nil
3✔
203
                }
3✔
204
                q.pos, q.sz = 0, 0
57,271✔
205
        }
206
        q.Unlock()
95,632✔
207
        return e, true
95,632✔
208
}
209

210
// After a pop(), the slice can be recycled for the next push() when
211
// a first element is added to the queue.
212
// This will also decrement the "in progress" count with the length
213
// of the slice.
214
// WARNING: The caller MUST never reuse `elts`.
215
func (q *ipQueue[T]) recycle(elts *[]T) {
16,580,315✔
216
        // If invoked with a nil list, nothing to do.
16,580,315✔
217
        if elts == nil || *elts == nil {
16,819,912✔
218
                return
239,597✔
219
        }
239,597✔
220
        // Update the in progress count.
221
        if len(*elts) > 0 {
32,681,436✔
222
                atomic.AddInt64(&q.inprogress, int64(-(len(*elts))))
16,340,718✔
223
        }
16,340,718✔
224
        // We also don't want to recycle huge slices, so check against the max.
225
        // q.mrs is normally immutable but can be changed, in a safe way, in some tests.
226
        if cap(*elts) > q.mrs {
16,340,830✔
227
                return
112✔
228
        }
112✔
229
        (*elts) = (*elts)[:0]
16,340,606✔
230
        q.pool.Put(elts)
16,340,606✔
231
}
232

233
// Returns the current length of the queue.
234
func (q *ipQueue[T]) len() int {
1,039,167✔
235
        q.Lock()
1,039,167✔
236
        defer q.Unlock()
1,039,167✔
237
        return len(q.elts) - q.pos
1,039,167✔
238
}
1,039,167✔
239

240
// Returns the calculated size of the queue (if ipQueue_SizeCalculation has been
241
// passed in), otherwise returns zero.
242
func (q *ipQueue[T]) size() uint64 {
16✔
243
        q.Lock()
16✔
244
        defer q.Unlock()
16✔
245
        return q.sz
16✔
246
}
16✔
247

248
// Empty the queue and consumes the notification signal if present.
249
// Returns the number of items that were drained from the queue.
250
// Note that this could cause a reader go routine that has been
251
// notified that there is something in the queue (reading from queue's `ch`)
252
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
253
func (q *ipQueue[T]) drain() int {
90,878✔
254
        if q == nil {
90,878✔
255
                return 0
×
256
        }
×
257
        q.Lock()
90,878✔
258
        olen := len(q.elts) - q.pos
90,878✔
259
        q.elts, q.pos, q.sz = nil, 0, 0
90,878✔
260
        // Consume the signal if it was present to reduce the chance of a reader
90,878✔
261
        // routine to be think that there is something in the queue...
90,878✔
262
        select {
90,878✔
263
        case <-q.ch:
580✔
264
        default:
90,298✔
265
        }
266
        q.Unlock()
90,878✔
267
        return olen
90,878✔
268
}
269

270
// Since the length of the queue goes to 0 after a pop(), it is good to
271
// have an insight on how many elements are yet to be processed after a pop().
272
// For that reason, the queue maintains a count of elements returned through
273
// the pop() API. When the caller will call q.recycle(), this count will
274
// be reduced by the size of the slice returned by pop().
275
func (q *ipQueue[T]) inProgress() int64 {
13✔
276
        return atomic.LoadInt64(&q.inprogress)
13✔
277
}
13✔
278

279
// Remove this queue from the server's map of ipQueues.
280
// All ipQueue operations (such as push/pop/etc..) are still possible.
281
func (q *ipQueue[T]) unregister() {
88,495✔
282
        if q == nil {
89,600✔
283
                return
1,105✔
284
        }
1,105✔
285
        q.m.Delete(q.name)
87,390✔
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc