• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 19066894532

04 Nov 2025 11:17AM UTC coverage: 87.519% (+3.1%) from 84.424%
19066894532

Pull #726

github

web-flow
Merge 4aa9c37b2 into 06f39e588
Pull Request #726: [-] fix `cannot scan NULL into *string`, closes #725

9 of 9 new or added lines in 1 file covered. (100.0%)

6 existing lines in 2 files now uncovered.

1732 of 1979 relevant lines covered (87.52%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.53
/internal/scheduler/chain.go
1
package scheduler
2

3
import (
4
        "cmp"
5
        "context"
6
        "fmt"
7
        "strings"
8
        "time"
9

10
        "github.com/cybertec-postgresql/pg_timetable/internal/log"
11
        "github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
12
        pgx "github.com/jackc/pgx/v5"
13
)
14

15
type (
16
        Chain       = pgengine.Chain
17
        ChainSignal = pgengine.ChainSignal
18
)
19

20
// SendChain sends chain to the channel for workers
21
func (sch *Scheduler) SendChain(c Chain) {
1✔
22
        select {
1✔
23
        case sch.chainsChan <- c:
1✔
24
                sch.l.WithField("chain", c.ChainID).Debug("Sent chain to the execution channel")
1✔
25
        default:
×
26
                sch.l.WithField("chain", c.ChainID).Error("Failed to send chain to the execution channel")
×
27
        }
28
}
29

30
// Lock locks the chain in exclusive or non-exclusive mode
31
func (sch *Scheduler) Lock(exclusiveExecution bool) {
1✔
32
        if exclusiveExecution {
2✔
33
                sch.exclusiveMutex.Lock()
1✔
34
        } else {
2✔
35
                sch.exclusiveMutex.RLock()
1✔
36
        }
1✔
37
}
38

39
// Unlock releases the lock after the chain execution
40
func (sch *Scheduler) Unlock(exclusiveExecution bool) {
1✔
41
        if exclusiveExecution {
2✔
42
                sch.exclusiveMutex.Unlock()
1✔
43
        } else {
2✔
44
                sch.exclusiveMutex.RUnlock()
1✔
45
        }
1✔
46
}
47

48
func (sch *Scheduler) retrieveAsyncChainsAndRun(ctx context.Context) {
1✔
49
        for {
2✔
50
                chainSignal := sch.pgengine.WaitForChainSignal(ctx)
1✔
51
                if chainSignal.ConfigID == 0 {
2✔
52
                        return
1✔
53
                }
1✔
54
                err := sch.processAsyncChain(ctx, chainSignal)
1✔
55
                if err != nil {
2✔
56
                        sch.l.WithError(err).Error("Could not process async chain command")
1✔
57
                }
1✔
58
        }
59
}
60

61
func (sch *Scheduler) processAsyncChain(ctx context.Context, chainSignal ChainSignal) error {
1✔
62
        switch chainSignal.Command {
1✔
63
        case "START":
1✔
64
                var c Chain
1✔
65
                if err := sch.pgengine.SelectChain(ctx, &c, chainSignal.ConfigID); err != nil {
2✔
66
                        return fmt.Errorf("cannot start chain with ID: %d; %w", chainSignal.ConfigID, err)
1✔
67
                }
1✔
68
                go func() {
2✔
69
                        select {
1✔
70
                        case <-ctx.Done():
×
71
                                return
×
72
                        case <-time.After(time.Duration(chainSignal.Delay) * time.Second):
1✔
73
                                sch.SendChain(c)
1✔
74
                        }
75
                }()
76
        case "STOP":
1✔
77
                if cancel, ok := sch.activeChains[chainSignal.ConfigID]; ok {
1✔
78
                        cancel()
×
79
                        return nil
×
80
                }
×
81
                return fmt.Errorf("cannot stop chain with ID: %d. No running chain found", chainSignal.ConfigID)
1✔
82
        }
83
        return nil
1✔
84
}
85

86
func (sch *Scheduler) retrieveChainsAndRun(ctx context.Context, reboot bool) {
1✔
87
        var err error
1✔
88
        var headChains []Chain
1✔
89
        msg := "Retrieve scheduled chains to run"
1✔
90
        if reboot {
2✔
91
                msg += " @reboot"
1✔
92
        }
1✔
93
        if reboot {
2✔
94
                err = sch.pgengine.SelectRebootChains(ctx, &headChains)
1✔
95
        } else {
2✔
96
                err = sch.pgengine.SelectChains(ctx, &headChains)
1✔
97
        }
1✔
98
        if err != nil {
1✔
99
                sch.l.WithError(err).Error("Could not query pending tasks")
×
100
                return
×
101
        }
×
102
        headChainsCount := len(headChains)
1✔
103
        sch.l.WithField("count", headChainsCount).Info(msg)
1✔
104
        // now we can loop through the chains
1✔
105
        for _, c := range headChains {
2✔
106
                // if the number of chains pulled for execution is high, try to spread execution to avoid spikes
1✔
107
                if headChainsCount > sch.Config().Resource.CronWorkers*refetchTimeout {
1✔
108
                        time.Sleep(time.Duration(refetchTimeout*1000/headChainsCount) * time.Millisecond)
×
109
                }
×
110
                sch.SendChain(c)
1✔
111
        }
112
}
113

114
func (sch *Scheduler) addActiveChain(id int, cancel context.CancelFunc) {
1✔
115
        sch.activeChainMutex.Lock()
1✔
116
        sch.activeChains[id] = cancel
1✔
117
        sch.activeChainMutex.Unlock()
1✔
118
}
1✔
119

120
func (sch *Scheduler) deleteActiveChain(id int) {
1✔
121
        sch.activeChainMutex.Lock()
1✔
122
        delete(sch.activeChains, id)
1✔
123
        sch.activeChainMutex.Unlock()
1✔
124
}
1✔
125

126
func (sch *Scheduler) terminateChains() {
1✔
127
        for id, cancel := range sch.activeChains {
1✔
128
                sch.l.WithField("chain", id).Debug("Terminating chain...")
×
129
                cancel()
×
130
        }
×
131
        for {
2✔
132
                time.Sleep(1 * time.Second) // give some time to terminate chains gracefully
1✔
133
                if len(sch.activeChains) == 0 {
2✔
134
                        return
1✔
135
                }
1✔
136
                sch.l.Debugf("Still active chains running: %d", len(sch.activeChains))
×
137
        }
138
}
139

140
func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) {
1✔
141
        for {
2✔
142
                select {
1✔
143
                case <-ctx.Done(): //check context with high priority
1✔
144
                        return
1✔
145
                default:
1✔
146
                        select {
1✔
147
                        case chain := <-chains:
1✔
148
                                chainL := sch.l.WithField("chain", chain.ChainID)
1✔
149
                                chainContext := log.WithLogger(ctx, chainL)
1✔
150
                                if !sch.pgengine.InsertChainRunStatus(ctx, chain.ChainID, chain.MaxInstances) {
2✔
151
                                        chainL.Info("Cannot proceed. Sleeping")
1✔
152
                                        continue
1✔
153
                                }
154
                                chainL.Info("Starting chain")
1✔
155
                                sch.Lock(chain.ExclusiveExecution)
1✔
156
                                chainContext, cancel := context.WithCancel(chainContext)
1✔
157
                                sch.addActiveChain(chain.ChainID, cancel)
1✔
158
                                sch.executeChain(chainContext, chain)
1✔
159
                                sch.deleteActiveChain(chain.ChainID)
1✔
160
                                cancel()
1✔
161
                                sch.Unlock(chain.ExclusiveExecution)
1✔
162
                        case <-ctx.Done():
1✔
163
                                return
1✔
164
                        }
165

166
                }
167
        }
168
}
169

170
func getTimeoutContext(ctx context.Context, globalTimeout int, customTimeout int) (context.Context, context.CancelFunc) {
1✔
171
        timeout := cmp.Or(customTimeout, globalTimeout)
1✔
172
        if timeout > 0 {
2✔
173
                return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout))
1✔
174
        }
1✔
175
        return ctx, nil
1✔
176
}
177

178
func (sch *Scheduler) executeOnErrorHandler(ctx context.Context, chain Chain) {
1✔
179
        if ctx.Err() != nil || chain.OnError == "" {
2✔
180
                return
1✔
181
        }
1✔
182
        l := sch.l.WithField("chain", chain.ChainID)
1✔
183
        l.Info("Starting error handling")
1✔
184
        if _, err := sch.pgengine.ConfigDb.Exec(ctx, chain.OnError); err != nil {
2✔
185
                l.Info("Error handler failed")
1✔
186
                return
1✔
187
        }
1✔
188
        l.Info("Error handler executed successfully")
1✔
189
}
190

191
/* execute a chain of tasks */
192
func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
1✔
193
        var ChainTasks []pgengine.ChainTask
1✔
194
        var bctx context.Context
1✔
195
        var cancel context.CancelFunc
1✔
196
        var vxid int64
1✔
197

1✔
198
        chainCtx, cancel := getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
1✔
199
        if cancel != nil {
2✔
200
                defer cancel()
1✔
201
        }
1✔
202

203
        chainL := sch.l.WithField("chain", chain.ChainID)
1✔
204
        tx, vxid, err := sch.pgengine.StartTransaction(chainCtx)
1✔
205
        if err != nil {
2✔
206
                chainL.WithError(err).Error("Cannot start transaction")
1✔
207
                return
1✔
208
        }
1✔
209
        chainL = chainL.WithField("vxid", vxid)
1✔
210

1✔
211
        err = sch.pgengine.GetChainElements(chainCtx, &ChainTasks, chain.ChainID)
1✔
212
        if err != nil {
1✔
UNCOV
213
                chainL.WithError(err).Error("Failed to retrieve chain elements")
×
UNCOV
214
                sch.pgengine.RollbackTransaction(chainCtx, tx)
×
UNCOV
215
                return
×
UNCOV
216
        }
×
217

218
        /* now we can loop through every element of the task chain */
219
        for _, task := range ChainTasks {
2✔
220
                task.ChainID = chain.ChainID
1✔
221
                task.Vxid = vxid
1✔
222
                l := chainL.WithField("task", task.TaskID)
1✔
223
                l.Info("Starting task")
1✔
224
                taskCtx := log.WithLogger(chainCtx, l)
1✔
225
                retCode := sch.executeTask(taskCtx, tx, &task)
1✔
226

1✔
227
                // we use background context here because current one (chainCtx) might be cancelled
1✔
228
                bctx = log.WithLogger(ctx, l)
1✔
229
                if retCode != 0 {
2✔
230
                        if !task.IgnoreError {
1✔
231
                                chainL.Error("Chain failed")
×
232
                                sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
×
233
                                sch.pgengine.RollbackTransaction(bctx, tx)
×
234
                                sch.executeOnErrorHandler(bctx, chain)
×
235
                                return
×
236
                        }
×
237
                        l.Info("Ignoring task failure")
1✔
238
                }
239
        }
240
        bctx = log.WithLogger(chainCtx, chainL)
1✔
241
        sch.pgengine.CommitTransaction(bctx, tx)
1✔
242
        chainL.Info("Chain executed successfully")
1✔
243
        sch.pgengine.RemoveChainRunStatus(bctx, chain.ChainID)
1✔
244
        if chain.SelfDestruct {
1✔
245
                sch.pgengine.DeleteChain(bctx, chain.ChainID)
×
246
        }
×
247
}
248

249
/* execute a task */
250
func (sch *Scheduler) executeTask(ctx context.Context, tx pgx.Tx, task *pgengine.ChainTask) int {
1✔
251
        var (
1✔
252
                paramValues []string
1✔
253
                err         error
1✔
254
                out         string
1✔
255
                retCode     int
1✔
256
                cancel      context.CancelFunc
1✔
257
        )
1✔
258

1✔
259
        l := log.GetLogger(ctx)
1✔
260
        err = sch.pgengine.GetChainParamValues(ctx, &paramValues, task)
1✔
261
        if err != nil {
2✔
262
                l.WithError(err).Error("cannot fetch parameters values for chain: ", err)
1✔
263
                return -1
1✔
264
        }
1✔
265

266
        ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.TaskTimeout, task.Timeout)
1✔
267
        if cancel != nil {
1✔
268
                defer cancel()
×
269
        }
×
270

271
        task.StartedAt = time.Now()
1✔
272
        switch task.Kind {
1✔
273
        case "SQL":
1✔
274
                out, err = sch.pgengine.ExecuteSQLTask(ctx, tx, task, paramValues)
1✔
275
        case "PROGRAM":
1✔
276
                if sch.pgengine.NoProgramTasks {
1✔
277
                        l.Info("Program task execution skipped")
×
278
                        return -2
×
279
                }
×
280
                retCode, out, err = sch.ExecuteProgramCommand(ctx, task.Command, paramValues)
1✔
281
        case "BUILTIN":
1✔
282
                out, err = sch.executeBuiltinTask(ctx, task.Command, paramValues)
1✔
283
        }
284
        task.Duration = time.Since(task.StartedAt).Microseconds()
1✔
285

1✔
286
        if err != nil {
2✔
287
                if retCode == 0 {
2✔
288
                        retCode = -1
1✔
289
                }
1✔
290
                out = strings.Join([]string{out, err.Error()}, "\n")
1✔
291
                l.WithError(err).Error("Task execution failed")
1✔
292
        } else {
1✔
293
                l.Info("Task executed successfully")
1✔
294
        }
1✔
295
        sch.pgengine.LogTaskExecution(context.Background(), task, retCode, out)
1✔
296
        return retCode
1✔
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc