• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18127629475

30 Sep 2025 10:59AM UTC coverage: 84.299% (-1.9%) from 86.155%
18127629475

Pull #720

github

web-flow
Merge f545f91b2 into 61518f6b4
Pull Request #720: [!] add YAML-based chain definitions

196 of 204 new or added lines in 7 files covered. (96.08%)

62 existing lines in 4 files now uncovered.

1659 of 1968 relevant lines covered (84.3%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/internal/pgengine/yaml.go
1
package pgengine
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "os"
8
        "strings"
9

10
        "gopkg.in/yaml.v3"
11
)
12

13
// YamlChain represents a chain with tasks for YAML processing
14
type YamlChain struct {
15
        Chain      `yaml:",inline"`
16
        ClientName string     `db:"client_name" yaml:"client_name,omitempty"`
17
        Schedule   string     `db:"run_at" yaml:"schedule,omitempty"`
18
        Live       bool       `db:"live" yaml:"live,omitempty"`
19
        Tasks      []YamlTask `yaml:"tasks"`
20
}
21

22
// YamlTask extends the basic task structure with Parameters field
23
type YamlTask struct {
24
        ChainTask  `yaml:",inline"`
25
        TaskName   string `db:"task_name" yaml:"name,omitempty"`
26
        Parameters []any  `yaml:"parameters,omitempty"`
27
}
28

29
// YamlConfig represents the root YAML configuration
30
type YamlConfig struct {
31
        Chains []YamlChain `yaml:"chains"`
32
}
33

34
// LoadYamlChains loads chains from a YAML file and imports them
35
func (pge *PgEngine) LoadYamlChains(ctx context.Context, filePath string, replace bool) error {
1✔
36
        // Parse YAML file
1✔
37
        yamlConfig, err := ParseYamlFile(filePath)
1✔
38
        if err != nil {
2✔
39
                return fmt.Errorf("failed to parse YAML file: %w", err)
1✔
40
        }
1✔
41

42
        // Import chains
43
        for _, yamlChain := range yamlConfig.Chains {
2✔
44
                // Delete existing chain if replace mode
1✔
45
                if replace {
2✔
46
                        _, _ = pge.ConfigDb.Exec(ctx, "SELECT timetable.delete_job($1)", yamlChain.ChainName)
1✔
47
                }
1✔
48

49
                // Check if chain exists
50
                var exists bool
1✔
51
                err := pge.ConfigDb.QueryRow(ctx,
1✔
52
                        "SELECT EXISTS(SELECT 1 FROM timetable.chain WHERE chain_name = $1)",
1✔
53
                        yamlChain.ChainName).Scan(&exists)
1✔
54
                if err != nil {
1✔
NEW
55
                        return fmt.Errorf("failed to check if chain exists: %w", err)
×
NEW
56
                }
×
57
                if exists && !replace {
2✔
58
                        return fmt.Errorf("chain '%s' already exists (use --replace flag to overwrite)", yamlChain.ChainName)
1✔
59
                }
1✔
60

61
                // Multi-task chain - use direct SQL
62
                chainID, err := pge.CreateChainFromYaml(ctx, &yamlChain)
1✔
63
                if err != nil {
1✔
NEW
64
                        return fmt.Errorf("failed to create multi-task chain %s: %w", yamlChain.ChainName, err)
×
NEW
65
                }
×
66
                pge.l.WithField("chain", yamlChain.ChainName).WithField("chain_id", chainID).Info("Created multi-task chain")
1✔
67
        }
68

69
        pge.l.WithField("chains", len(yamlConfig.Chains)).WithField("file", filePath).Info("Successfully imported YAML chains")
1✔
70
        return nil
1✔
71
}
72

73
// CreateChainFromYaml creates a multi-task chain using direct SQL inserts
74
func (pge *PgEngine) CreateChainFromYaml(ctx context.Context, yamlChain *YamlChain) (int64, error) {
1✔
75
        // Insert chain
1✔
76
        var chainID int64
1✔
77
        err := pge.ConfigDb.QueryRow(ctx, `INSERT INTO timetable.chain (
1✔
78
                        chain_name, run_at, max_instances, timeout, live, 
1✔
79
                        self_destruct, exclusive_execution, client_name, on_error
1✔
80
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 
1✔
81
                RETURNING chain_id`,
1✔
82
                yamlChain.ChainName,
1✔
83
                yamlChain.Schedule,
1✔
84
                yamlChain.MaxInstances,
1✔
85
                yamlChain.Timeout,
1✔
86
                yamlChain.Live,
1✔
87
                yamlChain.SelfDestruct,
1✔
88
                yamlChain.ExclusiveExecution,
1✔
89
                nullString(yamlChain.ClientName),
1✔
90
                nullString(yamlChain.OnError)).Scan(&chainID)
1✔
91
        if err != nil {
2✔
92
                return 0, fmt.Errorf("failed to insert chain: %w", err)
1✔
93
        }
1✔
94

95
        // Insert tasks
96
        for i, task := range yamlChain.Tasks {
2✔
97
                taskOrder := float64((i + 1) * 10)
1✔
98

1✔
99
                var taskID int64
1✔
100
                err := pge.ConfigDb.QueryRow(ctx, `
1✔
101
                        INSERT INTO timetable.task (
1✔
102
                                chain_id, task_order, task_name, kind, command, 
1✔
103
                                run_as, database_connection, ignore_error, autonomous, timeout
1✔
104
                        ) VALUES ($1, $2, $3, $4::timetable.command_kind, $5, $6, $7, $8, $9, $10) 
1✔
105
                        RETURNING task_id`,
1✔
106
                        chainID,
1✔
107
                        taskOrder,
1✔
108
                        nullString(task.TaskName),
1✔
109
                        task.Kind,
1✔
110
                        task.Command,
1✔
111
                        nullString(task.RunAs),
1✔
112
                        nullString(task.ConnectString),
1✔
113
                        task.IgnoreError,
1✔
114
                        task.Autonomous,
1✔
115
                        task.Timeout).Scan(&taskID)
1✔
116
                if err != nil {
2✔
117
                        return 0, fmt.Errorf("failed to insert task %d: %w", i+1, err)
1✔
118
                }
1✔
119

120
                // Insert parameters if any
121
                if len(task.Parameters) > 0 {
2✔
122
                        for paramIndex, param := range task.Parameters {
2✔
123
                                orderID := paramIndex + 1
1✔
124

1✔
125
                                // Convert parameter to JSON for JSONB storage
1✔
126
                                jsonValue, err := json.Marshal(param)
1✔
127
                                if err != nil {
2✔
128
                                        return 0, fmt.Errorf("failed to marshal parameter %d to JSON: %w", orderID, err)
1✔
129
                                }
1✔
130

131
                                _, err = pge.ConfigDb.Exec(ctx,
1✔
132
                                        "INSERT INTO timetable.parameter (task_id, order_id, value) VALUES ($1, $2, $3::jsonb)",
1✔
133
                                        taskID, orderID, string(jsonValue))
1✔
134
                                if err != nil {
2✔
135
                                        return 0, fmt.Errorf("failed to insert parameter %d: %w", orderID, err)
1✔
136
                                }
1✔
137
                        }
138
                }
139
        }
140

141
        return chainID, nil
1✔
142
}
143

144
// nullString passes a non-empty string through unchanged and maps the empty
// string to an untyped nil. CreateChainFromYaml uses it for optional text
// query arguments.
func nullString(s string) any {
	if s != "" {
		return s
	}
	return nil
}
151

152
// ValidateChain validates a YAML chain configuration
153
func (c *YamlChain) ValidateChain() error {
1✔
154
        if c.ChainName == "" {
2✔
155
                return fmt.Errorf("chain name is required")
1✔
156
        }
1✔
157

158
        if c.Schedule == "" {
2✔
159
                return fmt.Errorf("chain schedule is required")
1✔
160
        }
1✔
161

162
        // Validate cron format
163
        specialSchedules := []string{"@reboot", "@after", "@every"}
1✔
164
        isSpecial := false
1✔
165
        for _, s := range specialSchedules {
2✔
166
                if strings.HasPrefix(c.Schedule, s) {
2✔
167
                        isSpecial = true
1✔
168
                        break
1✔
169
                }
170
        }
171

172
        if !isSpecial {
2✔
173
                fields := strings.Fields(c.Schedule)
1✔
174
                if len(fields) != 5 {
2✔
175
                        return fmt.Errorf("invalid cron format: %s (expected 5 fields)", c.Schedule)
1✔
176
                }
1✔
177
        }
178

179
        if len(c.Tasks) == 0 {
2✔
180
                return fmt.Errorf("chain must have at least one task")
1✔
181
        }
1✔
182

183
        // Validate each task
184
        for i, task := range c.Tasks {
2✔
185
                if err := task.ValidateTask(); err != nil {
2✔
186
                        return fmt.Errorf("task %d: %w", i+1, err)
1✔
187
                }
1✔
188
        }
189

190
        return nil
1✔
191
}
192

193
// ValidateTask validates a YAML task configuration
194
func (t *YamlTask) ValidateTask() error {
1✔
195
        if t.Command == "" {
2✔
196
                return fmt.Errorf("task command is required")
1✔
197
        }
1✔
198

199
        // Validate kind
200
        switch strings.ToUpper(t.Kind) {
1✔
201
        case "", "SQL", "PROGRAM", "BUILTIN":
1✔
202
                // Valid kinds
203
        default:
1✔
204
                return fmt.Errorf("invalid task kind: %s (must be SQL, PROGRAM, or BUILTIN)", t.Kind)
1✔
205
        }
206

207
        // Validate timeout is non-negative
208
        if t.Timeout < 0 {
2✔
209
                return fmt.Errorf("task timeout must be non-negative")
1✔
210
        }
1✔
211

212
        return nil
1✔
213
}
214

215
// SetDefaults sets default values for optional fields
216
func (c *YamlChain) SetDefaults() {
1✔
217
        // Chain defaults
1✔
218
        if c.Schedule == "" {
2✔
219
                c.Schedule = "* * * * *" // Default to every minute
1✔
220
        }
1✔
221

222
        // Task defaults
223
        for i := range c.Tasks {
2✔
224
                task := &c.Tasks[i]
1✔
225
                if task.Kind == "" {
2✔
226
                        task.Kind = "SQL"
1✔
227
                }
1✔
228
        }
229
}
230

231
// ParseYamlFile parses a YAML file and returns the configuration
232
func ParseYamlFile(filePath string) (*YamlConfig, error) {
1✔
233
        // Check if file exists
1✔
234
        if _, err := os.Stat(filePath); os.IsNotExist(err) {
2✔
235
                return nil, fmt.Errorf("file not found: %s", filePath)
1✔
236
        }
1✔
237

238
        // Read file
239
        data, err := os.ReadFile(filePath)
1✔
240
        if err != nil {
2✔
241
                return nil, fmt.Errorf("failed to read file: %w", err)
1✔
242
        }
1✔
243

244
        // Parse YAML
245
        var config YamlConfig
1✔
246
        if err := yaml.Unmarshal(data, &config); err != nil {
2✔
247
                return nil, fmt.Errorf("failed to parse YAML: %w", err)
1✔
248
        }
1✔
249

250
        // Set defaults and validate
251
        for i := range config.Chains {
2✔
252
                chain := &config.Chains[i]
1✔
253
                chain.SetDefaults()
1✔
254
                if err := chain.ValidateChain(); err != nil {
2✔
255
                        return nil, fmt.Errorf("chain %d (%s): %w", i+1, chain.ChainName, err)
1✔
256
                }
1✔
257
        }
258

259
        return &config, nil
1✔
260
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc