• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18104647912

29 Sep 2025 04:59PM UTC coverage: 82.341% (-4.1%) from 86.435%
18104647912

Pull #720

github

web-flow
Merge ee6d7bcf6 into 61518f6b4
Pull Request #720: [!] add YAML-based chain definitions

181 of 235 new or added lines in 7 files covered. (77.02%)

65 existing lines in 4 files now uncovered.

1646 of 1999 relevant lines covered (82.34%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.47
/internal/pgengine/bootstrap.go
1
package pgengine
2

3
import (
4
        "context"
5
        "errors"
6
        "math/rand"
7
        "os"
8
        "path/filepath"
9
        "strings"
10
        "time"
11

12
        "github.com/cybertec-postgresql/pg_timetable/internal/config"
13
        "github.com/cybertec-postgresql/pg_timetable/internal/log"
14

15
        pgx "github.com/jackc/pgx/v5"
16
        pgconn "github.com/jackc/pgx/v5/pgconn"
17
        pgtype "github.com/jackc/pgx/v5/pgtype"
18
        pgxpool "github.com/jackc/pgx/v5/pgxpool"
19
        tracelog "github.com/jackc/pgx/v5/tracelog"
20
        retry "github.com/sethvargo/go-retry"
21
)
22

23
// WaitTime specifies amount of time in seconds to wait before reconnecting to DB
24
const WaitTime = 5 * time.Second
25

26
// maximum wait time before reconnect attempts
27
const maxWaitTime = WaitTime * 16
28

29
// create a new exponential backoff to be used in retry attempts
30
var backoff = retry.WithCappedDuration(maxWaitTime, retry.NewExponential(WaitTime))
31

32
// PgxIface is common interface for every pgx class
33
type PgxIface interface {
34
        Begin(ctx context.Context) (pgx.Tx, error)
35
        Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
36
        QueryRow(context.Context, string, ...any) pgx.Row
37
        Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
38
        Ping(ctx context.Context) error
39
        CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
40
}
41

42
// PgxConnIface is interface representing pgx connection
43
type PgxConnIface interface {
44
        PgxIface
45
        Close(ctx context.Context) error
46
}
47

48
// PgxPoolIface is interface representing pgx pool
49
type PgxPoolIface interface {
50
        PgxIface
51
        Acquire(ctx context.Context) (*pgxpool.Conn, error)
52
        Close()
53
}
54

55
// PgEngine is responsible for every database-related action
56
type PgEngine struct {
57
        l        log.LoggerHookerIface
58
        ConfigDb PgxPoolIface
59
        config.CmdOptions
60
        // NOTIFY messages passed verification are pushed to this channel
61
        chainSignalChan chan ChainSignal
62
        sid             int32
63
        logTypeOID      uint32
64
}
65

66
// Getsid returns the pseudo-random session ID to use for the session identification.
67
// Previously `os.Getpid()` used but this approach is not producing unique IDs for docker containers
68
// where all IDs are the same across all running containers, e.g. 1
69
func (pge *PgEngine) Getsid() int32 {
1✔
70
        if pge.sid == 0 {
2✔
71
                r := rand.New(rand.NewSource(time.Now().UnixNano()))
1✔
72
                pge.sid = r.Int31()
1✔
73
        }
1✔
74
        return pge.sid
1✔
75
}
76

77
var sqls = []string{sqlInit, sqlCron, sqlDDL, sqlJSONSchema, sqlJobFunctions}
78
var sqlNames = []string{"Schema Init", "Cron Functions", "Tables and Views", "JSON Schema", "Job Functions"}
79

80
// New opens connection and creates schema
81
func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHookerIface) (*PgEngine, error) {
1✔
82
        var (
1✔
83
                err error
1✔
84
        )
1✔
85
        pge := &PgEngine{
1✔
86
                l:               logger,
1✔
87
                ConfigDb:        nil,
1✔
88
                CmdOptions:      cmdOpts,
1✔
89
                chainSignalChan: make(chan ChainSignal, 64),
1✔
90
        }
1✔
91
        pge.l.WithField("sid", pge.Getsid()).Info("Starting new session... ")
1✔
92
        config := pge.getPgxConnConfig()
1✔
93
        if err = retry.Do(ctx, backoff, func(ctx context.Context) error {
2✔
94
                if pge.ConfigDb, err = pgxpool.NewWithConfig(ctx, config); err == nil {
2✔
95
                        err = pge.ConfigDb.Ping(ctx)
1✔
96
                }
1✔
97
                if err != nil {
2✔
98
                        pge.l.WithError(err).Error("Connection failed")
1✔
99
                        pge.l.Info("Sleeping before reconnecting...")
1✔
100
                        return retry.RetryableError(err)
1✔
101
                }
1✔
102
                return nil
1✔
103
        }); err != nil {
1✔
104
                return nil, err
1✔
105
        }
1✔
106
        pge.l.Info("Database connection established")
1✔
107
        if err := pge.ExecuteSchemaScripts(ctx); err != nil {
1✔
108
                return nil, err
×
109
        }
×
110
        pge.AddLogHook(ctx) //schema exists, we can log now
1✔
111
        if cmdOpts.Start.File != "" {
1✔
NEW
112
                if err := pge.ExecuteFileScript(ctx, cmdOpts); err != nil {
×
113
                        return nil, err
×
114
                }
×
115
        }
116
        return pge, nil
1✔
117
}
118

119
// NewDB creates pgengine instance for already opened database connection, allowing to bypass a parameters based credentials.
120
// We assume here all checks for proper schema validation are done beforehannd
121
func NewDB(DB PgxPoolIface, args ...string) *PgEngine {
1✔
122
        return &PgEngine{
1✔
123
                l:               log.Init(config.LoggingOpts{LogLevel: "error"}),
1✔
124
                ConfigDb:        DB,
1✔
125
                CmdOptions:      *config.NewCmdOptions(args...),
1✔
126
                chainSignalChan: make(chan ChainSignal, 64),
1✔
127
        }
1✔
128
}
1✔
129

130
func quoteIdent(s string) string {
1✔
131
        return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
1✔
132
}
1✔
133

134
// getPgxConnConfig transforms standard connestion string to pgx specific one with
135
func (pge *PgEngine) getPgxConnConfig() *pgxpool.Config {
1✔
136
        connConfig, err := pgxpool.ParseConfig(pge.ConnStr)
1✔
137
        if err != nil {
1✔
138
                pge.l.WithError(err).Error("Cannot parse connection string")
×
139
                return nil
×
140
        }
×
141
        // in the worst scenario we need separate connections for each of workers,
142
        // separate connection for Scheduler.retrieveChainsAndRun(),
143
        // separate connection for Scheduler.retrieveIntervalChainsAndRun(),
144
        // and another connection for LogHook.send()
145
        connConfig.MaxConns = int32(pge.Resource.CronWorkers) + int32(pge.Resource.IntervalWorkers) + 3
1✔
146
        connConfig.ConnConfig.RuntimeParams["application_name"] = "pg_timetable"
1✔
147
        connConfig.ConnConfig.OnNotice = func(_ *pgconn.PgConn, n *pgconn.Notice) {
2✔
148
                pge.l.WithField("severity", n.Severity).WithField("notice", n.Message).Info("Notice received")
1✔
149
        }
1✔
150
        connConfig.AfterConnect = func(ctx context.Context, pgconn *pgx.Conn) (err error) {
2✔
151
                pge.l.WithField("pid", pgconn.PgConn().PID()).
1✔
152
                        WithField("client", pge.ClientName).
1✔
153
                        Debug("Trying to get lock for the session")
1✔
154
                if err = pge.TryLockClientName(ctx, pgconn); err != nil {
1✔
155
                        return err
×
156
                }
×
157

158
                _, err = pgconn.Exec(ctx, "LISTEN "+quoteIdent(pge.ClientName))
1✔
159
                if pge.logTypeOID == InvalidOid {
2✔
160
                        err = pgconn.QueryRow(ctx, "select coalesce(to_regtype('timetable.log_type')::oid, 0)").Scan(&pge.logTypeOID)
1✔
161
                }
1✔
162
                pgconn.TypeMap().RegisterType(&pgtype.Type{Name: "timetable.log_type", OID: pge.logTypeOID, Codec: &pgtype.EnumCodec{}})
1✔
163
                return err
1✔
164
        }
165
        // reset session before returning connection to the pool
166
        // after task completion, if the task is not properly finalized (especially when running in autonomous mode),
167
        // some objects and/or setting changes will still exist in the session
168
        connConfig.AfterRelease = func(pgconn *pgx.Conn) bool {
2✔
169
                var err error
1✔
170
                if pgconn.DeallocateAll(context.Background()) != nil {
1✔
171
                        return false // destroy the connection in case of error
×
172
                }
×
173
                if _, err = pgconn.Exec(context.Background(), "DISCARD ALL"); err == nil {
2✔
174
                        _, err = pgconn.Exec(context.Background(), "LISTEN "+quoteIdent(pge.ClientName))
1✔
175
                }
1✔
176
                return err == nil
1✔
177
        }
178

179
        if !pge.Start.Debug { //will handle notification in HandleNotifications directly
2✔
180
                connConfig.ConnConfig.OnNotification = pge.NotificationHandler
1✔
181
        }
1✔
182
        tracelogger := &tracelog.TraceLog{
1✔
183
                Logger:   log.NewPgxLogger(pge.l),
1✔
184
                LogLevel: map[bool]tracelog.LogLevel{false: tracelog.LogLevelWarn, true: tracelog.LogLevelDebug}[pge.Verbose()],
1✔
185
        }
1✔
186
        connConfig.ConnConfig.Tracer = tracelogger
1✔
187
        return connConfig
1✔
188
}
189

190
// AddLogHook adds a new pgx log hook to logrus logger
191
func (pge *PgEngine) AddLogHook(ctx context.Context) {
1✔
192
        pge.l.AddHook(NewHook(ctx, pge, pge.Logging.LogDBLevel))
1✔
193
}
1✔
194

195
// QueryRowIface specifies interface to use QueryRow method
196
type QueryRowIface interface {
197
        QueryRow(context.Context, string, ...any) pgx.Row
198
}
199

200
// TryLockClientName obtains lock on the server to prevent another client with the same name
201
func (pge *PgEngine) TryLockClientName(ctx context.Context, conn QueryRowIface) error {
1✔
202
        sql := "SELECT COALESCE(to_regproc('timetable.try_lock_client_name')::int4, 0)"
1✔
203
        var procoid int // check if the schema is available first
1✔
204
        if err := conn.QueryRow(ctx, sql).Scan(&procoid); err != nil {
2✔
205
                return err
1✔
206
        }
1✔
207
        if procoid == 0 { //there is no schema yet, will lock after bootstrapping
2✔
208
                pge.l.Debug("There is no schema yet, will lock after bootstrapping")
1✔
209
                return nil
1✔
210
        }
1✔
211
        sql = "SELECT timetable.try_lock_client_name($1, $2)"
1✔
212
        var locked bool
1✔
213
        if e := conn.QueryRow(ctx, sql, pge.Getsid(), pge.ClientName).Scan(&locked); e != nil {
2✔
214
                return e
1✔
215
        } else if !locked {
2✔
216
                return errors.New("cannot obtain lock for a session")
×
217
        }
×
218
        return nil
1✔
219
}
220

221
// ExecuteFileScript handles both SQL and YAML files based on file extension
NEW
222
func (pge *PgEngine) ExecuteFileScript(ctx context.Context, cmdOpts config.CmdOptions) error {
×
NEW
223
        filePath := cmdOpts.Start.File
×
NEW
224

×
NEW
225
        // Determine file type by extension
×
NEW
226
        fileExt := strings.ToLower(filepath.Ext(filePath))
×
NEW
227

×
NEW
228
        switch fileExt {
×
NEW
229
        case ".yaml", ".yml":
×
NEW
230
                // Handle YAML chain definition files
×
NEW
231
                if cmdOpts.Start.Validate {
×
NEW
232
                        // Only validate, don't import
×
NEW
233
                        _, err := ParseYamlFile(filePath)
×
NEW
234
                        if err != nil {
×
NEW
235
                                pge.l.WithError(err).Error("YAML validation failed")
×
NEW
236
                                return err
×
NEW
237
                        }
×
NEW
238
                        pge.l.WithField("file", filePath).Info("YAML file validation successful")
×
NEW
239
                        return nil
×
240
                }
241

242
                // Import YAML chains
NEW
243
                return pge.LoadYamlChains(ctx, filePath, cmdOpts.Start.Replace)
×
244

NEW
245
        case ".sql":
×
NEW
246
                // Handle SQL script files (existing behavior)
×
NEW
247
                return pge.ExecuteCustomScripts(ctx, filePath)
×
248

NEW
249
        default:
×
NEW
250
                // Try to detect content type for files without extension
×
NEW
251
                content, err := os.ReadFile(filePath)
×
NEW
252
                if err != nil {
×
NEW
253
                        pge.l.WithError(err).Error("cannot read file")
×
NEW
254
                        return err
×
NEW
255
                }
×
256

257
                // Check if it looks like YAML (starts with "chains:" or contains YAML markers)
NEW
258
                contentStr := strings.TrimSpace(string(content))
×
NEW
259
                if strings.HasPrefix(contentStr, "chains:") {
×
NEW
260
                        pge.l.WithField("file", filePath).Info("Detected YAML content, processing as YAML")
×
NEW
261
                        return pge.LoadYamlChains(ctx, filePath, false)
×
NEW
262
                }
×
NEW
263
                pge.l.WithField("file", filePath).Info("Processing as SQL script")
×
NEW
264
                return pge.ExecuteCustomScripts(ctx, filePath)
×
265
        }
266
}
267

268
// ExecuteCustomScripts executes SQL scripts in files
269
func (pge *PgEngine) ExecuteCustomScripts(ctx context.Context, filename ...string) error {
1✔
270
        for _, f := range filename {
2✔
271
                sql, err := os.ReadFile(f)
1✔
272
                if err != nil {
2✔
273
                        pge.l.WithError(err).Error("cannot read command file")
1✔
274
                        return err
1✔
275
                }
1✔
276
                pge.l.Info("Executing script: ", f)
1✔
277
                if _, err = pge.ConfigDb.Exec(ctx, string(sql)); err != nil {
2✔
278
                        pge.l.WithError(err).Error("script execution failed")
1✔
279
                        return err
1✔
280
                }
1✔
281
                pge.l.Info("Script file executed: ", f)
1✔
282
        }
283
        return nil
1✔
284
}
285

286
// ExecuteSchemaScripts executes initial schema scripts
287
func (pge *PgEngine) ExecuteSchemaScripts(ctx context.Context) error {
1✔
288
        var exists bool
1✔
289
        err := pge.ConfigDb.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'timetable')").Scan(&exists)
1✔
290
        if err != nil {
2✔
291
                return err
1✔
292
        }
1✔
293
        if !exists {
2✔
294
                for i, sql := range sqls {
2✔
295
                        sqlName := sqlNames[i]
1✔
296
                        pge.l.Info("Executing script: ", sqlName)
1✔
297
                        if _, err = pge.ConfigDb.Exec(ctx, sql); err != nil {
2✔
298
                                pge.l.WithError(err).Error("Script execution failed")
1✔
299
                                pge.l.Warn("Dropping \"timetable\" schema...")
1✔
300
                                _, err = pge.ConfigDb.Exec(ctx, "DROP SCHEMA IF EXISTS timetable CASCADE")
1✔
301
                                if err != nil {
2✔
302
                                        pge.l.WithError(err).Error("Schema dropping failed")
1✔
303
                                }
1✔
304
                                return err
1✔
305
                        }
306
                        pge.l.Info("Schema file executed: " + sqlName)
1✔
307
                }
308
                pge.l.Info("Configuration schema created...")
1✔
309
        }
310
        return nil
1✔
311
}
312

313
// Finalize closes session
314
func (pge *PgEngine) Finalize() {
1✔
315
        pge.l.Info("Closing session")
1✔
316
        sql := `WITH del_ch AS (DELETE FROM timetable.active_chain WHERE client_name = $1)
1✔
317
DELETE FROM timetable.active_session WHERE client_name = $1`
1✔
318
        _, err := pge.ConfigDb.Exec(context.Background(), sql, pge.ClientName)
1✔
319
        if err != nil {
1✔
320
                pge.l.WithError(err).Error("Cannot finalize database session")
×
321
        }
×
322
        pge.ConfigDb.Close()
1✔
323
        pge.ConfigDb = nil
1✔
324
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc