cybertec-postgresql / pgwatch3 / build 8298420684 (push, github, web-flow)
15 Mar 2024 03:06PM UTC coverage: 8.047% (-0.5%) from 8.559%
[*] update docker compose command to force recreate test database (#398)
419 of 5207 relevant lines covered (8.05%), 0.09 hits per line

Source File: /src/database.go
package main

import (
        "context"
        "errors"
        "fmt"
        "path"
        "strconv"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/cybertec-postgresql/pgwatch3/db"
        "github.com/cybertec-postgresql/pgwatch3/metrics"
        "github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
        "github.com/cybertec-postgresql/pgwatch3/sources"
        "github.com/jackc/pgx/v5"
        "github.com/jackc/pgx/v5/pgxpool"
)

var configDb db.PgxPoolIface
var monitoredDbConnCache map[string]db.PgxPoolIface = make(map[string]db.PgxPoolIface)

// every DB under monitoring should have exactly 1 connection pool assigned, which will internally limit parallel access
func InitSQLConnPoolForMonitoredDBIfNil(md sources.MonitoredDatabase) error {
        monitoredDbConnCacheLock.Lock()
        defer monitoredDbConnCacheLock.Unlock()

        conn, ok := monitoredDbConnCache[md.DBUniqueName]
        if ok && conn != nil {
                return nil
        }

        conn, err := db.New(mainContext, md.ConnStr, func(conf *pgxpool.Config) error {
                conf.MaxConns = int32(opts.Sources.MaxParallelConnectionsPerDb)
                return nil
        })
        if err != nil {
                return err
        }

        monitoredDbConnCache[md.DBUniqueName] = conn

        return nil
}
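
// CloseOrLimitSQLConnPoolForMonitoredDBIfAny closes and evicts the cached connection
// pool of a host that was removed from the config. For hosts that are merely dormant
// (undersized or ignored due to recovery state) the intent is to shrink the pool to a
// single connection, though the actual limiting calls are currently commented out.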
func CloseOrLimitSQLConnPoolForMonitoredDBIfAny(dbUnique string) {
        monitoredDbConnCacheLock.Lock()
        defer monitoredDbConnCacheLock.Unlock()

        conn, ok := monitoredDbConnCache[dbUnique]
        if !ok || conn == nil {
                return
        }

        if IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique) {

                s := conn.Stat()
                if s.TotalConns() > 1 {
                        logger.Debugf("[%s] Limiting SQL connection pool to max 1 connection due to dormant state ...", dbUnique)
                        // conn.SetMaxIdleConns(1)
                        // conn.SetMaxOpenConns(1)
                }

        } else { // removed from config
                logger.Debugf("[%s] Closing SQL connection pool ...", dbUnique)
                conn.Close()
                delete(monitoredDbConnCache, dbUnique)
        }
}
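
// DBExecRead runs the given query and returns all result rows as a slice of
// column-name-to-value maps.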
func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error) {
        rows, err := conn.Query(ctx, sql, args...)
        if err == nil {
                return pgx.CollectRows(rows, pgx.RowToMap)
        }
        return nil, err
}
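
// GetConnByUniqueName returns the cached connection pool for the given host, or nil if none exists.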
func GetConnByUniqueName(dbUnique string) db.PgxIface {
        monitoredDbConnCacheLock.RLock()
        conn := monitoredDbConnCache[dbUnique]
        monitoredDbConnCacheLock.RUnlock()
        return conn
}
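
// DBExecReadByDbUniqueName looks up the cached connection pool for the named host and
// runs the query inside a transaction, setting a short lock_timeout for Postgres
// sources so that metric fetching cannot block user queries for long.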
func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
        var conn db.PgxIface
        var md sources.MonitoredDatabase
        var data metrics.Measurements
        var err error
        var tx pgx.Tx
        if strings.TrimSpace(sql) == "" {
                return nil, errors.New("empty SQL")
        }
        if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
                return nil, err
        }
        if conn = GetConnByUniqueName(dbUnique); conn == nil {
                logger.Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ...
                return nil, errors.New("SQL connection not found or nil")
        }
        if tx, err = conn.Begin(ctx); err != nil {
                return nil, err
        }
        defer func() { _ = tx.Commit(ctx) }()
        if md.IsPostgresSource() {
                _, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
                if err != nil {
                        atomic.AddUint64(&totalMetricFetchFailuresCounter, 1)
                        return nil, err
                }
        }
        if data, err = DBExecRead(ctx, tx, sql, args...); err != nil {
                atomic.AddUint64(&totalMetricFetchFailuresCounter, 1)
        }
        return data, err
}
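
// DBGetSizeMB returns the monitored database's size in MB for the --min-db-size-mb
// filter, caching the result for dbSizeCachingInterval. On the Azure Single Server
// execution env, DBs with an approximate size of 1 TB or more fall back to the cheaper
// catalog-based estimate instead of pg_database_size().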
func DBGetSizeMB(dbUnique string) (int64, error) {
        sqlDbSize := `select /* pgwatch3_generated */ pg_database_size(current_database());`
        var sizeMB int64

        lastDBSizeCheckLock.RLock()
        lastDBSizeCheckTime := lastDBSizeFetchTime[dbUnique]
        lastDBSize, ok := lastDBSizeMB[dbUnique]
        lastDBSizeCheckLock.RUnlock()

        if !ok || lastDBSizeCheckTime.Add(dbSizeCachingInterval).Before(time.Now()) {
                ver, err := DBGetPGVersion(mainContext, dbUnique, sources.SourcePostgres, false)
                if err != nil || (ver.ExecEnv != execEnvAzureSingle) || (ver.ExecEnv == execEnvAzureSingle && ver.ApproxDBSizeB < 1e12) {
                        logger.Debugf("[%s] determining DB size ...", dbUnique)

                        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
                        if err != nil {
                                logger.Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. err: %v ...", dbUnique, err)
                                return 0, err
                        }
                        sizeMB = data[0]["pg_database_size"].(int64) / 1048576
                } else {
                        logger.Debugf("[%s] Using approx DB size for the --min-db-size-mb filter ...", dbUnique)
                        sizeMB = ver.ApproxDBSizeB / 1048576
                }

                logger.Debugf("[%s] DB size = %d MB, caching for %v ...", dbUnique, sizeMB, dbSizeCachingInterval)

                lastDBSizeCheckLock.Lock()
                lastDBSizeFetchTime[dbUnique] = time.Now()
                lastDBSizeMB[dbUnique] = sizeMB
                lastDBSizeCheckLock.Unlock()

                return sizeMB, nil
        }
        logger.Debugf("[%s] using cached DB size %d MB for the --min-db-size-mb filter check", dbUnique, lastDBSize)
        return lastDBSize, nil
}
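
// TryDiscoverExecutionEnv probes instance settings to classify the hosting environment
// (Azure Single Server, Azure Flexible Server, Google Cloud SQL), returning an empty
// string on query failure and 'UNKNOWN' when nothing matches.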
func TryDiscoverExecutionEnv(dbUnique string) string {
        sqlPGExecEnv := `select /* pgwatch3_generated */
        case
          when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
          when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
          when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
        else
          'UNKNOWN'
        end as exec_env;
  `
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlPGExecEnv)
        if err != nil {
                return ""
        }
        return data[0]["exec_env"].(string)
}
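
// GetDBTotalApproxSize estimates the total DB size from pg_class.relpages, which is
// much cheaper than pg_database_size() but only as fresh as the last VACUUM / ANALYZE.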
func GetDBTotalApproxSize(dbUnique string) (int64, error) {
        sqlApproxDBSize := `
        select /* pgwatch3_generated */
                current_setting('block_size')::int8 * sum(relpages) as db_size_approx
        from
                pg_class c
        where        /* works only for v9.1+ */
                c.relpersistence != 't';
        `
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlApproxDBSize)
        if err != nil {
                return 0, err
        }
        return data[0]["db_size_approx"].(int64), nil
}
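
// DBGetPGVersion determines the server version plus related info (recovery state,
// superuser status, installed extensions, execution env, etc.) for a monitored host.
// Results are cached for 2 minutes unless noCache is set; PgBouncer and PgPool
// sources only get their version fields filled.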
func DBGetPGVersion(ctx context.Context, dbUnique string, srcType sources.Kind, noCache bool) (DBVersionMapEntry, error) {
        var ver DBVersionMapEntry
        var verNew DBVersionMapEntry
        var ok bool
        sql := `
                select /* pgwatch3_generated */ (regexp_matches(
                        regexp_replace(current_setting('server_version'), '(beta|devel).*', '', 'g'),
                        E'\\d+\\.?\\d+?')
                        )[1]::text as ver, pg_is_in_recovery(), current_database()::text;
        `
        sqlSysid := `select /* pgwatch3_generated */ system_identifier::text from pg_control_system();`
        sqlSu := `select /* pgwatch3_generated */ rolsuper
                           from pg_roles r where rolname = session_user;`
        sqlExtensions := `select /* pgwatch3_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`
        pgpoolVersion := `SHOW POOL_VERSION` // supported from pgpool2 v3.0

        dbPgVersionMapLock.Lock()
        getVerLock, ok := dbGetPgVersionMapLock[dbUnique]
        if !ok {
                dbGetPgVersionMapLock[dbUnique] = &sync.RWMutex{}
                getVerLock = dbGetPgVersionMapLock[dbUnique]
        }
        ver, ok = dbPgVersionMap[dbUnique]
        dbPgVersionMapLock.Unlock()

        if !noCache && ok && ver.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min
                //log.Debugf("using cached postgres version %s for %s", ver.Version.String(), dbUnique)
                return ver, nil
        }
        getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
        defer getVerLock.Unlock()
        logger.WithField("source", dbUnique).
                WithField("type", srcType).Debug("determining DB version and recovery status...")

        if verNew.Extensions == nil {
                verNew.Extensions = make(map[string]uint)
        }

        if srcType == sources.SourcePgBouncer {
                data, err := DBExecReadByDbUniqueName(ctx, dbUnique, "show version")
                if err != nil {
                        return verNew, err
                }
                if len(data) == 0 {
                        // surprisingly, in pre-v1.12 pgbouncer the 'show version' output is emitted as a 'NOTICE', which cannot be accessed from the Go driver
                        verNew.Version = 0
                        verNew.VersionStr = "0"
                } else {
                        matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(data[0]["version"].(string))
                        if len(matches) != 1 {
                                logger.Errorf("[%s] Unexpected PgBouncer version input: %s", dbUnique, data[0]["version"].(string))
                                return ver, fmt.Errorf("unexpected PgBouncer version input: %s", data[0]["version"].(string))
                        }
                        verNew.VersionStr = matches[0]
                        verNew.Version = VersionToInt(matches[0])
                }
        } else if srcType == sources.SourcePgPool {
                data, err := DBExecReadByDbUniqueName(ctx, dbUnique, pgpoolVersion)
                if err != nil {
                        return verNew, err
                }
                if len(data) == 0 {
                        verNew.Version = VersionToInt("3.0")
                        verNew.VersionStr = "3.0"
                } else {
                        matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(string(data[0]["pool_version"].([]byte)))
                        if len(matches) != 1 {
                                logger.Errorf("[%s] Unexpected PgPool version input: %s", dbUnique, data[0]["pool_version"].([]byte))
                                return ver, fmt.Errorf("unexpected PgPool version input: %s", data[0]["pool_version"].([]byte))
                        }
                        verNew.VersionStr = matches[0]
                        verNew.Version = VersionToInt(matches[0])
                }
        } else {
                data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
                if err != nil {
                        if noCache {
                                return ver, err
                        }
                        logger.Infof("[%s] DBGetPGVersion failed, using old cached value. err: %v", dbUnique, err)
                        return ver, nil
                }
                verNew.Version = VersionToInt(data[0]["ver"].(string))
                verNew.VersionStr = data[0]["ver"].(string)
                verNew.IsInRecovery = data[0]["pg_is_in_recovery"].(bool)
                verNew.RealDbname = data[0]["current_database"].(string)

                if verNew.Version > VersionToInt("10.0") && opts.Measurements.SystemIdentifierField > "" {
                        logger.Debugf("[%s] determining system identifier (pg ver: %v)", dbUnique, verNew.VersionStr)
                        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlSysid)
                        if err == nil && len(data) > 0 {
                                verNew.SystemIdentifier = data[0]["system_identifier"].(string)
                        }
                }

                if ver.ExecEnv != "" {
                        verNew.ExecEnv = ver.ExecEnv // carry over as not likely to ever change
                } else {
                        logger.Debugf("[%s] determining the execution env...", dbUnique)
                        execEnv := TryDiscoverExecutionEnv(dbUnique)
                        if execEnv != "" {
                                logger.Debugf("[%s] running on execution env: %s", dbUnique, execEnv)
                                verNew.ExecEnv = execEnv
                        }
                }

                // to work around poor Azure Single Server FS functions performance for some metrics + the --min-db-size-mb filter
                if verNew.ExecEnv == execEnvAzureSingle {
                        approxSize, err := GetDBTotalApproxSize(dbUnique)
                        if err == nil {
                                verNew.ApproxDBSizeB = approxSize
                        } else {
                                verNew.ApproxDBSizeB = ver.ApproxDBSizeB
                        }
                }

                logger.Debugf("[%s] determining if monitoring user is a superuser...", dbUnique)
                data, err = DBExecReadByDbUniqueName(ctx, dbUnique, sqlSu)
                if err == nil {
                        verNew.IsSuperuser = data[0]["rolsuper"].(bool)
                }
                logger.Debugf("[%s] superuser=%v", dbUnique, verNew.IsSuperuser)

                if verNew.Version >= MinExtensionInfoAvailable {
                        //log.Debugf("[%s] determining installed extensions info...", dbUnique)
                        data, err = DBExecReadByDbUniqueName(mainContext, dbUnique, sqlExtensions)
                        if err != nil {
                                logger.Errorf("[%s] failed to determine installed extensions info: %v", dbUnique, err)
                        } else {
                                for _, dr := range data {
                                        extver := VersionToInt(dr["extversion"].(string))
                                        if extver == 0 {
                                                logger.Errorf("[%s] failed to determine extension version info for extension %s", dbUnique, dr["extname"])
                                                continue
                                        }
                                        verNew.Extensions[dr["extname"].(string)] = extver
                                }
                                logger.Debugf("[%s] installed extensions: %+v", dbUnique, verNew.Extensions)
                        }
                }
        }

        verNew.LastCheckedOn = time.Now()
        dbPgVersionMapLock.Lock()
        dbPgVersionMap[dbUnique] = verNew
        dbPgVersionMapLock.Unlock()

        return verNew, nil
}
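
// DetectSprocChanges compares stored procedure / function md5 hashes against the
// previous run's state in hostState and emits create / alter / drop events to storageCh.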
func DetectSprocChanges(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
        detectedChanges := make(metrics.Measurements, 0)
        var firstRun bool
        var changeCounts ChangeDetectionResults

        logger.Debugf("[%s][%s] checking for sproc changes...", dbUnique, specialMetricChangeEvents)
        if _, ok := hostState["sproc_hashes"]; !ok {
                firstRun = true
                hostState["sproc_hashes"] = make(map[string]string)
        }

        mvp, err := GetMetricVersionProperties("sproc_hashes", vme, nil)
        if err != nil {
                logger.Error("could not get sproc_hashes sql:", err)
                return changeCounts
        }

        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
        if err != nil {
                logger.Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
                return changeCounts
        }

        for _, dr := range data {
                objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
                prevHash, ok := hostState["sproc_hashes"][objIdent]
                if ok { // we have existing state
                        if prevHash != dr["md5"].(string) {
                                logger.Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
                                dr["event"] = "alter"
                                detectedChanges = append(detectedChanges, dr)
                                hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
                                changeCounts.Altered++
                        }
                } else { // check for new / delete
                        if !firstRun {
                                logger.Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
                                dr["event"] = "create"
                                detectedChanges = append(detectedChanges, dr)
                                changeCounts.Created++
                        }
                        hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
                }
        }
        // detect deletes
        if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
                deletedSProcs := make([]string, 0)
                // turn resultset to map => [oid]=true for faster checks
                currentOidMap := make(map[string]bool)
                for _, dr := range data {
                        currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
                }
                for sprocIdent := range hostState["sproc_hashes"] {
                        _, ok := currentOidMap[sprocIdent]
                        if !ok {
                                splits := strings.Split(sprocIdent, dbMetricJoinStr)
                                logger.Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
                                influxEntry := make(metrics.Measurement)
                                influxEntry["event"] = "drop"
                                influxEntry["tag_sproc"] = splits[0]
                                influxEntry["tag_oid"] = splits[1]
                                if len(data) > 0 {
                                        influxEntry["epoch_ns"] = data[0]["epoch_ns"]
                                } else {
                                        influxEntry["epoch_ns"] = time.Now().UnixNano()
                                }
                                detectedChanges = append(detectedChanges, influxEntry)
                                deletedSProcs = append(deletedSProcs, sprocIdent)
                                changeCounts.Dropped++
                        }
                }
                for _, deletedSProc := range deletedSProcs {
                        delete(hostState["sproc_hashes"], deletedSProc)
                }
        }
        logger.Debugf("[%s][%s] detected %d sproc changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
        if len(detectedChanges) > 0 {
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
        }

        return changeCounts
}
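
// DetectTableChanges works like DetectSprocChanges but tracks table DDL via md5
// hashes of table definitions.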
func DetectTableChanges(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
        detectedChanges := make(metrics.Measurements, 0)
        var firstRun bool
        var changeCounts ChangeDetectionResults

        logger.Debugf("[%s][%s] checking for table changes...", dbUnique, specialMetricChangeEvents)
        if _, ok := hostState["table_hashes"]; !ok {
                firstRun = true
                hostState["table_hashes"] = make(map[string]string)
        }

        mvp, err := GetMetricVersionProperties("table_hashes", vme, nil)
        if err != nil {
                logger.Error("could not get table_hashes sql:", err)
                return changeCounts
        }

        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
        if err != nil {
                logger.Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
                return changeCounts
        }

        for _, dr := range data {
                objIdent := dr["tag_table"].(string)
                prevHash, ok := hostState["table_hashes"][objIdent]
                //log.Debug("inspecting table:", objIdent, "hash:", prev_hash)
                if ok { // we have existing state
                        if prevHash != dr["md5"].(string) {
                                logger.Info("detected DDL change in table:", dr["tag_table"])
                                dr["event"] = "alter"
                                detectedChanges = append(detectedChanges, dr)
                                hostState["table_hashes"][objIdent] = dr["md5"].(string)
                                changeCounts.Altered++
                        }
                } else { // check for new / delete
                        if !firstRun {
                                logger.Info("detected new table:", dr["tag_table"])
                                dr["event"] = "create"
                                detectedChanges = append(detectedChanges, dr)
                                changeCounts.Created++
                        }
                        hostState["table_hashes"][objIdent] = dr["md5"].(string)
                }
        }
        // detect deletes
        if !firstRun && len(hostState["table_hashes"]) != len(data) {
                deletedTables := make([]string, 0)
                // turn resultset to map => [table]=true for faster checks
                currentTableMap := make(map[string]bool)
                for _, dr := range data {
                        currentTableMap[dr["tag_table"].(string)] = true
                }
                for table := range hostState["table_hashes"] {
                        _, ok := currentTableMap[table]
                        if !ok {
                                logger.Info("detected drop of table:", table)
                                influxEntry := make(metrics.Measurement)
                                influxEntry["event"] = "drop"
                                influxEntry["tag_table"] = table
                                if len(data) > 0 {
                                        influxEntry["epoch_ns"] = data[0]["epoch_ns"]
                                } else {
                                        influxEntry["epoch_ns"] = time.Now().UnixNano()
                                }
                                detectedChanges = append(detectedChanges, influxEntry)
                                deletedTables = append(deletedTables, table)
                                changeCounts.Dropped++
                        }
                }
                for _, deletedTable := range deletedTables {
                        delete(hostState["table_hashes"], deletedTable)
                }
        }

        logger.Debugf("[%s][%s] detected %d table changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
        if len(detectedChanges) > 0 {
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
        }

        return changeCounts
}
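
// DetectIndexChanges works like DetectTableChanges but additionally folds the index
// validity flag into the compared hash, so a validity flip (e.g. an index left invalid
// by a failed CREATE INDEX CONCURRENTLY) also surfaces as an alter event.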
func DetectIndexChanges(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
        detectedChanges := make(metrics.Measurements, 0)
        var firstRun bool
        var changeCounts ChangeDetectionResults

        logger.Debugf("[%s][%s] checking for index changes...", dbUnique, specialMetricChangeEvents)
        if _, ok := hostState["index_hashes"]; !ok {
                firstRun = true
                hostState["index_hashes"] = make(map[string]string)
        }

        mvp, err := GetMetricVersionProperties("index_hashes", vme, nil)
        if err != nil {
                logger.Error("could not get index_hashes sql:", err)
                return changeCounts
        }

        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
        if err != nil {
                logger.Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
                return changeCounts
        }

        for _, dr := range data {
                objIdent := dr["tag_index"].(string)
                prevHash, ok := hostState["index_hashes"][objIdent]
                if ok { // we have existing state
                        if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
                                logger.Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
                                dr["event"] = "alter"
                                detectedChanges = append(detectedChanges, dr)
                                hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
                                changeCounts.Altered++
                        }
                } else { // check for new / delete
                        if !firstRun {
                                logger.Info("detected new index:", dr["tag_index"])
                                dr["event"] = "create"
                                detectedChanges = append(detectedChanges, dr)
                                changeCounts.Created++
                        }
                        hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
                }
        }
        // detect deletes
        if !firstRun && len(hostState["index_hashes"]) != len(data) {
                deletedIndexes := make([]string, 0)
                // turn resultset to map => [index]=true for faster checks
                currentIndexMap := make(map[string]bool)
                for _, dr := range data {
                        currentIndexMap[dr["tag_index"].(string)] = true
                }
                for indexName := range hostState["index_hashes"] {
                        _, ok := currentIndexMap[indexName]
                        if !ok {
                                logger.Info("detected drop of index_name:", indexName)
                                influxEntry := make(metrics.Measurement)
                                influxEntry["event"] = "drop"
                                influxEntry["tag_index"] = indexName
                                if len(data) > 0 {
                                        influxEntry["epoch_ns"] = data[0]["epoch_ns"]
                                } else {
                                        influxEntry["epoch_ns"] = time.Now().UnixNano()
                                }
                                detectedChanges = append(detectedChanges, influxEntry)
                                deletedIndexes = append(deletedIndexes, indexName)
                                changeCounts.Dropped++
                        }
                }
                for _, deletedIndex := range deletedIndexes {
                        delete(hostState["index_hashes"], deletedIndex)
                }
        }
        logger.Debugf("[%s][%s] detected %d index changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
        if len(detectedChanges) > 0 {
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
        }

        return changeCounts
}
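
// DetectPrivilegeChanges tracks object privileges (object_type / role / object /
// privilege_type tuples) between runs and emits GRANT / REVOKE events.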
func DetectPrivilegeChanges(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
        detectedChanges := make(metrics.Measurements, 0)
        var firstRun bool
        var changeCounts ChangeDetectionResults

        logger.Debugf("[%s][%s] checking object privilege changes...", dbUnique, specialMetricChangeEvents)
        if _, ok := hostState["object_privileges"]; !ok {
                firstRun = true
                hostState["object_privileges"] = make(map[string]string)
        }

        mvp, err := GetMetricVersionProperties("privilege_changes", vme, nil)
        if err != nil || mvp.SQL == "" {
                logger.Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", dbUnique, specialMetricChangeEvents)
                return changeCounts
        }

        // returns rows of: object_type, tag_role, tag_object, privilege_type
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
        if err != nil {
                logger.Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
                return changeCounts
        }

        currentState := make(map[string]bool)
        for _, dr := range data {
                objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
                if firstRun {
                        hostState["object_privileges"][objIdent] = ""
                } else {
                        _, ok := hostState["object_privileges"][objIdent]
                        if !ok {
                                logger.Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
                                        dbUnique, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
                                dr["event"] = "GRANT"
                                detectedChanges = append(detectedChanges, dr)
                                changeCounts.Created++
                                hostState["object_privileges"][objIdent] = ""
                        }
                        currentState[objIdent] = true
                }
        }
        // check revokes - exists in old state only
        if !firstRun && len(currentState) > 0 {
                for objPrevRun := range hostState["object_privileges"] {
                        if _, ok := currentState[objPrevRun]; !ok {
                                splits := strings.Split(objPrevRun, "#:#")
                                logger.Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
                                        dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
                                revokeEntry := make(metrics.Measurement)
                                if epochNs, ok := data[0]["epoch_ns"]; ok {
                                        revokeEntry["epoch_ns"] = epochNs
                                } else {
                                        revokeEntry["epoch_ns"] = time.Now().UnixNano()
                                }
                                revokeEntry["object_type"] = splits[0]
                                revokeEntry["tag_role"] = splits[1]
                                revokeEntry["tag_object"] = splits[2]
                                revokeEntry["privilege_type"] = splits[3]
                                revokeEntry["event"] = "REVOKE"
                                detectedChanges = append(detectedChanges, revokeEntry)
                                changeCounts.Dropped++
                                delete(hostState["object_privileges"], objPrevRun)
                        }
                }
        }

        logger.Debugf("[%s][%s] detected %d object privilege changes...", dbUnique, specialMetricChangeEvents, len(detectedChanges))
        if len(detectedChanges) > 0 {
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{
                        {
                                DBName:     dbUnique,
                                MetricName: "privilege_changes",
                                Data:       detectedChanges,
                                CustomTags: md.CustomTags,
                        }}
        }

        return changeCounts
}
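
// DetectConfigurationChanges tracks pg_settings values between runs. Deleted settings
// are not tracked, as settings only disappear across major upgrades (pg_upgrade).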
func DetectConfigurationChanges(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
        detectedChanges := make(metrics.Measurements, 0)
        var firstRun bool
        var changeCounts ChangeDetectionResults

        logger.Debugf("[%s][%s] checking for configuration changes...", dbUnique, specialMetricChangeEvents)
        if _, ok := hostState["configuration_hashes"]; !ok {
                firstRun = true
                hostState["configuration_hashes"] = make(map[string]string)
        }

        mvp, err := GetMetricVersionProperties("configuration_hashes", vme, nil)
        if err != nil {
                logger.Errorf("[%s][%s] could not get configuration_hashes sql: %v", dbUnique, specialMetricChangeEvents, err)
                return changeCounts
        }

        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
        if err != nil {
                logger.Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
                return changeCounts
        }

        for _, dr := range data {
                objIdent := dr["tag_setting"].(string)
                objValue := dr["value"].(string)
                prevHash, ok := hostState["configuration_hashes"][objIdent]
                if ok { // we have existing state
                        if prevHash != objValue {
                                if objIdent == "connection_ID" {
                                        continue // ignore some weird Azure managed PG service setting
                                }
                                logger.Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
                                        dbUnique, specialMetricChangeEvents, objIdent, objValue, prevHash)
                                dr["event"] = "alter"
                                detectedChanges = append(detectedChanges, dr)
                                hostState["configuration_hashes"][objIdent] = objValue
                                changeCounts.Altered++
                        }
                } else { // check for new, delete not relevant here (pg_upgrade)
                        if !firstRun {
                                logger.Warningf("[%s][%s] detected new setting: %s", dbUnique, specialMetricChangeEvents, objIdent)
                                dr["event"] = "create"
                                detectedChanges = append(detectedChanges, dr)
                                changeCounts.Created++
                        }
                        hostState["configuration_hashes"][objIdent] = objValue
                }
        }

        logger.Debugf("[%s][%s] detected %d configuration changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
        if len(detectedChanges) > 0 {
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{{
                        DBName:     dbUnique,
                        MetricName: "configuration_changes",
                        Data:       detectedChanges,
                        CustomTags: md.CustomTags,
                }}
        }

        return changeCounts
}
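
// CheckForPGObjectChangesAndStore runs all the Detect*() change checks and
// additionally sends a single summary "object_changes" message, since Grafana applies
// "last wins" for annotations with similar timestamps.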
func CheckForPGObjectChangesAndStore(dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) {
        sprocCounts := DetectSprocChanges(dbUnique, vme, storageCh, hostState) // TODO some of Detect*() code could be unified...
        tableCounts := DetectTableChanges(dbUnique, vme, storageCh, hostState)
        indexCounts := DetectIndexChanges(dbUnique, vme, storageCh, hostState)
        confCounts := DetectConfigurationChanges(dbUnique, vme, storageCh, hostState)
        privChangeCounts := DetectPrivilegeChanges(dbUnique, vme, storageCh, hostState)

        // need to send info on all object changes as one message as Grafana applies "last wins" for annotations with similar timestamp
        message := ""
        if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
                message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
        }
        if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
                message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
        }
        if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
                message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
        }
        if confCounts.Altered > 0 || confCounts.Created > 0 {
                message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
        }
        if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
                message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
        }

        if message > "" {
                message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
                logger.Info(message)
                detectedChangesSummary := make(metrics.Measurements, 0)
                influxEntry := make(metrics.Measurement)
                influxEntry["details"] = message
                influxEntry["epoch_ns"] = time.Now().UnixNano()
                detectedChangesSummary = append(detectedChangesSummary, influxEntry)
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
                storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique,
                        SourceType: string(md.Kind),
                        MetricName: "object_changes",
                        Data:       detectedChangesSummary,
                        CustomTags: md.CustomTags,
                }}
        }
}

// some extra work needed as pgpool SHOW commands don't specify the return data types for some reason
func FetchMetricsPgpool(msg MetricFetchMessage, _ DBVersionMapEntry, mvp metrics.MetricProperties) (metrics.Measurements, error) {
        var retData = make(metrics.Measurements, 0)
        epochNs := time.Now().UnixNano()

        sqlLines := strings.Split(strings.ToUpper(mvp.SQL), "\n")

        for _, sql := range sqlLines {
                if strings.HasPrefix(sql, "SHOW POOL_NODES") {
                        data, err := DBExecReadByDbUniqueName(mainContext, msg.DBUniqueName, sql)
                        if err != nil {
                                logger.Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
                                return data, err
                        }

                        for _, row := range data {
                                retRow := make(metrics.Measurement)
                                retRow[epochColumnName] = epochNs
                                for k, v := range row {
                                        vs := string(v.([]byte))
                                        // need 1 tag so that Influx would not merge rows
                                        if k == "node_id" {
                                                retRow["tag_node_id"] = vs
                                                continue
                                        }

                                        retRow[k] = vs
                                        if k == "status" { // was changed from numeric to string at some pgpool version, so leave the string
                                                // but also add a "status_num" field
                                                if vs == "up" {
                                                        retRow["status_num"] = 1
                                                } else if vs == "down" {
                                                        retRow["status_num"] = 0
                                                } else {
                                                        i, err := strconv.ParseInt(vs, 10, 64)
                                                        if err == nil {
                                                                retRow["status_num"] = i
                                                        }
                                                }
                                                continue
                                        }
                                        // everything is returned as text, so try to convert all numerics into ints / floats
                                        if k != "lb_weight" {
                                                i, err := strconv.ParseInt(vs, 10, 64)
                                                if err == nil {
                                                        retRow[k] = i
                                                        continue
                                                }
                                        }
                                        f, err := strconv.ParseFloat(vs, 64)
                                        if err == nil {
                                                retRow[k] = f
                                                continue
                                        }
                                }
                                retData = append(retData, retRow)
                        }
                } else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
                        if len(retData) == 0 {
                                logger.Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES. ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
                                continue
                        }

                        data, err := DBExecReadByDbUniqueName(mainContext, msg.DBUniqueName, sql)
                        if err != nil {
                                logger.Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
                                continue
                        }

                        // summarize processes_total / processes_active over all rows
                        processesTotal := 0
                        processesActive := 0
                        for _, row := range data {
                                processesTotal++
                                v, ok := row["database"]
                                if !ok {
                                        logger.Infof("[%s][%s] column 'database' not found from data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
                                        continue
                                }
                                if len(v.([]byte)) > 0 {
                                        processesActive++
                                }
                        }

                        for _, retRow := range retData {
                                retRow["processes_total"] = processesTotal
                                retRow["processes_active"] = processesActive
                        }
                }
        }
        return retData, nil
}
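
// DoesFunctionExists reports whether a function with the given name exists in the
// 'public' schema of the monitored DB. Note that functionName is interpolated into
// the query directly, so only trusted, internally defined names should be passed.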
func DoesFunctionExists(dbUnique, functionName string) bool {
        logger.Debug("Checking for function existence", dbUnique, functionName)
        sql := fmt.Sprintf("select /* pgwatch3_generated */ 1 from pg_proc join pg_namespace n on pronamespace = n.oid where proname = '%s' and n.nspname = 'public'", functionName)
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sql)
        if err != nil {
                logger.Error("Failed to check for function existence", dbUnique, functionName, err)
                return false
        }
        if len(data) > 0 {
                logger.Debugf("Function %s exists on %s", functionName, dbUnique)
                return true
        }
        return false
}

// Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
// With newer Postgres versions this can even succeed for non-superusers, thanks to cloud-specific
// whitelisting or the "trusted extensions" feature (added in v13). Ignores errors.
func TryCreateMissingExtensions(dbUnique string, extensionNames []string, existingExtensions map[string]uint) []string {
        sqlAvailable := `select name::text from pg_available_extensions`
        extsCreated := make([]string, 0)

        // for security reasons don't execute arbitrary strings - only create extensions that the instance actually knows about
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlAvailable)
        if err != nil {
                logger.Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
                return extsCreated
        }

        availableExts := make(map[string]bool)
        for _, row := range data {
                availableExts[row["name"].(string)] = true
        }

        for _, extToCreate := range extensionNames {
                if _, ok := existingExtensions[extToCreate]; ok {
                        continue
                }
                _, ok := availableExts[extToCreate]
                if !ok {
                        logger.Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
                } else {
                        sqlCreateExt := `create extension ` + extToCreate
                        _, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlCreateExt)
                        if err != nil {
                                logger.Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
                        } else {
                                extsCreated = append(extsCreated, extToCreate)
                        }
                }
        }

        return extsCreated
}

// Called once on daemon startup to try to create "metric fetching helper" functions automatically
func TryCreateMetricsFetchingHelpers(dbUnique string) error {
        dbPgVersion, err := DBGetPGVersion(mainContext, dbUnique, sources.SourcePostgres, false)
        if err != nil {
                logger.Errorf("Failed to fetch pg version for \"%s\": %s", dbUnique, err)
                return err
        }

        if fileBasedMetrics {
                helpers, _, err := metrics.ReadMetricsFromFolder(mainContext, path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir))
                if err != nil {
                        logger.Errorf("Failed to fetch helpers from \"%s\": %s", path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir), err)
                        return err
                }
                logger.Debugf("%d helper definitions found from \"%s\"...", len(helpers), path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir))

                for helperName := range helpers {
                        if strings.Contains(helperName, "windows") {
                                logger.Infof("Skipping %s rollout. Windows helpers need to be rolled out manually", helperName)
                                continue
                        }
                        if !DoesFunctionExists(dbUnique, helperName) {
                                logger.Debug("Trying to create metric fetching helpers for", dbUnique, helperName)
                                mvp, err := GetMetricVersionProperties(helperName, dbPgVersion, helpers)
                                if err != nil {
                                        logger.Warning("Could not find query text for", dbUnique, helperName)
                                        continue
                                }
                                _, err = DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
                                if err != nil {
                                        logger.Warning("Failed to create a metric fetching helper for", dbUnique, helperName)
                                        logger.Warning(err)
                                } else {
                                        logger.Info("Successfully created metric fetching helper for", dbUnique, helperName)
                                }
                        }
                }

        } else {
                sqlHelpers := "select /* pgwatch3_generated */ distinct m_name from pgwatch3.metric where m_is_active and m_is_helper" // m_name is a helper function name
                data, err := DBExecRead(mainContext, configDb, sqlHelpers)
                if err != nil {
                        logger.Error(err)
                        return err
                }
                for _, row := range data {
                        metric := row["m_name"].(string)

                        if strings.Contains(metric, "windows") {
                                logger.Infof("Skipping %s rollout. Windows helpers need to be rolled out manually", metric)
                                continue
                        }
                        if !DoesFunctionExists(dbUnique, metric) {
                                logger.Debug("Trying to create metric fetching helpers for", dbUnique, metric)
                                mvp, err := GetMetricVersionProperties(metric, dbPgVersion, nil)
                                if err != nil {
                                        logger.Warning("Could not find query text for", dbUnique, metric)
                                        continue
                                }
                                _, err = DBExecReadByDbUniqueName(mainContext, dbUnique, mvp.SQL)
                                if err != nil {
                                        logger.Warning("Failed to create a metric fetching helper for", dbUnique, metric)
                                        logger.Warning(err)
                                } else {
                                        logger.Info("Successfully created metric fetching helper for", dbUnique, metric)
                                }
                        }
                }
        }
        return nil
}

// actually connects to the instance to determine PG-relevant disk paths / mounts
func GetGoPsutilDiskPG(dbUnique string) (metrics.Measurements, error) {
        sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld, current_setting('server_version_num')::int as pgver`
        sqlTS := `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
        data, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sql)
        if err != nil || len(data) == 0 {
                logger.Errorf("Failed to determine relevant PG disk paths via SQL: %v", err)
                return nil, err
        }
        dataTblsp, err := DBExecReadByDbUniqueName(mainContext, dbUnique, sqlTS)
        if err != nil {
                logger.Infof("Failed to determine relevant PG tablespace paths via SQL: %v", err)
        }
        return psutil.GetGoPsutilDiskPG(data, dataTblsp)
}