• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

coccyx / gogen / 13618304364

02 Mar 2025 06:46PM UTC coverage: 60.597% (-0.2%) from 60.779%
13618304364

Pull #49

github

coccyx
Adding fix for long intervals not shutting down in a reasonable time
Pull Request #49: Bug/timer fix on error

7 of 21 new or added lines in 1 file covered. (33.33%)

1 existing line in 1 file now uncovered.

2436 of 4020 relevant lines covered (60.6%)

497.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.61
/timer/timer.go
1
package timer
2

3
import (
4
        "time"
5

6
        config "github.com/coccyx/gogen/internal"
7
        log "github.com/coccyx/gogen/logger"
8
        "github.com/coccyx/gogen/rater"
9
)
10

11
// Timer will put work into the generator queue on an interval specified by the Sample.
12
// One instance is created per sample.
13
type Timer struct {
14
        S              *config.Sample
15
        cur            int
16
        GQ             chan *config.GenQueueItem
17
        OQ             chan *config.OutQueueItem
18
        Done           chan int
19
        closed         bool
20
        cacheCounter   int // Number of intervals left to use cache
21
        cacheIntervals int // Number of intervals to cache for
22
}
23

24
// NewTimer creates a new Timer for a sample which will put work into the generator queue on each interval
25
func (t *Timer) NewTimer(cacheIntervals int) {
22✔
26
        s := t.S
22✔
27
        t.cacheIntervals = cacheIntervals
22✔
28
        // If we're not realtime, then we should be backfilling
22✔
29
        if !s.Realtime {
42✔
30
                // Set the end time based on configuration, either now or a specified time in the config
20✔
31
                var endtime time.Time
20✔
32
                n := time.Now()
20✔
33
                if s.EndParsed.Before(n) && !s.EndParsed.IsZero() {
36✔
34
                        endtime = s.EndParsed
16✔
35
                } else {
20✔
36
                        endtime = n
4✔
37
                }
4✔
38
                // Run through as many intervals until we're at endtime
39
                t.backfill(endtime)
20✔
40
                // If we had no endtime set, then keep going in realtime mode
20✔
41
                if s.EndParsed.IsZero() {
23✔
42
                        t.backfill(time.Now())
3✔
43
                        s.Realtime = true
3✔
44
                }
3✔
45
        }
46
        // Endtime can be greater than now, so continue until we've reached the end time... Realtime won't get set, so we'll end after this
47
        if !t.S.Realtime {
39✔
48
                t.backfill(s.EndParsed)
17✔
49
        }
17✔
50
        // In realtime mode, continue until we get an interrupt
51
        if s.Realtime {
27✔
52
                for {
16✔
53
                        if s.Generator == "replay" {
15✔
54
                                t.genWork()
4✔
55
                                time.Sleep(s.ReplayOffsets[t.cur])
4✔
56
                                t.cur++
4✔
57
                                if t.cur >= len(s.ReplayOffsets) {
5✔
58
                                        t.cur = 0
1✔
59
                                }
1✔
60
                        } else {
7✔
61
                                if s.Interval > 5 {
7✔
NEW
62
                                        // For longer intervals, use ticker to check closed status
×
NEW
63
                                        mainTimer := time.NewTimer(time.Duration(s.Interval) * time.Second)
×
NEW
64
                                        checkTicker := time.NewTicker(1 * time.Second)
×
NEW
65
                                        select {
×
NEW
66
                                        case <-mainTimer.C:
×
NEW
67
                                                checkTicker.Stop()
×
NEW
68
                                                t.genWork()
×
NEW
69
                                        case <-checkTicker.C:
×
NEW
70
                                                if t.closed {
×
NEW
71
                                                        mainTimer.Stop()
×
NEW
72
                                                        checkTicker.Stop()
×
NEW
73
                                                        break
×
74
                                                }
NEW
75
                                                continue
×
76
                                        }
77
                                } else {
7✔
78
                                        // For short intervals, just use the timer directly
7✔
79
                                        timer := time.NewTimer(time.Duration(s.Interval) * time.Second)
7✔
80
                                        <-timer.C
7✔
81
                                        t.genWork()
7✔
82
                                }
7✔
83
                        }
84
                        if t.closed {
6✔
85
                                break
×
86
                        }
87
                }
88
        }
89
        t.Done <- 1
17✔
90
}
91

92
func (t *Timer) backfill(until time.Time) {
40✔
93
        for t.S.Current.Before(until) {
269✔
94
                t.genWork()
229✔
95
                t.inc()
229✔
96
                if t.closed {
229✔
97
                        break
×
98
                }
99
        }
100
}
101

102
func (t *Timer) genWork() {
238✔
103
        s := t.S
238✔
104
        now := s.Now()
238✔
105
        var item *config.GenQueueItem
238✔
106
        useCache := t.cacheCounter > 0
238✔
107
        setCache := !useCache && t.cacheIntervals > 0
238✔
108
        t.cacheCounter--
238✔
109
        if t.cacheCounter < 0 {
434✔
110
                t.cacheCounter = t.cacheIntervals
196✔
111
        }
196✔
112
        ci := &config.CacheItem{
238✔
113
                UseCache: useCache,
238✔
114
                SetCache: setCache,
238✔
115
        }
238✔
116
        if s.Generator == "replay" {
248✔
117
                earliest := now
10✔
118
                latest := now
10✔
119
                count := 1
10✔
120
                item = &config.GenQueueItem{S: s, Count: count, Event: t.cur, Earliest: earliest, Latest: latest, Now: now, OQ: t.OQ, Cache: ci}
10✔
121
        } else {
238✔
122
                earliest := now.Add(s.EarliestParsed)
228✔
123
                latest := now.Add(s.LatestParsed)
228✔
124
                count := rater.EventRate(s, now, s.Count)
228✔
125
                item = &config.GenQueueItem{S: s, Count: count, Event: -1, Earliest: earliest, Latest: latest, Now: now, OQ: t.OQ, Cache: ci}
228✔
126
        }
228✔
127
        // log.Debugf("Placing item in queue for sample '%s': %#v", t.S.Name, item)
128
Loop1:
238✔
129
        for {
487✔
130
                select {
249✔
131
                case t.GQ <- item:
236✔
132
                        break Loop1
236✔
133
                case <-time.After(1 * time.Second):
11✔
134
                        if t.closed {
11✔
NEW
135
                                log.Debugf("Timer %s closed", t.S.Name)
×
UNCOV
136
                                break Loop1
×
137
                        }
138
                        continue
11✔
139
                }
140
        }
141
}
142

143
func (t *Timer) inc() {
229✔
144
        s := t.S
229✔
145
        if s.Generator == "replay" {
235✔
146
                s.Current = s.Current.Add(s.ReplayOffsets[t.cur])
6✔
147
                t.cur++
6✔
148
                if t.cur >= len(s.ReplayOffsets) {
7✔
149
                        t.cur = 0
1✔
150
                }
1✔
151
        } else {
223✔
152
                s.Current = s.Current.Add(time.Duration(s.Interval) * time.Second)
223✔
153
        }
223✔
154
        if s.Wait {
229✔
155
                timer := time.NewTimer(time.Duration(s.Interval) * time.Second)
×
156
                <-timer.C
×
157
        }
×
158
}
159

160
// Close shuts down a timer
161
func (t *Timer) Close() {
1✔
162
        log.Infof("Closing timer for sample %s", t.S.Name)
1✔
163
        t.closed = true
1✔
164
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc