• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18011644652

25 Sep 2025 02:56PM UTC coverage: 85.698% (-0.6%) from 86.277%
18011644652

Pull #717

github

pashagolub
update Release GHA workflow
Pull Request #717: [+] use a dedicated action for Docker builds

1480 of 1727 relevant lines covered (85.7%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.61
/internal/scheduler/interval_chain.go
1
package scheduler
2

3
import (
4
        "context"
5
        "time"
6

7
        "github.com/cybertec-postgresql/pg_timetable/internal/log"
8
        "github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
9
)
10

11
// IntervalChain is an alias for pgengine.IntervalChain, so code in this package
// can refer to the type without the pgengine qualifier.
type IntervalChain = pgengine.IntervalChain
12

13
// SendIntervalChain sends interval chain to the channel for workers
14
func (sch *Scheduler) SendIntervalChain(c IntervalChain) {
1✔
15
        select {
1✔
16
        case sch.ichainsChan <- c:
1✔
17
                sch.l.WithField("chain", c.ChainID).Debug("Sent interval chain to the execution channel")
1✔
18
        default:
×
19
                sch.l.WithField("chain", c.ChainID).Error("Failed to send interval chain to the execution channel")
×
20
        }
21
}
22

23
func (sch *Scheduler) isValid(ichain IntervalChain) bool {
1✔
24
        sch.intervalChainMutex.Lock()
1✔
25
        defer sch.intervalChainMutex.Unlock()
1✔
26
        return (IntervalChain{}) != sch.intervalChains[ichain.ChainID]
1✔
27
}
1✔
28

29
func (sch *Scheduler) reschedule(ctx context.Context, ichain IntervalChain) {
1✔
30
        log.GetLogger(ctx).Debug("Sleeping before next execution of interval chain")
1✔
31
        select {
1✔
32
        case <-time.After(time.Duration(ichain.Interval) * time.Second):
1✔
33
                if sch.isValid(ichain) {
2✔
34
                        sch.SendIntervalChain(ichain)
1✔
35
                }
1✔
36
        case <-ctx.Done():
1✔
37
                return
1✔
38
        }
39
}
40

41
func (sch *Scheduler) retrieveIntervalChainsAndRun(ctx context.Context) {
1✔
42
        var ichains []IntervalChain
1✔
43
        err := sch.pgengine.SelectIntervalChains(ctx, &ichains)
1✔
44
        if err != nil {
1✔
45
                sch.l.WithError(err).Error("Could not query pending interval tasks")
×
46
        } else {
1✔
47
                sch.l.WithField("count", len(ichains)).Info("Retrieve interval chains to run")
1✔
48
        }
1✔
49

50
        // delete chains that are not returned from the database
51
        sch.intervalChainMutex.Lock()
1✔
52
        for id, ichain := range sch.intervalChains {
1✔
53
                if !ichain.IsListed(ichains) {
×
54
                        delete(sch.intervalChains, id)
×
55
                }
×
56
        }
57

58
        // update chains from the database and send to working channel new one
59
        for _, ichain := range ichains {
2✔
60
                if (IntervalChain{}) == sch.intervalChains[ichain.ChainID] {
2✔
61
                        sch.SendIntervalChain(ichain)
1✔
62
                }
1✔
63
                sch.intervalChains[ichain.ChainID] = ichain
1✔
64
        }
65
        sch.intervalChainMutex.Unlock()
1✔
66
}
67

68
func (sch *Scheduler) intervalChainWorker(ctx context.Context, ichains <-chan IntervalChain) {
1✔
69
        for {
2✔
70
                select {
1✔
71
                case <-ctx.Done(): //check context with high priority
1✔
72
                        return
1✔
73
                default:
1✔
74
                        select {
1✔
75
                        case ichain := <-ichains:
1✔
76
                                if !sch.isValid(ichain) { // chain not in the list of active chains
1✔
77
                                        continue
×
78
                                }
79
                                chainL := sch.l.WithField("chain", ichain.ChainID)
1✔
80
                                chainContext := log.WithLogger(ctx, chainL)
1✔
81
                                chainL.Info("Starting chain")
1✔
82
                                if !ichain.RepeatAfter {
2✔
83
                                        go sch.reschedule(chainContext, ichain)
1✔
84
                                }
1✔
85
                                if !sch.pgengine.InsertChainRunStatus(ctx, ichain.ChainID, ichain.MaxInstances) {
1✔
86
                                        chainL.Info("Cannot proceed. Sleeping")
×
87
                                        if ichain.RepeatAfter {
×
88
                                                go sch.reschedule(chainContext, ichain)
×
89
                                        }
×
90
                                        continue
×
91
                                }
92
                                sch.Lock(ichain.ExclusiveExecution)
1✔
93
                                sch.executeChain(chainContext, ichain.Chain)
1✔
94
                                sch.Unlock(ichain.ExclusiveExecution)
1✔
95
                                if ichain.RepeatAfter {
2✔
96
                                        go sch.reschedule(chainContext, ichain)
1✔
97
                                }
1✔
98
                        case <-ctx.Done():
1✔
99
                                return
1✔
100
                        }
101
                }
102
        }
103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc