• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13481548301

23 Feb 2025 09:06AM UTC coverage: 4.031% (-54.8%) from 58.825%
13481548301

Pull #9521

github

web-flow
Merge 1ffbe99fe into 5fe900d18
Pull Request #9521: unit: remove GOACC, use Go 1.20 native coverage functionality

2852 of 70750 relevant lines covered (4.03%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/queue/gc_queue.go
1
package queue
2

3
import (
4
        "container/list"
5
        "time"
6

7
        "github.com/lightningnetwork/lnd/ticker"
8
)
9

10
// GCQueue is garbage collecting queue, which dynamically grows and contracts
11
// based on load. If the queue has items which have been returned, the queue
12
// will check every gcInterval amount of time to see if any elements are
13
// eligible to be released back to the runtime. Elements that have been in the
14
// queue for a duration of least expiryInterval will be released upon the next
15
// iteration of the garbage collection, thus the maximum amount of time an
16
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
17
// be disabled after all items in the queue have been taken or released to
18
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
19
// the steady state.
20
type GCQueue struct {
21
        // takeBuffer coordinates the delivery of items taken from the queue
22
        // such that they are delivered to requesters.
23
        takeBuffer chan interface{}
24

25
        // returnBuffer coordinates the return of items back into the queue,
26
        // where they will be kept until retaken or released.
27
        returnBuffer chan interface{}
28

29
        // newItem is a constructor, used to generate new elements if none are
30
        // otherwise available for reuse.
31
        newItem func() interface{}
32

33
        // expiryInterval is the minimum amount of time an element will remain
34
        // in the queue before being released.
35
        expiryInterval time.Duration
36

37
        // recycleTicker is a resumable ticker used to trigger a sweep to
38
        // release elements that have been in the queue longer than
39
        // expiryInterval.
40
        recycleTicker ticker.Ticker
41

42
        // freeList maintains a list of gcQueueEntries, sorted in order of
43
        // increasing time of arrival.
44
        freeList *list.List
45

46
        quit chan struct{}
47
}
48

49
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
50
// and contracts based on load. If the queue has items which have been returned,
51
// the queue will check every gcInterval amount of time to see if any elements
52
// are eligible to be released back to the runtime. Elements that have been in
53
// the queue for a duration of least expiryInterval will be released upon the
54
// next iteration of the garbage collection, thus the maximum amount of time an
55
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
56
// be disabled after all items in the queue have been taken or released to
57
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
58
// the steady state. The returnQueueSize parameter is used to size the maximal
59
// number of items that can be returned without being dropped during large
60
// bursts in attempts to return items to the GCQUeue.
61
func NewGCQueue(newItem func() interface{}, returnQueueSize int,
62
        gcInterval, expiryInterval time.Duration) *GCQueue {
×
63

×
64
        q := &GCQueue{
×
65
                takeBuffer:     make(chan interface{}),
×
66
                returnBuffer:   make(chan interface{}, returnQueueSize),
×
67
                expiryInterval: expiryInterval,
×
68
                freeList:       list.New(),
×
69
                recycleTicker:  ticker.New(gcInterval),
×
70
                newItem:        newItem,
×
71
                quit:           make(chan struct{}),
×
72
        }
×
73

×
74
        go q.queueManager()
×
75

×
76
        return q
×
77
}
×
78

79
// Take returns either a recycled element from the queue, or creates a new item
80
// if none are available.
81
func (q *GCQueue) Take() interface{} {
×
82
        select {
×
83
        case item := <-q.takeBuffer:
×
84
                return item
×
85
        case <-time.After(time.Millisecond):
×
86
                return q.newItem()
×
87
        }
88
}
89

90
// Return adds the returned item to freelist if the queue's returnBuffer has
91
// available capacity. Under load, items may be dropped to ensure this method
92
// does not block.
93
func (q *GCQueue) Return(item interface{}) {
×
94
        select {
×
95
        case q.returnBuffer <- item:
×
96
        default:
×
97
        }
98
}
99

100
// gcQueueEntry is a tuple containing an interface{} and the time at which the
101
// item was added to the queue. The recorded time is used to determine when the
102
// entry becomes stale, and can be released if it has not already been taken.
103
type gcQueueEntry struct {
104
        item interface{}
105
        time time.Time
106
}
107

108
// queueManager maintains the free list of elements by popping the head of the
109
// queue when items are needed, and appending them to the end of the queue when
110
// items are returned. The queueManager will periodically attempt to release any
111
// items that have been in the queue longer than the expiry interval.
112
//
113
// NOTE: This method SHOULD be run as a goroutine.
114
func (q *GCQueue) queueManager() {
×
115
        for {
×
116
                // If the pool is empty, initialize a buffer pool to serve a
×
117
                // client that takes a buffer immediately. If this happens, this
×
118
                // is either:
×
119
                //   1) the first iteration of the loop,
×
120
                //   2) after all entries were garbage collected, or
×
121
                //   3) the freelist was emptied after the last entry was taken.
×
122
                //
×
123
                // In all of these cases, it is safe to pause the recycle ticker
×
124
                // since it will be resumed as soon an entry is returned to the
×
125
                // freelist.
×
126
                if q.freeList.Len() == 0 {
×
127
                        q.freeList.PushBack(gcQueueEntry{
×
128
                                item: q.newItem(),
×
129
                                time: time.Now(),
×
130
                        })
×
131

×
132
                        q.recycleTicker.Pause()
×
133
                }
×
134

135
                next := q.freeList.Front()
×
136

×
137
                select {
×
138

139
                // If a client requests a new write buffer, deliver the buffer
140
                // at the head of the freelist to them.
141
                case q.takeBuffer <- next.Value.(gcQueueEntry).item:
×
142
                        q.freeList.Remove(next)
×
143

144
                // If a client is returning a write buffer, add it to the free
145
                // list and resume the recycle ticker so that it can be cleared
146
                // if the entries are not quickly reused.
147
                case item := <-q.returnBuffer:
×
148
                        // Add the returned buffer to the freelist, recording
×
149
                        // the current time so we can determine when the entry
×
150
                        // expires.
×
151
                        q.freeList.PushBack(gcQueueEntry{
×
152
                                item: item,
×
153
                                time: time.Now(),
×
154
                        })
×
155

×
156
                        // Adding the buffer implies that we now have a non-zero
×
157
                        // number of elements in the free list. Resume the
×
158
                        // recycle ticker to cleanup any entries that go unused.
×
159
                        q.recycleTicker.Resume()
×
160

161
                // If the recycle ticker fires, we will aggressively release any
162
                // write buffers in the freelist for which the expiryInterval
163
                // has elapsed since their insertion. If after doing so, no
164
                // elements remain, we will pause the recycle ticker.
165
                case <-q.recycleTicker.Ticks():
×
166
                        // Since the insert time of all entries will be
×
167
                        // monotonically increasing, iterate over elements and
×
168
                        // remove all entries that have expired.
×
169
                        var next *list.Element
×
170
                        for e := q.freeList.Front(); e != nil; e = next {
×
171
                                // Cache the next element, since it will become
×
172
                                // unreachable from the current element if it is
×
173
                                // removed.
×
174
                                next = e.Next()
×
175
                                entry := e.Value.(gcQueueEntry)
×
176

×
177
                                // Use now - insertTime <= expiryInterval to
×
178
                                // determine if this entry has not expired.
×
179
                                if time.Since(entry.time) <= q.expiryInterval {
×
180
                                        // If this entry hasn't expired, then
×
181
                                        // all entries that follow will still be
×
182
                                        // valid.
×
183
                                        break
×
184
                                }
185

186
                                // Otherwise, remove the expired entry from the
187
                                // linked-list.
188
                                q.freeList.Remove(e)
×
189
                                entry.item = nil
×
190
                                e.Value = nil
×
191
                        }
192
                }
193
        }
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc