• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 19380238762

14 Nov 2025 11:16PM UTC coverage: 85.869% (-0.01%) from 85.879%
19380238762

Pull #2609

github

tolgaozen
fix(database): align comments with actual backward compatibility behavior
Pull Request #2609: feat(database): expose additional pgxpool connection pool configurati…

65 of 76 new or added lines in 3 files covered. (85.53%)

1 existing line in 1 file now uncovered.

9644 of 11231 relevant lines covered (85.87%)

288.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.42
/pkg/database/postgres/postgres.go
1
package postgres
2

3
import (
4
        "context"
5
        "fmt"
6
        "log/slog" // Structured logging
7
        "strings"
8
        "time"
9

10
        "github.com/cenkalti/backoff/v4"
11

12
        "github.com/exaring/otelpgx"
13

14
        "github.com/jackc/pgx/v5"
15

16
        "github.com/jackc/pgx/v5/pgxpool"
17

18
        "github.com/Masterminds/squirrel"
19
)
20

21
// Postgres - Structure for Postresql instance
22
type Postgres struct {
23
        ReadPool  *pgxpool.Pool
24
        WritePool *pgxpool.Pool
25

26
        Builder squirrel.StatementBuilderType
27
        // options
28
        maxDataPerWrite       int
29
        maxRetries            int
30
        watchBufferSize       int
31
        maxConnectionLifeTime time.Duration
32
        maxConnectionIdleTime time.Duration
33
        maxConns              int // Maximum number of connections in the pool (maps to pgxpool MaxConns)
34
        maxIdleConnections    int // Deprecated: Use MinConns instead. Kept for backward compatibility (maps to MinConns if MinConns is not set).
35
        minConns              int // Minimum number of connections in the pool (maps to pgxpool MinConns)
36
        minIdleConns          int // Minimum number of idle connections in the pool (maps to pgxpool MinIdleConns)
37
        healthCheckPeriod     time.Duration
38
        maxConnLifetimeJitter time.Duration
39
        connectTimeout        time.Duration
40
}
41

42
// New -
43
func New(uri string, opts ...Option) (*Postgres, error) {
7✔
44
        return newDB(uri, uri, opts...)
7✔
45
}
7✔
46

47
// NewWithSeparateURIs -
48
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
3✔
49
        return newDB(writerUri, readerUri, opts...)
3✔
50
}
3✔
51

52
// new - Creates new postgresql db instance
53
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
10✔
54
        pg := &Postgres{
10✔
55
                maxConns:              _defaultMaxConns,
10✔
56
                maxIdleConnections:    _defaultMaxIdleConnections,
10✔
57
                minConns:              _defaultMinConns,
10✔
58
                minIdleConns:          _defaultMinIdleConns,
10✔
59
                maxDataPerWrite:       _defaultMaxDataPerWrite,
10✔
60
                maxRetries:            _defaultMaxRetries,
10✔
61
                watchBufferSize:       _defaultWatchBufferSize,
10✔
62
                healthCheckPeriod:     _defaultHealthCheckPeriod,
10✔
63
                maxConnLifetimeJitter: _defaultMaxConnLifetimeJitter,
10✔
64
                connectTimeout:        _defaultConnectTimeout,
10✔
65
        }
10✔
66

10✔
67
        // Custom options
10✔
68
        for _, opt := range opts {
30✔
69
                opt(pg)
20✔
70
        }
20✔
71

72
        pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
10✔
73

10✔
74
        writeConfig, err := pgxpool.ParseConfig(writerUri)
10✔
75
        if err != nil {
12✔
76
                return nil, err
2✔
77
        }
2✔
78

79
        readConfig, err := pgxpool.ParseConfig(readerUri)
8✔
80
        if err != nil {
9✔
81
                return nil, err
1✔
82
        }
1✔
83

84
        // Set the default execution mode for queries using the write and read configurations.
85
        setDefaultQueryExecMode(writeConfig.ConnConfig)
7✔
86
        setDefaultQueryExecMode(readConfig.ConnConfig)
7✔
87

7✔
88
        // Set the plan cache mode for both write and read configurations to optimize query planning.
7✔
89
        setPlanCacheMode(writeConfig.ConnConfig)
7✔
90
        setPlanCacheMode(readConfig.ConnConfig)
7✔
91

7✔
92
        // Set the minimum number of connections in the pool for both write and read configurations.
7✔
93
        // For backward compatibility: if MinConns is not set (0) but MaxIdleConnections is set, use MaxIdleConnections (old behavior).
7✔
94
        minConns := pg.minConns
7✔
95
        if minConns == 0 && pg.maxIdleConnections > 0 {
12✔
96
                minConns = pg.maxIdleConnections
5✔
97
        }
5✔
98
        if minConns > 0 {
12✔
99
                writeConfig.MinConns = int32(minConns)
5✔
100
                readConfig.MinConns = int32(minConns)
5✔
101
        }
5✔
102

103
        // Set the minimum number of idle connections in the pool.
104
        // Note: MinIdleConns was not set in the old code, so we only set it if explicitly configured.
105
        if pg.minIdleConns > 0 {
7✔
NEW
106
                writeConfig.MinIdleConns = int32(pg.minIdleConns)
×
NEW
107
                readConfig.MinIdleConns = int32(pg.minIdleConns)
×
NEW
108
        }
×
109

110
        // Set the maximum number of connections in the pool for both write and read configurations.
111
        // pgxpool default is 0 (unlimited), so only set if explicitly configured.
112
        // Note: MaxOpenConnections is already mapped to MaxConns via options.go, so no backward compatibility needed here.
113
        if pg.maxConns > 0 {
12✔
114
                writeConfig.MaxConns = int32(pg.maxConns)
5✔
115
                readConfig.MaxConns = int32(pg.maxConns)
5✔
116
        }
5✔
117

118
        // Set the maximum amount of time a connection may be idle before being closed for both configurations.
119
        writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
7✔
120
        readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
7✔
121

7✔
122
        // Set the maximum lifetime of a connection in the pool for both configurations.
7✔
123
        writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
7✔
124
        readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
7✔
125

7✔
126
        // Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
7✔
127
        if pg.maxConnLifetimeJitter > 0 {
7✔
NEW
128
                writeConfig.MaxConnLifetimeJitter = pg.maxConnLifetimeJitter
×
NEW
129
                readConfig.MaxConnLifetimeJitter = pg.maxConnLifetimeJitter
×
130
        } else {
7✔
131
                // Default to 20% of MaxConnLifetime if not explicitly set
7✔
132
                writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
7✔
133
                readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
7✔
134
        }
7✔
135

136
        // Set the health check period for both configurations.
137
        if pg.healthCheckPeriod > 0 {
7✔
NEW
138
                writeConfig.HealthCheckPeriod = pg.healthCheckPeriod
×
NEW
139
                readConfig.HealthCheckPeriod = pg.healthCheckPeriod
×
NEW
140
        }
×
141

142
        // Set the connect timeout for both configurations.
143
        if pg.connectTimeout > 0 {
7✔
NEW
144
                writeConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
×
NEW
145
                readConfig.ConnConfig.ConnectTimeout = pg.connectTimeout
×
NEW
146
        }
×
147

148
        writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
7✔
149
        readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
7✔
150

7✔
151
        // Create connection pools for both writing and reading operations using the configured settings.
7✔
152
        pg.WritePool, pg.ReadPool, err = createPools(
7✔
153
                context.Background(), // Context used to control the lifecycle of the pools.
7✔
154
                writeConfig,          // Configuration settings for the write pool.
7✔
155
                readConfig,           // Configuration settings for the read pool.
7✔
156
        )
7✔
157
        // Handle errors during the creation of the connection pools.
7✔
158
        if err != nil {
9✔
159
                return nil, err
2✔
160
        }
2✔
161

162
        return pg, nil
5✔
163
}
164

165
func (p *Postgres) GetMaxDataPerWrite() int {
1✔
166
        return p.maxDataPerWrite
1✔
167
}
1✔
168

169
func (p *Postgres) GetMaxRetries() int {
1✔
170
        return p.maxRetries
1✔
171
}
1✔
172

173
func (p *Postgres) GetWatchBufferSize() int {
1✔
174
        return p.watchBufferSize
1✔
175
}
1✔
176

177
// GetEngineType - Get the engine type which is postgresql in string
178
func (p *Postgres) GetEngineType() string {
1✔
179
        return "postgres"
1✔
180
}
1✔
181

182
// Close - Close postgresql instance
183
func (p *Postgres) Close() error {
6✔
184
        p.ReadPool.Close()
6✔
185
        p.WritePool.Close()
6✔
186
        return nil
6✔
187
}
6✔
188

189
// IsReady - Check if database is ready
190
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
2✔
191
        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
2✔
192
        defer cancel()
2✔
193
        if err := p.ReadPool.Ping(ctx); err != nil {
2✔
194
                return false, err
×
195
        }
×
196
        return true, nil
1✔
197
}
198

199
var queryExecModes = map[string]pgx.QueryExecMode{
200
        "cache_statement": pgx.QueryExecModeCacheStatement,
201
        "cache_describe":  pgx.QueryExecModeCacheDescribe,
202
        "describe_exec":   pgx.QueryExecModeDescribeExec,
203
        "mode_exec":       pgx.QueryExecModeExec,
204
        "simple_protocol": pgx.QueryExecModeSimpleProtocol,
205
}
206

207
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
14✔
208
        // Default mode if no specific mode is found in the connection string
14✔
209
        defaultMode := "cache_statement"
14✔
210

14✔
211
        // Iterate through the map keys to check if any are mentioned in the connection string
14✔
212
        for key := range queryExecModes {
84✔
213
                if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) {
70✔
214
                        config.DefaultQueryExecMode = queryExecModes[key]
×
215
                        slog.Info("setDefaultQueryExecMode", slog.String("mode", key))
×
216
                        return
×
217
                }
×
218
        }
219

220
        // Set to default mode if no matching mode is found
221
        config.DefaultQueryExecMode = queryExecModes[defaultMode]
14✔
222
        slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode))
14✔
223
}
224

225
// planCacheModes lists the plan_cache_mode names that setPlanCacheMode looks
// for in a connection string, mapped to the value stored in the runtime
// parameters. "disable" is handled specially by setPlanCacheMode, which
// removes the parameter instead of setting it.
var planCacheModes = map[string]string{
	"disable":           "disable",
	"force_custom_plan": "force_custom_plan",
	"auto":              "auto",
}
230

231
func setPlanCacheMode(config *pgx.ConnConfig) {
14✔
232
        // Default plan cache mode
14✔
233
        const defaultMode = "auto"
14✔
234

14✔
235
        // Extract connection string
14✔
236
        connStr := config.ConnString()
14✔
237
        planCacheMode := defaultMode
14✔
238

14✔
239
        // Check for specific plan cache modes in the connection string
14✔
240
        for key, value := range planCacheModes {
56✔
241
                if strings.Contains(connStr, "plan_cache_mode="+key) {
42✔
242
                        if key == "disable" {
×
243
                                delete(config.RuntimeParams, "plan_cache_mode")
×
244
                                slog.Info("setPlanCacheMode", slog.String("mode", "disabled"))
×
245
                                return
×
246
                        }
×
247
                        planCacheMode = value
×
248
                        slog.Info("setPlanCacheMode", slog.String("mode", key))
×
249
                        break
×
250
                }
251
        }
252

253
        // Set the plan cache mode
254
        config.RuntimeParams["plan_cache_mode"] = planCacheMode
14✔
255
        if planCacheMode == defaultMode {
28✔
256
                slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
14✔
257
        }
14✔
258
}
259

260
// createPools initializes read and write connection pools with appropriate configurations and error handling.
261
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
7✔
262
        // Context with timeout for creating the pools
7✔
263
        initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
7✔
264
        defer cancel()
7✔
265

7✔
266
        // Create write pool
7✔
267
        writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
7✔
268
        if err != nil {
7✔
269
                return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
×
270
        }
×
271

272
        // Create read pool using the same configuration
273
        readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
7✔
274
        if err != nil {
7✔
275
                writePool.Close() // Ensure write pool is closed on failure
×
276
                return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
×
277
        }
×
278

279
        // Set up retry policy for pinging pools
280
        retryPolicy := backoff.NewExponentialBackOff()
7✔
281
        retryPolicy.MaxElapsedTime = 1 * time.Minute
7✔
282

7✔
283
        // Attempt to ping both pools to confirm connectivity
7✔
284
        err = backoff.Retry(func() error {
33✔
285
                pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
26✔
286
                defer pingCancel()
26✔
287

26✔
288
                if err := writePool.Ping(pingCtx); err != nil {
47✔
289
                        return fmt.Errorf("write pool ping failed: %w", err)
21✔
290
                }
21✔
291
                if err := readPool.Ping(pingCtx); err != nil {
5✔
292
                        return fmt.Errorf("read pool ping failed: %w", err)
×
293
                }
×
294
                return nil
5✔
295
        }, retryPolicy)
296
        // Handle errors from pinging
297
        if err != nil {
9✔
298
                writePool.Close()
2✔
299
                readPool.Close()
2✔
300
                return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
2✔
301
        }
2✔
302

303
        return writePool, readPool, nil
5✔
304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc