• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18127629475

30 Sep 2025 10:59AM UTC coverage: 84.299% (-1.9%) from 86.155%
18127629475

Pull #720

github

web-flow
Merge f545f91b2 into 61518f6b4
Pull Request #720: [!] add YAML-based chain definitions

196 of 204 new or added lines in 7 files covered. (96.08%)

62 existing lines in 4 files now uncovered.

1659 of 1968 relevant lines covered (84.3%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.23
/internal/pgengine/transaction.go
1
package pgengine
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "strconv"
9
        "strings"
10

11
        "github.com/cybertec-postgresql/pg_timetable/internal/log"
12
        pgx "github.com/jackc/pgx/v5"
13
        "github.com/jackc/pgx/v5/pgconn"
14
)
15

16
// StartTransaction returns transaction object, virtual transaction id and error
17
func (pge *PgEngine) StartTransaction(ctx context.Context) (tx pgx.Tx, vxid int64, err error) {
1✔
18
        if tx, err = pge.ConfigDb.Begin(ctx); err != nil {
2✔
19
                return
1✔
20
        }
1✔
21
        err = tx.QueryRow(ctx, `SELECT 
1✔
22
(split_part(virtualxid, '/', 1)::int8 << 32) | split_part(virtualxid, '/', 2)::int8
1✔
23
FROM pg_locks 
1✔
24
WHERE pid = pg_backend_pid() AND virtualxid IS NOT NULL`).Scan(&vxid)
1✔
25
        return
1✔
26
}
27

28
// CommitTransaction commits transaction and log error in the case of error
29
func (pge *PgEngine) CommitTransaction(ctx context.Context, tx pgx.Tx) {
1✔
30
        err := tx.Commit(ctx)
1✔
31
        if err != nil {
2✔
32
                log.GetLogger(ctx).WithError(err).Error("Application cannot commit after job finished")
1✔
33
        }
1✔
34
}
35

36
// RollbackTransaction rollbacks transaction and log error in the case of error
37
func (pge *PgEngine) RollbackTransaction(ctx context.Context, tx pgx.Tx) {
1✔
38
        err := tx.Rollback(ctx)
1✔
39
        if err != nil {
2✔
40
                log.GetLogger(ctx).WithError(err).Error("Application cannot rollback after job failed")
1✔
41
        }
1✔
42
}
43

44
// MustSavepoint creates SAVDEPOINT in transaction and log error in the case of error
45
func (pge *PgEngine) MustSavepoint(ctx context.Context, tx pgx.Tx, taskID int) {
1✔
46
        if _, err := tx.Exec(ctx, fmt.Sprintf("SAVEPOINT task_%d", taskID)); err != nil {
2✔
47
                log.GetLogger(ctx).WithError(err).Error("Savepoint failed")
1✔
48
        }
1✔
49
}
50

51
// MustRollbackToSavepoint rollbacks transaction to SAVEPOINT and log error in the case of error
52
func (pge *PgEngine) MustRollbackToSavepoint(ctx context.Context, tx pgx.Tx, taskID int) {
1✔
53
        if _, err := tx.Exec(ctx, fmt.Sprintf("ROLLBACK TO SAVEPOINT task_%d", taskID)); err != nil {
2✔
54
                log.GetLogger(ctx).WithError(err).Error("Rollback to savepoint failed")
1✔
55
        }
1✔
56
}
57

58
// ExecuteSQLTask executes SQL task
59
func (pge *PgEngine) ExecuteSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
1✔
60
        switch {
1✔
61
        case task.IsRemote():
1✔
62
                return pge.ExecRemoteSQLTask(ctx, task, paramValues)
1✔
63
        case task.Autonomous:
1✔
64
                return pge.ExecAutonomousSQLTask(ctx, task, paramValues)
1✔
65
        default:
1✔
66
                return pge.ExecLocalSQLTask(ctx, tx, task, paramValues)
1✔
67
        }
68
}
69

70
// ExecLocalSQLTask executes local task in the chain transaction
71
func (pge *PgEngine) ExecLocalSQLTask(ctx context.Context, tx pgx.Tx, task *ChainTask, paramValues []string) (out string, err error) {
1✔
72
        if err := pge.SetRole(ctx, tx, task.RunAs); err != nil {
2✔
73
                return "", err
1✔
74
        }
1✔
75
        if task.IgnoreError {
2✔
76
                pge.MustSavepoint(ctx, tx, task.TaskID)
1✔
77
        }
1✔
78
        pge.SetCurrentTaskContext(ctx, tx, task.ChainID, task.TaskID)
1✔
79
        out, err = pge.ExecuteSQLCommand(ctx, tx, task.Command, paramValues)
1✔
80
        if err != nil && task.IgnoreError {
2✔
81
                pge.MustRollbackToSavepoint(ctx, tx, task.TaskID)
1✔
82
        }
1✔
83
        if task.RunAs > "" {
2✔
84
                pge.ResetRole(ctx, tx)
1✔
85
        }
1✔
86
        return
1✔
87
}
88

89
// ExecStandaloneTask executes task against the provided connection interface, it can be remote connection or acquired connection from the pool
90
func (pge *PgEngine) ExecStandaloneTask(ctx context.Context, connf func() (PgxConnIface, error), task *ChainTask, paramValues []string) (string, error) {
1✔
91
        conn, err := connf()
1✔
92
        if err != nil {
2✔
93
                return "", err
1✔
94
        }
1✔
95
        defer pge.FinalizeDBConnection(ctx, conn)
1✔
96
        if err := pge.SetRole(ctx, conn, task.RunAs); err != nil {
2✔
97
                return "", err
1✔
98
        }
1✔
99
        pge.SetCurrentTaskContext(ctx, conn, task.ChainID, task.TaskID)
1✔
100
        return pge.ExecuteSQLCommand(ctx, conn, task.Command, paramValues)
1✔
101
}
102

103
// ExecRemoteSQLTask executes task against remote connection
104
func (pge *PgEngine) ExecRemoteSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
1✔
105
        log.GetLogger(ctx).Info("Switching to remote task mode")
1✔
106
        return pge.ExecStandaloneTask(ctx,
1✔
107
                func() (PgxConnIface, error) { return pge.GetRemoteDBConnection(ctx, task.ConnectString) },
2✔
108
                task, paramValues)
109
}
110

111
// ExecAutonomousSQLTask executes autonomous task in an acquired connection from pool
112
func (pge *PgEngine) ExecAutonomousSQLTask(ctx context.Context, task *ChainTask, paramValues []string) (string, error) {
1✔
113
        log.GetLogger(ctx).Info("Switching to autonomous task mode")
1✔
114
        return pge.ExecStandaloneTask(ctx,
1✔
115
                func() (PgxConnIface, error) { return pge.GetLocalDBConnection(ctx) },
2✔
116
                task, paramValues)
117
}
118

119
// ExecuteSQLCommand executes chain command with parameters inside transaction
120
func (pge *PgEngine) ExecuteSQLCommand(ctx context.Context, executor executor, command string, paramValues []string) (out string, err error) {
1✔
121
        var ct pgconn.CommandTag
1✔
122
        var params []any
1✔
123
        if strings.TrimSpace(command) == "" {
2✔
124
                return "", errors.New("SQL command cannot be empty")
1✔
125
        }
1✔
126
        if len(paramValues) == 0 { //mimic empty param
2✔
127
                ct, err = executor.Exec(ctx, command)
1✔
128
                out = ct.String()
1✔
129
                return
1✔
130
        }
1✔
131
        for _, val := range paramValues {
2✔
132
                if val > "" {
2✔
133
                        if err = json.Unmarshal([]byte(val), &params); err != nil {
2✔
134
                                return
1✔
135
                        }
1✔
136
                        ct, err = executor.Exec(ctx, command, params...)
1✔
137
                        out = out + ct.String() + "\n"
1✔
138
                }
139
        }
140
        return
1✔
141
}
142

143
// GetLocalDBConnection acquires a connection from a local pool and returns it
144
func (pge *PgEngine) GetLocalDBConnection(ctx context.Context) (conn PgxConnIface, err error) {
1✔
145
        c, err := pge.ConfigDb.Acquire(ctx)
1✔
146
        if err != nil {
2✔
147
                return nil, err
1✔
148
        }
1✔
UNCOV
149
        return c.Hijack(), nil
×
150
}
151

152
// GetRemoteDBConnection create a remote db connection object
153
func (pge *PgEngine) GetRemoteDBConnection(ctx context.Context, connectionString string) (conn PgxConnIface, err error) {
1✔
154
        conn, err = pgx.Connect(ctx, connectionString)
1✔
155
        if err != nil {
2✔
156
                return nil, err
1✔
157
        }
1✔
158
        log.GetLogger(ctx).Info("Remote connection established...")
1✔
159
        return
1✔
160
}
161

162
// FinalizeDBConnection closes session
163
func (pge *PgEngine) FinalizeDBConnection(ctx context.Context, remoteDb PgxConnIface) {
1✔
164
        l := log.GetLogger(ctx)
1✔
165
        l.Info("Closing standalone session")
1✔
166
        if err := remoteDb.Close(ctx); err != nil {
2✔
167
                l.WithError(err).Error("Cannot close database connection:", err)
1✔
168
        }
1✔
169
        remoteDb = nil
1✔
170
}
171

172
// SetRole - set the current user identifier of the current session
173
func (pge *PgEngine) SetRole(ctx context.Context, executor executor, runUID string) error {
1✔
174
        if strings.TrimSpace(runUID) == "" {
2✔
175
                return nil
1✔
176
        }
1✔
177
        log.GetLogger(ctx).Info("Setting role to ", runUID)
1✔
178
        _, err := executor.Exec(ctx, fmt.Sprintf("SET ROLE %v", runUID))
1✔
179
        return err
1✔
180
}
181

182
// ResetRole - RESET forms reset the current user identifier to be the current session user identifier
183
func (pge *PgEngine) ResetRole(ctx context.Context, executor executor) {
1✔
184
        l := log.GetLogger(ctx)
1✔
185
        l.Info("Resetting role")
1✔
186
        _, err := executor.Exec(ctx, `RESET ROLE`)
1✔
187
        if err != nil {
2✔
188
                l.WithError(err).Error("Failed to set a role", err)
1✔
189
        }
1✔
190
}
191

192
// SetCurrentTaskContext - set the working transaction "pg_timetable.current_task_id" run-time parameter
193
func (pge *PgEngine) SetCurrentTaskContext(ctx context.Context, executor executor, chainID int, taskID int) {
1✔
194
        _, err := executor.Exec(ctx, `SELECT set_config('pg_timetable.current_task_id', $1, false),
1✔
195
set_config('pg_timetable.current_chain_id', $2, false),
1✔
196
set_config('pg_timetable.current_client_name', $3, false)`, strconv.Itoa(taskID), strconv.Itoa(chainID), pge.ClientName)
1✔
197
        if err != nil {
2✔
198
                log.GetLogger(ctx).WithError(err).Error("Failed to set current task context", err)
1✔
199
        }
1✔
200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc