• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18127629475

30 Sep 2025 10:59AM UTC coverage: 84.299% (-1.9%) from 86.155%
18127629475

Pull #720

github

web-flow
Merge f545f91b2 into 61518f6b4
Pull Request #720: [!] add YAML-based chain definitions

196 of 204 new or added lines in 7 files covered. (96.08%)

62 existing lines in 4 files now uncovered.

1659 of 1968 relevant lines covered (84.3%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.3
/internal/pgengine/bootstrap.go
1
package pgengine
2

3
import (
4
        "context"
5
        "errors"
6
        "math/rand"
7
        "os"
8
        "path/filepath"
9
        "strings"
10
        "time"
11

12
        "github.com/cybertec-postgresql/pg_timetable/internal/config"
13
        "github.com/cybertec-postgresql/pg_timetable/internal/log"
14

15
        pgx "github.com/jackc/pgx/v5"
16
        pgconn "github.com/jackc/pgx/v5/pgconn"
17
        pgtype "github.com/jackc/pgx/v5/pgtype"
18
        pgxpool "github.com/jackc/pgx/v5/pgxpool"
19
        tracelog "github.com/jackc/pgx/v5/tracelog"
20
        retry "github.com/sethvargo/go-retry"
21
)
22

23
// WaitTime specifies amount of time in seconds to wait before reconnecting to DB
24
const WaitTime = 5 * time.Second
25

26
// maximum wait time before reconnect attempts
27
const maxWaitTime = WaitTime * 16
28

29
// create a new exponential backoff to be used in retry attempts
30
var backoff = retry.WithCappedDuration(maxWaitTime, retry.NewExponential(WaitTime))
31

32
// PgxIface is common interface for every pgx class
33
type PgxIface interface {
34
        Begin(ctx context.Context) (pgx.Tx, error)
35
        Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
36
        QueryRow(context.Context, string, ...any) pgx.Row
37
        Query(ctx context.Context, query string, args ...any) (pgx.Rows, error)
38
        Ping(ctx context.Context) error
39
        CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
40
}
41

42
// PgxConnIface is interface representing pgx connection
43
type PgxConnIface interface {
44
        PgxIface
45
        Close(ctx context.Context) error
46
}
47

48
// PgxPoolIface is interface representing pgx pool
49
type PgxPoolIface interface {
50
        PgxIface
51
        Acquire(ctx context.Context) (*pgxpool.Conn, error)
52
        Close()
53
}
54

55
// PgEngine is responsible for every database-related action
56
type PgEngine struct {
57
        l        log.LoggerHookerIface
58
        ConfigDb PgxPoolIface
59
        config.CmdOptions
60
        // NOTIFY messages passed verification are pushed to this channel
61
        chainSignalChan chan ChainSignal
62
        sid             int32
63
        logTypeOID      uint32
64
}
65

66
// Getsid returns the pseudo-random session ID to use for the session identification.
67
// Previously `os.Getpid()` used but this approach is not producing unique IDs for docker containers
68
// where all IDs are the same across all running containers, e.g. 1
69
func (pge *PgEngine) Getsid() int32 {
1✔
70
        if pge.sid == 0 {
2✔
71
                r := rand.New(rand.NewSource(time.Now().UnixNano()))
1✔
72
                pge.sid = r.Int31()
1✔
73
        }
1✔
74
        return pge.sid
1✔
75
}
76

77
var sqls = []string{sqlInit, sqlCron, sqlDDL, sqlJSONSchema, sqlJobFunctions}
78
var sqlNames = []string{"Schema Init", "Cron Functions", "Tables and Views", "JSON Schema", "Job Functions"}
79

80
// New opens connection and creates schema
81
func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHookerIface) (*PgEngine, error) {
1✔
82
        var (
1✔
83
                err error
1✔
84
        )
1✔
85
        pge := &PgEngine{
1✔
86
                l:               logger,
1✔
87
                ConfigDb:        nil,
1✔
88
                CmdOptions:      cmdOpts,
1✔
89
                chainSignalChan: make(chan ChainSignal, 64),
1✔
90
        }
1✔
91
        pge.l.WithField("sid", pge.Getsid()).Info("Starting new session... ")
1✔
92
        config := pge.getPgxConnConfig()
1✔
93
        if err = retry.Do(ctx, backoff, func(ctx context.Context) error {
2✔
94
                if pge.ConfigDb, err = pgxpool.NewWithConfig(ctx, config); err == nil {
2✔
95
                        err = pge.ConfigDb.Ping(ctx)
1✔
96
                }
1✔
97
                if err != nil {
2✔
98
                        pge.l.WithError(err).Error("Connection failed")
1✔
99
                        pge.l.Info("Sleeping before reconnecting...")
1✔
100
                        return retry.RetryableError(err)
1✔
101
                }
1✔
102
                return nil
1✔
103
        }); err != nil {
1✔
104
                return nil, err
1✔
105
        }
1✔
106
        pge.l.Info("Database connection established")
1✔
107
        if err := pge.ExecuteSchemaScripts(ctx); err != nil {
1✔
108
                return nil, err
×
109
        }
×
110
        pge.AddLogHook(ctx) //schema exists, we can log now
1✔
111
        if cmdOpts.Start.File != "" {
1✔
NEW
112
                if err := pge.ExecuteFileScript(ctx, cmdOpts); err != nil {
×
113
                        return nil, err
×
114
                }
×
115
        }
116
        return pge, nil
1✔
117
}
118

119
// NewDB creates pgengine instance for already opened database connection, allowing to bypass a parameters based credentials.
120
// We assume here all checks for proper schema validation are done beforehannd
121
func NewDB(DB PgxPoolIface, args ...string) *PgEngine {
1✔
122
        return &PgEngine{
1✔
123
                l:               log.Init(config.LoggingOpts{LogLevel: "error"}),
1✔
124
                ConfigDb:        DB,
1✔
125
                CmdOptions:      *config.NewCmdOptions(args...),
1✔
126
                chainSignalChan: make(chan ChainSignal, 64),
1✔
127
        }
1✔
128
}
1✔
129

130
// quoteIdent returns s wrapped in double quotes with every embedded double
// quote doubled, so the result can be used as a quoted SQL identifier.
func quoteIdent(s string) string {
	var b strings.Builder
	b.Grow(len(s) + 2)
	b.WriteByte('"')
	// work byte-wise so that any non-quote byte is copied through untouched
	for i := 0; i < len(s); i++ {
		if s[i] == '"' {
			b.WriteByte('"')
		}
		b.WriteByte(s[i])
	}
	b.WriteByte('"')
	return b.String()
}
1✔
133

134
// getPgxConnConfig transforms standard connestion string to pgx specific one with
135
func (pge *PgEngine) getPgxConnConfig() *pgxpool.Config {
1✔
136
        connConfig, err := pgxpool.ParseConfig(pge.ConnStr)
1✔
137
        if err != nil {
1✔
138
                pge.l.WithError(err).Error("Cannot parse connection string")
×
139
                return nil
×
140
        }
×
141
        // in the worst scenario we need separate connections for each of workers,
142
        // separate connection for Scheduler.retrieveChainsAndRun(),
143
        // separate connection for Scheduler.retrieveIntervalChainsAndRun(),
144
        // and another connection for LogHook.send()
145
        connConfig.MaxConns = int32(pge.Resource.CronWorkers) + int32(pge.Resource.IntervalWorkers) + 3
1✔
146
        connConfig.ConnConfig.RuntimeParams["application_name"] = "pg_timetable"
1✔
147
        connConfig.ConnConfig.OnNotice = func(_ *pgconn.PgConn, n *pgconn.Notice) {
2✔
148
                pge.l.WithField("severity", n.Severity).WithField("notice", n.Message).Info("Notice received")
1✔
149
        }
1✔
150
        connConfig.AfterConnect = func(ctx context.Context, pgconn *pgx.Conn) (err error) {
2✔
151
                pge.l.WithField("pid", pgconn.PgConn().PID()).
1✔
152
                        WithField("client", pge.ClientName).
1✔
153
                        Debug("Trying to get lock for the session")
1✔
154
                if err = pge.TryLockClientName(ctx, pgconn); err != nil {
1✔
155
                        return err
×
156
                }
×
157

158
                _, err = pgconn.Exec(ctx, "LISTEN "+quoteIdent(pge.ClientName))
1✔
159
                if pge.logTypeOID == InvalidOid {
2✔
160
                        err = pgconn.QueryRow(ctx, "select coalesce(to_regtype('timetable.log_type')::oid, 0)").Scan(&pge.logTypeOID)
1✔
161
                }
1✔
162
                pgconn.TypeMap().RegisterType(&pgtype.Type{Name: "timetable.log_type", OID: pge.logTypeOID, Codec: &pgtype.EnumCodec{}})
1✔
163
                return err
1✔
164
        }
165
        // reset session before returning connection to the pool
166
        // after task completion, if the task is not properly finalized (especially when running in autonomous mode),
167
        // some objects and/or setting changes will still exist in the session
168
        connConfig.AfterRelease = func(pgconn *pgx.Conn) bool {
2✔
169
                var err error
1✔
170
                if pgconn.DeallocateAll(context.Background()) != nil {
1✔
171
                        return false // destroy the connection in case of error
×
172
                }
×
173
                if _, err = pgconn.Exec(context.Background(), "DISCARD ALL"); err == nil {
2✔
174
                        _, err = pgconn.Exec(context.Background(), "LISTEN "+quoteIdent(pge.ClientName))
1✔
175
                }
1✔
176
                return err == nil
1✔
177
        }
178

179
        if !pge.Start.Debug { //will handle notification in HandleNotifications directly
2✔
180
                connConfig.ConnConfig.OnNotification = pge.NotificationHandler
1✔
181
        }
1✔
182
        tracelogger := &tracelog.TraceLog{
1✔
183
                Logger:   log.NewPgxLogger(pge.l),
1✔
184
                LogLevel: map[bool]tracelog.LogLevel{false: tracelog.LogLevelWarn, true: tracelog.LogLevelDebug}[pge.Verbose()],
1✔
185
        }
1✔
186
        connConfig.ConnConfig.Tracer = tracelogger
1✔
187
        return connConfig
1✔
188
}
189

190
// AddLogHook adds a new pgx log hook to logrus logger
191
func (pge *PgEngine) AddLogHook(ctx context.Context) {
1✔
192
        pge.l.AddHook(NewHook(ctx, pge, pge.Logging.LogDBLevel))
1✔
193
}
1✔
194

195
// QueryRowIface specifies interface to use QueryRow method
196
type QueryRowIface interface {
197
        QueryRow(context.Context, string, ...any) pgx.Row
198
}
199

200
// TryLockClientName obtains lock on the server to prevent another client with the same name
201
func (pge *PgEngine) TryLockClientName(ctx context.Context, conn QueryRowIface) error {
1✔
202
        sql := "SELECT COALESCE(to_regproc('timetable.try_lock_client_name')::int4, 0)"
1✔
203
        var procoid int // check if the schema is available first
1✔
204
        if err := conn.QueryRow(ctx, sql).Scan(&procoid); err != nil {
2✔
205
                return err
1✔
206
        }
1✔
207
        if procoid == 0 { //there is no schema yet, will lock after bootstrapping
2✔
208
                pge.l.Debug("There is no schema yet, will lock after bootstrapping")
1✔
209
                return nil
1✔
210
        }
1✔
211
        sql = "SELECT timetable.try_lock_client_name($1, $2)"
1✔
212
        var locked bool
1✔
213
        if e := conn.QueryRow(ctx, sql, pge.Getsid(), pge.ClientName).Scan(&locked); e != nil {
2✔
214
                return e
1✔
215
        } else if !locked {
2✔
216
                return errors.New("cannot obtain lock for a session")
×
217
        }
×
218
        return nil
1✔
219
}
220

221
// ExecuteFileScript handles both SQL and YAML files based on file extension
222
func (pge *PgEngine) ExecuteFileScript(ctx context.Context, cmdOpts config.CmdOptions) error {
1✔
223
        filePath := cmdOpts.Start.File
1✔
224

1✔
225
        // Determine file type by extension
1✔
226
        fileExt := strings.ToLower(filepath.Ext(filePath))
1✔
227

1✔
228
        switch fileExt {
1✔
229
        case ".yaml", ".yml":
1✔
230
                // Handle YAML chain definition files
1✔
231
                if cmdOpts.Start.Validate {
2✔
232
                        // Only validate, don't import
1✔
233
                        _, err := ParseYamlFile(filePath)
1✔
234
                        if err != nil {
2✔
235
                                pge.l.WithError(err).Error("YAML validation failed")
1✔
236
                                return err
1✔
237
                        }
1✔
238
                        pge.l.WithField("file", filePath).Info("YAML file validation successful")
1✔
239
                        return nil
1✔
240
                }
241

242
                // Import YAML chains
243
                return pge.LoadYamlChains(ctx, filePath, cmdOpts.Start.Replace)
1✔
244

245
        case ".sql":
1✔
246
                // Handle SQL script files (existing behavior)
1✔
247
                return pge.ExecuteCustomScripts(ctx, filePath)
1✔
248

249
        default:
1✔
250
                return errors.New("unsupported file extension: " + fileExt)
1✔
251
        }
252
}
253

254
// ExecuteCustomScripts executes SQL scripts in files
255
func (pge *PgEngine) ExecuteCustomScripts(ctx context.Context, filename ...string) error {
1✔
256
        for _, f := range filename {
2✔
257
                sql, err := os.ReadFile(f)
1✔
258
                if err != nil {
2✔
259
                        pge.l.WithError(err).Error("cannot read command file")
1✔
260
                        return err
1✔
261
                }
1✔
262
                pge.l.Info("Executing script: ", f)
1✔
263
                if _, err = pge.ConfigDb.Exec(ctx, string(sql)); err != nil {
2✔
264
                        pge.l.WithError(err).Error("script execution failed")
1✔
265
                        return err
1✔
266
                }
1✔
267
                pge.l.Info("Script file executed: ", f)
1✔
268
        }
269
        return nil
1✔
270
}
271

272
// ExecuteSchemaScripts executes initial schema scripts
273
func (pge *PgEngine) ExecuteSchemaScripts(ctx context.Context) error {
1✔
274
        var exists bool
1✔
275
        err := pge.ConfigDb.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'timetable')").Scan(&exists)
1✔
276
        if err != nil {
2✔
277
                return err
1✔
278
        }
1✔
279
        if !exists {
2✔
280
                for i, sql := range sqls {
2✔
281
                        sqlName := sqlNames[i]
1✔
282
                        pge.l.Info("Executing script: ", sqlName)
1✔
283
                        if _, err = pge.ConfigDb.Exec(ctx, sql); err != nil {
2✔
284
                                pge.l.WithError(err).Error("Script execution failed")
1✔
285
                                pge.l.Warn("Dropping \"timetable\" schema...")
1✔
286
                                _, err = pge.ConfigDb.Exec(ctx, "DROP SCHEMA IF EXISTS timetable CASCADE")
1✔
287
                                if err != nil {
2✔
288
                                        pge.l.WithError(err).Error("Schema dropping failed")
1✔
289
                                }
1✔
290
                                return err
1✔
291
                        }
292
                        pge.l.Info("Schema file executed: " + sqlName)
1✔
293
                }
294
                pge.l.Info("Configuration schema created...")
1✔
295
        }
296
        return nil
1✔
297
}
298

299
// Finalize closes session
300
func (pge *PgEngine) Finalize() {
1✔
301
        pge.l.Info("Closing session")
1✔
302
        sql := `WITH del_ch AS (DELETE FROM timetable.active_chain WHERE client_name = $1)
1✔
303
DELETE FROM timetable.active_session WHERE client_name = $1`
1✔
304
        _, err := pge.ConfigDb.Exec(context.Background(), sql, pge.ClientName)
1✔
305
        if err != nil {
1✔
306
                pge.l.WithError(err).Error("Cannot finalize database session")
×
307
        }
×
308
        pge.ConfigDb.Close()
1✔
309
        pge.ConfigDb = nil
1✔
310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc