• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 25875079309

14 May 2026 05:31PM UTC coverage: 62.163% (-0.07%) from 62.233%
25875079309

push

github

web-flow
Merge pull request #10799 from ziggie1984/fix-kv-payment-migration

payments/db: remap legacy zero attempt IDs

0 of 240 new or added lines in 2 files covered. (0.0%)

114 existing lines in 23 files now uncovered.

143826 of 231369 relevant lines covered (62.16%)

19059.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.25
/queue/back_pressure.go
1
package queue
2

3
import (
4
        "context"
5
        "errors"
6
        "math/rand"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/lightningnetwork/lnd/fn/v2"
11
)
12

13
// ErrQueueClosed is returned by Enqueue when the queue has already been
// closed, and is carried in the result of Dequeue once a closed queue has
// been fully drained. TryEnqueue signals the same condition by returning
// false rather than an error.
var ErrQueueClosed = errors.New("queue closed")
16

17
// DropCheckFunc decides whether to drop an item based solely on the current
// queue depth. This is the natural return type for length-only strategies such
// as RandomEarlyDrop.
type DropCheckFunc func(queueLen int) bool

// DropPredicate decides whether to drop an item based on the current queue
// depth and the item itself. It returns true to drop, false to enqueue. Use
// this when the drop decision depends on the item itself; for length-only
// checks prefer DropCheckFunc.
type DropPredicate[T any] func(queueLen int, item T) bool

// AsDropPredicate adapts a length-only DropCheckFunc into a DropPredicate[T],
// ignoring the item.
//
// A nil DropCheckFunc is adapted to a nil DropPredicate. Wrapping it in a
// closure instead would produce a non-nil predicate that only panics once it
// is first invoked; returning nil lets callers detect the problem up front
// with an ordinary nil check.
func AsDropPredicate[T any](f DropCheckFunc) DropPredicate[T] {
	if f == nil {
		return nil
	}

	return func(queueLen int, _ T) bool {
		return f(queueLen)
	}
}
35

36
var (
	// ErrItemDropped is returned by Enqueue when the item is dropped by
	// the DropPredicate. This can happen before the queue is actually
	// full (e.g. with RED-style early drops).
	ErrItemDropped = errors.New("item dropped by drop predicate")

	// ErrNegativeMinThreshold is returned by RandomEarlyDrop when
	// minThreshold is negative.
	ErrNegativeMinThreshold = errors.New(
		"queue: minThreshold must be >= 0",
	)

	// ErrInvalidThresholdOrder is returned by RandomEarlyDrop when
	// maxThreshold is not strictly greater than minThreshold.
	ErrInvalidThresholdOrder = errors.New(
		"queue: maxThreshold must be > minThreshold",
	)
)
52

53
// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
54
// drop behavior. When full, it uses the DropPredicate to perform early drops
55
// (e.g., RED-style).
56
type BackpressureQueue[T any] struct {
57
        ch            chan T
58
        dropPredicate DropPredicate[T]
59

60
        closed    atomic.Bool
61
        closeOnce sync.Once
62
}
63

64
// NewBackpressureQueue creates a new BackpressureQueue with the given capacity
65
// and drop predicate. Panics if capacity <= 0 or predicate is nil.
66
func NewBackpressureQueue[T any](capacity int,
67
        predicate DropPredicate[T]) *BackpressureQueue[T] {
319✔
68

319✔
69
        if capacity <= 0 {
319✔
70
                panic("queue: NewBackpressureQueue requires capacity > 0")
×
71
        }
72
        if predicate == nil {
319✔
73
                panic("queue: NewBackpressureQueue requires " +
×
74
                        "a non-nil predicate")
×
75
        }
76

77
        return &BackpressureQueue[T]{
319✔
78
                ch:            make(chan T, capacity),
319✔
79
                dropPredicate: predicate,
319✔
80
        }
319✔
81
}
82

83
// Enqueue attempts to add an item to the queue, respecting context
84
// cancellation. Returns ErrItemDropped if dropped, or context error if ctx is
85
// done before enqueue. Otherwise, `nil` is returned on success.
86
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, item T) error {
2,328✔
87
        if q.closed.Load() {
2,329✔
88
                return ErrQueueClosed
1✔
89
        }
1✔
90

91
        // Consult the drop predicate based on the current queue length.
92
        //
93
        // NOTE: There is a TOCTOU gap here — the queue length snapshot can
94
        // become stale between this check and the channel send below if
95
        // there are multiple concurrent writers. This is acceptable because
96
        // RED is inherently probabilistic and approximate; a slightly
97
        // outdated length does not compromise correctness.
98
        if q.dropPredicate(len(q.ch), item) {
2,661✔
99
                return ErrItemDropped
334✔
100
        }
334✔
101

102
        // If the predicate decides not to drop, attempt to enqueue the item.
103
        select {
1,993✔
104
        case q.ch <- item:
1,892✔
105
                return nil
1,892✔
106

107
        default:
101✔
108
                // Channel is full, and the predicate decided not to drop. We
101✔
109
                // must block until space is available or context is cancelled.
101✔
110
                select {
101✔
UNCOV
111
                case q.ch <- item:
×
UNCOV
112
                        return nil
×
113

114
                case <-ctx.Done():
101✔
115
                        return ctx.Err()
101✔
116
                }
117
        }
118
}
119

120
// TryEnqueue attempts to add an item to the queue without blocking. Returns
121
// true if successfully enqueued, false if the drop predicate rejected the item
122
// or the queue is at capacity.
123
func (q *BackpressureQueue[T]) TryEnqueue(item T) bool {
15✔
124
        if q.closed.Load() {
16✔
125
                return false
1✔
126
        }
1✔
127

128
        if q.dropPredicate(len(q.ch), item) {
16✔
129
                return false
2✔
130
        }
2✔
131

132
        select {
12✔
133
        case q.ch <- item:
11✔
134
                return true
11✔
135
        default:
1✔
136
                return false
1✔
137
        }
138
}
139

140
// Dequeue retrieves the next item from the queue, blocking until available or
141
// context done. Returns the item or an error if ctx is done before an item is
142
// available.
143
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] {
1,337✔
144
        select {
1,337✔
145

146
        case item, ok := <-q.ch:
877✔
147
                if !ok {
877✔
148
                        return fn.Err[T](ErrQueueClosed)
×
149
                }
×
150
                return fn.Ok(item)
877✔
151

152
        case <-ctx.Done():
460✔
153
                return fn.Err[T](ctx.Err())
460✔
154
        }
155
}
156

157
// Len returns the current number of items buffered in the queue.
158
func (q *BackpressureQueue[T]) Len() int {
5✔
159
        return len(q.ch)
5✔
160
}
5✔
161

162
// ReceiveChan returns the receive-only end of the internal channel, allowing
163
// callers to select on it alongside other channels (e.g., context.Done).
164
func (q *BackpressureQueue[T]) ReceiveChan() <-chan T {
9✔
165
        return q.ch
9✔
166
}
9✔
167

168
// Close closes the internal channel. It is safe to call multiple times;
169
// only the first call has any effect. After Close, no more items can be
170
// enqueued. Remaining items can still be received via ReceiveChan.
171
func (q *BackpressureQueue[T]) Close() {
11✔
172
        q.closeOnce.Do(func() {
21✔
173
                q.closed.Store(true)
10✔
174
                close(q.ch)
10✔
175
        })
10✔
176
}
177

178
// redConfig holds configuration for RandomEarlyDrop.
type redConfig struct {
	// randSrc returns a float64 in [0.0, 1.0). It must be safe for
	// concurrent use if the returned DropCheckFunc will be called from
	// multiple goroutines. The default (math/rand.Float64) qualifies: the
	// top-level math/rand functions are documented as safe for concurrent
	// use by multiple goroutines.
	randSrc func() float64
}

// REDOption is a functional option for configuring RandomEarlyDrop.
type REDOption func(*redConfig)

// WithRandSource provides a custom random number source: a function that
// returns a float64 in the half-open interval [0.0, 1.0). The source must be
// safe for concurrent use if the resulting DropCheckFunc is shared between
// goroutines.
func WithRandSource(src func() float64) REDOption {
	return func(cfg *redConfig) {
		cfg.randSrc = src
	}
}
197

198
// RandomEarlyDrop returns a DropCheckFunc that implements Random Early
199
// Detection (RED), inspired by TCP-RED queue management.
200
//
201
// RED prevents sudden buffer overflows by proactively dropping packets before
202
// the queue is full. It establishes two thresholds:
203
//
204
//  1. minThreshold: queue length below which no drops occur.
205
//  2. maxThreshold: queue length at or above which all items are dropped.
206
//
207
// Between these points, the drop probability p increases linearly:
208
//
209
//        p = (queueLen - minThreshold) / (maxThreshold - minThreshold)
210
//
211
// For example, with minThreshold=15 and maxThreshold=35:
212
//   - At queueLen=15, p=0.0 (0% drop chance)
213
//   - At queueLen=25, p=0.5 (50% drop chance)
214
//   - At queueLen=35, p=1.0 (100% drop chance)
215
//
216
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy,
217
// and gives early back-pressure signals to senders.
218
func RandomEarlyDrop(minThreshold, maxThreshold int,
219
        opts ...REDOption) (DropCheckFunc, error) {
101✔
220

101✔
221
        if minThreshold < 0 {
101✔
222
                return nil, ErrNegativeMinThreshold
×
223
        }
×
224
        if maxThreshold <= minThreshold {
101✔
225
                return nil, ErrInvalidThresholdOrder
×
226
        }
×
227

228
        cfg := redConfig{
101✔
229
                randSrc: rand.Float64,
101✔
230
        }
101✔
231

101✔
232
        for _, opt := range opts {
202✔
233
                opt(&cfg)
101✔
234
        }
101✔
235
        if cfg.randSrc == nil {
101✔
236
                cfg.randSrc = rand.Float64
×
237
        }
×
238

239
        // Precompute the denominator for the linear drop probability
240
        // scaling. Since minThreshold < maxThreshold is enforced above,
241
        // this is always positive.
242
        denominator := float64(maxThreshold - minThreshold)
101✔
243

101✔
244
        dropFn := func(queueLen int) bool {
1,498✔
245
                // If the queue is below the minimum threshold, then we never
1,397✔
246
                // drop.
1,397✔
247
                if queueLen < minThreshold {
1,994✔
248
                        return false
597✔
249
                }
597✔
250

251
                // If the queue is at or above the maximum threshold, then we
252
                // always drop.
253
                if queueLen >= maxThreshold {
1,075✔
254
                        return true
275✔
255
                }
275✔
256

257
                // If we're in the middle, then we implement linear scaling of
258
                // the drop probability based on our thresholds. At this point,
259
                // minThreshold <= queueLen < maxThreshold.
260
                p := float64(queueLen-minThreshold) / denominator
525✔
261

525✔
262
                return cfg.randSrc() < p
525✔
263
        }
264

265
        return dropFn, nil
101✔
266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc