• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Permify / permify / 18392524663

10 Oct 2025 12:13AM UTC coverage: 86.394% (-0.04%) from 86.429%
18392524663

push

github

web-flow
Merge pull request #2536 from Permify/feature/add-vtproto-support

refactor: reorganize and update protobuf files and configurations

27 of 30 new or added lines in 13 files covered. (90.0%)

2 existing lines in 1 file now uncovered.

9296 of 10760 relevant lines covered (86.39%)

205.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.35
/pkg/database/postgres/postgres.go
1
package postgres
2

3
import (
4
        "context"
5
        "fmt"
6
        "log/slog" // Structured logging
7
        "strings"
8
        "time"
9

10
        "github.com/cenkalti/backoff/v4"
11

12
        "github.com/exaring/otelpgx"
13

14
        "github.com/jackc/pgx/v5"
15

16
        "github.com/jackc/pgx/v5/pgxpool"
17

18
        "github.com/Masterminds/squirrel"
19
)
20

21
// Postgres - Structure for Postresql instance
22
type Postgres struct {
23
        ReadPool  *pgxpool.Pool
24
        WritePool *pgxpool.Pool
25

26
        Builder squirrel.StatementBuilderType
27
        // options
28
        maxDataPerWrite       int
29
        maxRetries            int
30
        watchBufferSize       int
31
        maxConnectionLifeTime time.Duration
32
        maxConnectionIdleTime time.Duration
33
        maxOpenConnections    int
34
        maxIdleConnections    int
35
}
36

37
// New -
38
func New(uri string, opts ...Option) (*Postgres, error) {
7✔
39
        return newDB(uri, uri, opts...)
7✔
40
}
7✔
41

42
// NewWithSeparateURIs -
43
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
3✔
44
        return newDB(writerUri, readerUri, opts...)
3✔
45
}
3✔
46

47
// new - Creates new postgresql db instance
48
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
10✔
49
        pg := &Postgres{
10✔
50
                maxOpenConnections: _defaultMaxOpenConnections,
10✔
51
                maxIdleConnections: _defaultMaxIdleConnections,
10✔
52
                maxDataPerWrite:    _defaultMaxDataPerWrite,
10✔
53
                maxRetries:         _defaultMaxRetries,
10✔
54
                watchBufferSize:    _defaultWatchBufferSize,
10✔
55
        }
10✔
56

10✔
57
        // Custom options
10✔
58
        for _, opt := range opts {
30✔
59
                opt(pg)
20✔
60
        }
20✔
61

62
        pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
10✔
63

10✔
64
        writeConfig, err := pgxpool.ParseConfig(writerUri)
10✔
65
        if err != nil {
12✔
66
                return nil, err
2✔
67
        }
2✔
68

69
        readConfig, err := pgxpool.ParseConfig(readerUri)
8✔
70
        if err != nil {
9✔
71
                return nil, err
1✔
72
        }
1✔
73

74
        // Set the default execution mode for queries using the write and read configurations.
75
        setDefaultQueryExecMode(writeConfig.ConnConfig)
7✔
76
        setDefaultQueryExecMode(readConfig.ConnConfig)
7✔
77

7✔
78
        // Set the plan cache mode for both write and read configurations to optimize query planning.
7✔
79
        setPlanCacheMode(writeConfig.ConnConfig)
7✔
80
        setPlanCacheMode(readConfig.ConnConfig)
7✔
81

7✔
82
        // Set the minimum number of idle connections in the pool for both write and read configurations.
7✔
83
        writeConfig.MinConns = int32(pg.maxIdleConnections)
7✔
84
        readConfig.MinConns = int32(pg.maxIdleConnections)
7✔
85

7✔
86
        // Set the maximum number of active connections in the pool for both write and read configurations.
7✔
87
        writeConfig.MaxConns = int32(pg.maxOpenConnections)
7✔
88
        readConfig.MaxConns = int32(pg.maxOpenConnections)
7✔
89

7✔
90
        // Set the maximum amount of time a connection may be idle before being closed for both configurations.
7✔
91
        writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
7✔
92
        readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
7✔
93

7✔
94
        // Set the maximum lifetime of a connection in the pool for both configurations.
7✔
95
        writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
7✔
96
        readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
7✔
97

7✔
98
        // Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
7✔
99
        writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
7✔
100
        readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
7✔
101

7✔
102
        writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
7✔
103
        readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
7✔
104

7✔
105
        // Create connection pools for both writing and reading operations using the configured settings.
7✔
106
        pg.WritePool, pg.ReadPool, err = createPools(
7✔
107
                context.Background(), // Context used to control the lifecycle of the pools.
7✔
108
                writeConfig,          // Configuration settings for the write pool.
7✔
109
                readConfig,           // Configuration settings for the read pool.
7✔
110
        )
7✔
111
        // Handle errors during the creation of the connection pools.
7✔
112
        if err != nil {
9✔
113
                return nil, err
2✔
114
        }
2✔
115

116
        return pg, nil
5✔
117
}
118

119
func (p *Postgres) GetMaxDataPerWrite() int {
1✔
120
        return p.maxDataPerWrite
1✔
121
}
1✔
122

123
func (p *Postgres) GetMaxRetries() int {
1✔
124
        return p.maxRetries
1✔
125
}
1✔
126

127
func (p *Postgres) GetWatchBufferSize() int {
1✔
128
        return p.watchBufferSize
1✔
129
}
1✔
130

131
// GetEngineType - Get the engine type which is postgresql in string
132
func (p *Postgres) GetEngineType() string {
1✔
133
        return "postgres"
1✔
134
}
1✔
135

136
// Close - Close postgresql instance
137
func (p *Postgres) Close() error {
6✔
138
        p.ReadPool.Close()
6✔
139
        p.WritePool.Close()
6✔
140
        return nil
6✔
141
}
6✔
142

143
// IsReady - Check if database is ready
144
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
2✔
145
        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
2✔
146
        defer cancel()
2✔
147
        if err := p.ReadPool.Ping(ctx); err != nil {
2✔
148
                return false, err
×
149
        }
×
150
        return true, nil
1✔
151
}
152

153
var queryExecModes = map[string]pgx.QueryExecMode{
154
        "cache_statement": pgx.QueryExecModeCacheStatement,
155
        "cache_describe":  pgx.QueryExecModeCacheDescribe,
156
        "describe_exec":   pgx.QueryExecModeDescribeExec,
157
        "mode_exec":       pgx.QueryExecModeExec,
158
        "simple_protocol": pgx.QueryExecModeSimpleProtocol,
159
}
160

161
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
14✔
162
        // Default mode if no specific mode is found in the connection string
14✔
163
        defaultMode := "cache_statement"
14✔
164

14✔
165
        // Iterate through the map keys to check if any are mentioned in the connection string
14✔
166
        for key := range queryExecModes {
84✔
167
                if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) {
70✔
168
                        config.DefaultQueryExecMode = queryExecModes[key]
×
169
                        slog.Info("setDefaultQueryExecMode", slog.String("mode", key))
×
170
                        return
×
171
                }
×
172
        }
173

174
        // Set to default mode if no matching mode is found
175
        config.DefaultQueryExecMode = queryExecModes[defaultMode]
14✔
176
        slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode))
14✔
177
}
178

179
// planCacheModes lists the plan_cache_mode names that setPlanCacheMode looks
// for in a connection string, mapped to the value written to the connection's
// runtime parameters. "disable" is handled specially by setPlanCacheMode: the
// runtime parameter is removed instead of being set.
var planCacheModes = map[string]string{
	"auto":              "auto",
	"force_custom_plan": "force_custom_plan",
	"disable":           "disable",
}
184

185
func setPlanCacheMode(config *pgx.ConnConfig) {
14✔
186
        // Default plan cache mode
14✔
187
        const defaultMode = "auto"
14✔
188

14✔
189
        // Extract connection string
14✔
190
        connStr := config.ConnString()
14✔
191
        planCacheMode := defaultMode
14✔
192

14✔
193
        // Check for specific plan cache modes in the connection string
14✔
194
        for key, value := range planCacheModes {
56✔
195
                if strings.Contains(connStr, "plan_cache_mode="+key) {
42✔
196
                        if key == "disable" {
×
NEW
197
                                delete(config.RuntimeParams, "plan_cache_mode")
×
198
                                slog.Info("setPlanCacheMode", slog.String("mode", "disabled"))
×
199
                                return
×
200
                        }
×
201
                        planCacheMode = value
×
202
                        slog.Info("setPlanCacheMode", slog.String("mode", key))
×
203
                        break
×
204
                }
205
        }
206

207
        // Set the plan cache mode
208
        config.RuntimeParams["plan_cache_mode"] = planCacheMode
14✔
209
        if planCacheMode == defaultMode {
28✔
210
                slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
14✔
211
        }
14✔
212
}
213

214
// createPools initializes read and write connection pools with appropriate configurations and error handling.
215
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
7✔
216
        // Context with timeout for creating the pools
7✔
217
        initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
7✔
218
        defer cancel()
7✔
219

7✔
220
        // Create write pool
7✔
221
        writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
7✔
222
        if err != nil {
7✔
223
                return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
×
224
        }
×
225

226
        // Create read pool using the same configuration
227
        readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
7✔
228
        if err != nil {
7✔
229
                writePool.Close() // Ensure write pool is closed on failure
×
230
                return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
×
231
        }
×
232

233
        // Set up retry policy for pinging pools
234
        retryPolicy := backoff.NewExponentialBackOff()
7✔
235
        retryPolicy.MaxElapsedTime = 1 * time.Minute
7✔
236

7✔
237
        // Attempt to ping both pools to confirm connectivity
7✔
238
        err = backoff.Retry(func() error {
34✔
239
                pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
27✔
240
                defer pingCancel()
27✔
241

27✔
242
                if err := writePool.Ping(pingCtx); err != nil {
49✔
243
                        return fmt.Errorf("write pool ping failed: %w", err)
22✔
244
                }
22✔
245
                if err := readPool.Ping(pingCtx); err != nil {
5✔
246
                        return fmt.Errorf("read pool ping failed: %w", err)
×
247
                }
×
248
                return nil
5✔
249
        }, retryPolicy)
250
        // Handle errors from pinging
251
        if err != nil {
9✔
252
                writePool.Close()
2✔
253
                readPool.Close()
2✔
254
                return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
2✔
255
        }
2✔
256

257
        return writePool, readPool, nil
5✔
258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc