• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pgwatch3 / 10078838251

24 Jul 2024 02:43PM UTC coverage: 28.289% (+0.2%) from 28.102%
10078838251

push

github

web-flow
[-] fix metric version discovery for old PostgreSQL versions, fixes #489 (#491)

[*] rename `DBVersionMapEntry` to `MonitoredDatabaseSettings`
[*] rename `DBGetPGVersion()` to `GetMonitoredDatabaseSettings()`
[*] refactor `GetMonitoredDatabaseSettings()`
[*] rename `MetricFetchMessage` to `MetricFetchConfig`

0 of 151 new or added lines in 5 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

1273 of 4500 relevant lines covered (28.29%)

0.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.62
/src/reaper/database.go
1
package reaper
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "math"
8
        "regexp"
9
        "strconv"
10
        "strings"
11
        "sync"
12
        "time"
13

14
        "github.com/cybertec-postgresql/pgwatch3/db"
15
        "github.com/cybertec-postgresql/pgwatch3/log"
16
        "github.com/cybertec-postgresql/pgwatch3/metrics"
17
        "github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
18
        "github.com/cybertec-postgresql/pgwatch3/sinks"
19
        "github.com/cybertec-postgresql/pgwatch3/sources"
20
        "github.com/jackc/pgx/v5"
21
        "github.com/jackc/pgx/v5/pgxpool"
22
)
23

24
// every DB under monitoring should have exactly 1 sql.DB connection assigned, that will internally limit parallel access
25
func InitSQLConnPoolForMonitoredDBIfNil(ctx context.Context, md *sources.MonitoredDatabase, maxConns int) (err error) {
×
26
        conn := md.Conn
×
27
        if conn != nil {
×
28
                return nil
×
29
        }
×
30

31
        md.Conn, err = db.New(ctx, md.ConnStr, func(conf *pgxpool.Config) error {
×
32
                conf.MaxConns = int32(maxConns)
×
33
                return nil
×
34
        })
×
35
        if err != nil {
×
36
                return err
×
37
        }
×
38

39
        return nil
×
40
}
41

42
func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error) {
×
43
        rows, err := conn.Query(ctx, sql, args...)
×
44
        if err == nil {
×
45
                return pgx.CollectRows(rows, pgx.RowToMap)
×
46
        }
×
47
        return nil, err
×
48
}
49

50
func GetConnByUniqueName(dbUnique string) db.PgxIface {
×
51
        if md, err := GetMonitoredDatabaseByUniqueName(dbUnique); err == nil {
×
52
                return md.Conn
×
53
        }
×
54
        return nil
×
55
}
56

57
// DBExecReadByDbUniqueName runs the given SQL against the monitored database
// registered under dbUnique and returns the resulting rows.
// The query is wrapped in a transaction so that the "SET LOCAL lock_timeout"
// (applied for Postgres sources only) is scoped to this call.
// NOTE(review): the deferred tx.Commit also fires on error-return paths after
// Begin succeeded — presumably harmless for read-only queries, but confirm.
func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
	var conn db.PgxIface
	var md *sources.MonitoredDatabase
	var err error
	var tx pgx.Tx
	if strings.TrimSpace(sql) == "" {
		return nil, errors.New("empty SQL")
	}
	// md is needed below only to decide whether the lock_timeout should be set
	if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
		return nil, err
	}
	if conn = GetConnByUniqueName(dbUnique); conn == nil {
		log.GetLogger(ctx).Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ...
		return nil, errors.New("SQL connection not found or nil")
	}
	if tx, err = conn.Begin(ctx); err != nil {
		return nil, err
	}
	defer func() { _ = tx.Commit(ctx) }()
	if md.IsPostgresSource() {
		// keep metric queries from blocking behind DDL locks for long
		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
		if err != nil {
			return nil, err
		}
	}
	return DBExecRead(ctx, tx, sql, args...)
}
84

85
// Known execution environments (cloud providers) as returned by
// TryDiscoverExecutionEnv.
const (
	execEnvUnknown       = "UNKNOWN"
	execEnvAzureSingle   = "AZURE_SINGLE"
	execEnvAzureFlexible = "AZURE_FLEXIBLE"
	execEnvGoogle        = "GOOGLE"
)
91

92
// DBGetSizeMB returns the size of the monitored database in MB, caching the
// result for dbSizeCachingInterval. On Azure Single Server with a very large DB
// (ApproxDBSizeB >= 1e12) the cheap relpages-based estimate is used instead of
// pg_database_size(), to work around slow FS functions there.
func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
	sqlDbSize := `select /* pgwatch3_generated */ pg_database_size(current_database());`
	var sizeMB int64

	// read the cached value under the shared lock
	lastDBSizeCheckLock.RLock()
	lastDBSizeCheckTime := lastDBSizeFetchTime[dbUnique]
	lastDBSize, ok := lastDBSizeMB[dbUnique]
	lastDBSizeCheckLock.RUnlock()

	// refresh when there is no cached value or the cache entry has expired
	if !ok || lastDBSizeCheckTime.Add(dbSizeCachingInterval).Before(time.Now()) {
		ver, err := GetMonitoredDatabaseSettings(ctx, dbUnique, sources.SourcePostgres, false)
		// NOTE(review): the third clause is redundant — the second already covers
		// every non-AZURE_SINGLE env, so this reduces to
		// "err != nil || env != AZURE_SINGLE || approx < 1e12"; confirm intent.
		if err != nil || (ver.ExecEnv != execEnvAzureSingle) || (ver.ExecEnv == execEnvAzureSingle && ver.ApproxDBSizeB < 1e12) {
			log.GetLogger(ctx).Debugf("[%s] determining DB size ...", dbUnique)

			data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. err: %v ...", dbUnique, err)
				return 0, err
			}
			sizeMB = data[0]["pg_database_size"].(int64) / 1048576
		} else {
			log.GetLogger(ctx).Debugf("[%s] Using approx DB size for the --min-db-size-mb filter ...", dbUnique)
			sizeMB = ver.ApproxDBSizeB / 1048576
		}

		log.GetLogger(ctx).Debugf("[%s] DB size = %d MB, caching for %v ...", dbUnique, sizeMB, dbSizeCachingInterval)

		// store the freshly determined size under the exclusive lock
		lastDBSizeCheckLock.Lock()
		lastDBSizeFetchTime[dbUnique] = time.Now()
		lastDBSizeMB[dbUnique] = sizeMB
		lastDBSizeCheckLock.Unlock()

		return sizeMB, nil

	}
	log.GetLogger(ctx).Debugf("[%s] using cached DBsize %d MB for the --min-db-size-mb filter check", dbUnique, lastDBSize)
	return lastDBSize, nil
}
130

NEW
131
func TryDiscoverExecutionEnv(ctx context.Context, dbUnique string) (execEnv string) {
×
NEW
132
        sql := `select /* pgwatch3_generated */
×
133
        case
×
134
          when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
×
135
          when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
×
136
          when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
×
137
        else
×
138
          'UNKNOWN'
×
NEW
139
        end as exec_env`
×
NEW
140
        _ = GetConnByUniqueName(dbUnique).QueryRow(ctx, sql).Scan(&execEnv)
×
NEW
141
        return
×
UNCOV
142
}
×
143

144
func GetDBTotalApproxSize(ctx context.Context, dbUnique string) (int64, error) {
×
145
        sqlApproxDBSize := `
×
146
        select /* pgwatch3_generated */
×
147
                current_setting('block_size')::int8 * sum(relpages) as db_size_approx
×
148
        from
×
149
                pg_class c
×
150
        where        /* works only for v9.1+*/
×
151
                c.relpersistence != 't';
×
152
        `
×
153
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlApproxDBSize)
×
154
        if err != nil {
×
155
                return 0, err
×
156
        }
×
157
        return data[0]["db_size_approx"].(int64), nil
×
158
}
159

160
// regVer captures up to three numeric components of a (possibly prefixed)
// version string, e.g. "v9.6.3" -> 9, 6, 3; missing components match empty.
var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)

// VersionToInt parses a given version and returns an integer  or
// an error if unable to parse the version. Only parses valid semantic versions.
// Performs checking that can find errors within the version.
// Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00
func VersionToInt(version string) (v int) {
	matches := regVer.FindStringSubmatch(version)
	if len(matches) < 2 {
		return 0
	}
	// weight major/minor/patch as 10^4 / 10^2 / 10^0; empty groups parse as 0
	for idx, part := range matches[1:] {
		num, _ := strconv.Atoi(part)
		v += num * int(math.Pow10(4-idx*2))
	}
	return v
}
174

175
var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extracts "$major.$minor" from strings like "4.1.2 (karasukiboshi)" (pgpool) or "PgBouncer 1.12.0"
176

NEW
177
// GetMonitoredDatabaseSettings determines (or serves from a 2-minute cache) the
// runtime settings of a monitored source: version, recovery state, superuser
// flag, installed extensions, execution environment etc. For PgBouncer/PgPool
// sources only the version is fetched; for Postgres sources the full set is
// gathered. When noCache is set a fresh fetch is forced; on fetch failure with
// noCache unset, the previously cached settings are returned instead of an error.
func GetMonitoredDatabaseSettings(ctx context.Context, dbUnique string, srcType sources.Kind, noCache bool) (MonitoredDatabaseSettings, error) {
	var dbSettings MonitoredDatabaseSettings    // last cached value (fallback on errors)
	var dbNewSettings MonitoredDatabaseSettings // freshly fetched value being built
	var ok bool

	l := log.GetLogger(ctx).WithField("source", dbUnique).WithField("kind", srcType)

	sqlExtensions := `select /* pgwatch3_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`

	// look up (and lazily create) the per-DB fetch lock plus the cached settings
	// under the global settings lock
	MonitoredDatabasesSettingsLock.Lock()
	getVerLock, ok := MonitoredDatabasesSettingsGetLock[dbUnique]
	if !ok {
		MonitoredDatabasesSettingsGetLock[dbUnique] = &sync.RWMutex{}
		getVerLock = MonitoredDatabasesSettingsGetLock[dbUnique]
	}
	dbSettings, ok = MonitoredDatabasesSettings[dbUnique]
	MonitoredDatabasesSettingsLock.Unlock()

	if !noCache && ok && dbSettings.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min
		//log.Debugf("using cached postgres version %s for %s", ver.Version.String(), dbUnique)
		return dbSettings, nil
	}
	getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
	defer getVerLock.Unlock()
	l.Debug("determining DB version and recovery status...")

	if dbNewSettings.Extensions == nil {
		dbNewSettings.Extensions = make(map[string]int)
	}

	switch srcType {
	case sources.SourcePgBouncer:
		// PgBouncer: only the version is discoverable via its admin console
		if err := GetConnByUniqueName(dbUnique).QueryRow(ctx, "SHOW VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}
		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("Unexpected PgBouncer version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	case sources.SourcePgPool:
		// PgPool: likewise only the version is fetched
		if err := GetConnByUniqueName(dbUnique).QueryRow(ctx, "SHOW POOL_VERSION").Scan(&dbNewSettings.VersionStr); err != nil {
			return dbNewSettings, err
		}

		matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(dbNewSettings.VersionStr)
		if len(matches) != 1 {
			return dbSettings, fmt.Errorf("Unexpected PgPool version input: %s", dbNewSettings.VersionStr)
		}
		dbNewSettings.Version = VersionToInt(matches[0])
	default:
		// regular Postgres source: version, recovery state, db name, system id
		sql := `select /* pgwatch3_generated */ 
	current_setting('server_version_num')::int / 1_00_00 as ver, 
	version(), 
	pg_is_in_recovery(), 
	current_database()::TEXT,
	system_identifier
FROM
	pg_control_system()`

		err := GetConnByUniqueName(dbUnique).QueryRow(ctx, sql).
			Scan(&dbNewSettings.Version, &dbNewSettings.VersionStr,
				&dbNewSettings.IsInRecovery, &dbNewSettings.RealDbname,
				&dbNewSettings.SystemIdentifier)
		if err != nil {
			if noCache {
				return dbSettings, err
			}
			// keep the monitoring loop going on transient failures
			l.Error("DBGetPGVersion failed, using old cached value: ", err)
			return dbSettings, nil
		}

		if dbSettings.ExecEnv != "" {
			dbNewSettings.ExecEnv = dbSettings.ExecEnv // carry over as not likely to change ever
		} else {
			l.Debugf("determining the execution env...")
			dbNewSettings.ExecEnv = TryDiscoverExecutionEnv(ctx, dbUnique)
		}

		// to work around poor Azure Single Server FS functions performance for some metrics + the --min-db-size-mb filter
		if dbNewSettings.ExecEnv == execEnvAzureSingle {
			if approxSize, err := GetDBTotalApproxSize(ctx, dbUnique); err == nil {
				dbNewSettings.ApproxDBSizeB = approxSize
			} else {
				dbNewSettings.ApproxDBSizeB = dbSettings.ApproxDBSizeB
			}
		}

		l.Debugf("[%s] determining if monitoring user is a superuser...", dbUnique)
		sqlSu := `select /* pgwatch3_generated */ rolsuper from pg_roles r where rolname = session_user`

		// non-fatal: superuser detection failure only degrades metric selection
		if err = GetConnByUniqueName(dbUnique).QueryRow(ctx, sqlSu).Scan(&dbNewSettings.IsSuperuser); err != nil {
			l.Errorf("[%s] failed to determine if monitoring user is a superuser: %v", dbUnique, err)
		}

		l.Debugf("[%s] determining installed extensions info...", dbUnique)
		data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlExtensions)
		if err != nil {
			l.Errorf("[%s] failed to determine installed extensions info: %v", dbUnique, err)
		} else {
			for _, dr := range data {
				extver := VersionToInt(dr["extversion"].(string))
				if extver == 0 {
					l.Error("failed to determine extension version info for extension: ", dr["extname"])
					continue
				}
				dbNewSettings.Extensions[dr["extname"].(string)] = extver
			}
			l.Debugf("[%s] installed extensions: %+v", dbUnique, dbNewSettings.Extensions)
		}

	}

	// publish the refreshed settings into the shared cache
	dbNewSettings.LastCheckedOn = time.Now()
	MonitoredDatabasesSettingsLock.Lock()
	MonitoredDatabasesSettings[dbUnique] = dbNewSettings
	MonitoredDatabasesSettingsLock.Unlock()

	return dbNewSettings, nil
}
297

NEW
298
// DetectSprocChanges diffs the current stored-procedure md5 hashes of the
// monitored DB against the previous snapshot kept in
// hostState["sproc_hashes"], emits create/alter/drop events to storageCh, and
// returns per-category change counts. On the first run only the baseline is
// recorded (no events). hostState is mutated in place to hold the new snapshot.
func DetectSprocChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("sproc_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		// identity is "name||oid" so renamed/recreated sprocs count as new
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)
		// turn resultset to map => [oid]=true for faster checks
		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		// prune dropped sprocs from the snapshot after iteration to avoid
		// mutating the map while ranging over it
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
381

NEW
382
// DetectTableChanges diffs the current per-table DDL md5 hashes against the
// previous snapshot kept in hostState["table_hashes"], emits create/alter/drop
// events to storageCh, and returns per-category change counts. On the first run
// only the baseline is recorded (no events). hostState is mutated in place.
func DetectTableChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["table_hashes"]; !ok {
		firstRun = true
		hostState["table_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("table_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get table_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_table"].(string)
		prevHash, ok := hostState["table_hashes"][objIdent]
		//log.Debug("inspecting table:", objIdent, "hash:", prev_hash)
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["table_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["table_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["table_hashes"]) != len(data) {
		deletedTables := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentTableMap := make(map[string]bool)
		for _, dr := range data {
			currentTableMap[dr["tag_table"].(string)] = true
		}
		for table := range hostState["table_hashes"] {
			_, ok := currentTableMap[table]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of table:", table)
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_table"] = table
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedTables = append(deletedTables, table)
				changeCounts.Dropped++
			}
		}
		// prune dropped tables from the snapshot after iterating the map
		for _, deletedTable := range deletedTables {
			delete(hostState["table_hashes"], deletedTable)
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
465

NEW
466
// DetectIndexChanges diffs the current index state against the previous
// snapshot kept in hostState["index_hashes"], emits create/alter/drop events to
// storageCh, and returns per-category change counts. The stored fingerprint is
// the md5 concatenated with the is_valid flag, so an index becoming invalid
// also surfaces as an "alter". First run records the baseline only (no events).
func DetectIndexChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("index_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get index_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok { // we have existing state
			// fingerprint = md5 + is_valid, so validity flips are detected too
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		// prune dropped indexes from the snapshot after iterating the map
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}
547

NEW
548
func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
×
549
        detectedChanges := make(metrics.Measurements, 0)
×
550
        var firstRun bool
×
551
        var changeCounts ChangeDetectionResults
×
552

×
553
        log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", dbUnique, specialMetricChangeEvents)
×
554
        if _, ok := hostState["object_privileges"]; !ok {
×
555
                firstRun = true
×
556
                hostState["object_privileges"] = make(map[string]string)
×
557
        }
×
558

559
        mvp, err := GetMetricVersionProperties("privilege_changes", vme, nil)
×
560
        if err != nil || mvp.GetSQL(int(vme.Version)) == "" {
×
561
                log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", dbUnique, specialMetricChangeEvents)
×
562
                return changeCounts
×
563
        }
×
564

565
        // returns rows of: object_type, tag_role, tag_object, privilege_type
566
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
×
567
        if err != nil {
×
568
                log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
×
569
                return changeCounts
×
570
        }
×
571

572
        currentState := make(map[string]bool)
×
573
        for _, dr := range data {
×
574
                objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
×
575
                if firstRun {
×
576
                        hostState["object_privileges"][objIdent] = ""
×
577
                } else {
×
578
                        _, ok := hostState["object_privileges"][objIdent]
×
579
                        if !ok {
×
580
                                log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
×
581
                                        dbUnique, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
×
582
                                dr["event"] = "GRANT"
×
583
                                detectedChanges = append(detectedChanges, dr)
×
584
                                changeCounts.Created++
×
585
                                hostState["object_privileges"][objIdent] = ""
×
586
                        }
×
587
                        currentState[objIdent] = true
×
588
                }
589
        }
590
        // check revokes - exists in old state only
591
        if !firstRun && len(currentState) > 0 {
×
592
                for objPrevRun := range hostState["object_privileges"] {
×
593
                        if _, ok := currentState[objPrevRun]; !ok {
×
594
                                splits := strings.Split(objPrevRun, "#:#")
×
595
                                log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
×
596
                                        dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
×
597
                                revokeEntry := make(metrics.Measurement)
×
598
                                if epochNs, ok := data[0]["epoch_ns"]; ok {
×
599
                                        revokeEntry["epoch_ns"] = epochNs
×
600
                                } else {
×
601
                                        revokeEntry["epoch_ns"] = time.Now().UnixNano()
×
602
                                }
×
603
                                revokeEntry["object_type"] = splits[0]
×
604
                                revokeEntry["tag_role"] = splits[1]
×
605
                                revokeEntry["tag_object"] = splits[2]
×
606
                                revokeEntry["privilege_type"] = splits[3]
×
607
                                revokeEntry["event"] = "REVOKE"
×
608
                                detectedChanges = append(detectedChanges, revokeEntry)
×
609
                                changeCounts.Dropped++
×
610
                                delete(hostState["object_privileges"], objPrevRun)
×
611
                        }
612
                }
613
        }
614

615
        log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", dbUnique, specialMetricChangeEvents, len(detectedChanges))
×
616
        if len(detectedChanges) > 0 {
×
617
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
×
618
                storageCh <- []metrics.MeasurementMessage{
×
619
                        {
×
620
                                DBName:     dbUnique,
×
621
                                MetricName: "privilege_changes",
×
622
                                Data:       detectedChanges,
×
623
                                CustomTags: md.CustomTags,
×
624
                        }}
×
625
        }
×
626

627
        return changeCounts
×
628
}
629

NEW
630
func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
×
631
        detectedChanges := make(metrics.Measurements, 0)
×
632
        var firstRun bool
×
633
        var changeCounts ChangeDetectionResults
×
634

×
635
        log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", dbUnique, specialMetricChangeEvents)
×
636
        if _, ok := hostState["configuration_hashes"]; !ok {
×
637
                firstRun = true
×
638
                hostState["configuration_hashes"] = make(map[string]string)
×
639
        }
×
640

641
        mvp, err := GetMetricVersionProperties("configuration_hashes", vme, nil)
×
642
        if err != nil {
×
643
                log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql: %v", dbUnique, specialMetricChangeEvents, err)
×
644
                return changeCounts
×
645
        }
×
646

647
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
×
648
        if err != nil {
×
649
                log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
×
650
                return changeCounts
×
651
        }
×
652

653
        for _, dr := range data {
×
654
                objIdent := dr["tag_setting"].(string)
×
655
                objValue := dr["value"].(string)
×
656
                prevРash, ok := hostState["configuration_hashes"][objIdent]
×
657
                if ok { // we have existing state
×
658
                        if prevРash != objValue {
×
659
                                if objIdent == "connection_ID" {
×
660
                                        continue // ignore some weird Azure managed PG service setting
×
661
                                }
662
                                log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
×
663
                                        dbUnique, specialMetricChangeEvents, objIdent, objValue, prevРash)
×
664
                                dr["event"] = "alter"
×
665
                                detectedChanges = append(detectedChanges, dr)
×
666
                                hostState["configuration_hashes"][objIdent] = objValue
×
667
                                changeCounts.Altered++
×
668
                        }
669
                } else { // check for new, delete not relevant here (pg_upgrade)
×
670
                        if !firstRun {
×
671
                                log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", dbUnique, specialMetricChangeEvents, objIdent)
×
672
                                dr["event"] = "create"
×
673
                                detectedChanges = append(detectedChanges, dr)
×
674
                                changeCounts.Created++
×
675
                        }
×
676
                        hostState["configuration_hashes"][objIdent] = objValue
×
677
                }
678
        }
679

680
        log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
×
681
        if len(detectedChanges) > 0 {
×
682
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
×
683
                storageCh <- []metrics.MeasurementMessage{{
×
684
                        DBName:     dbUnique,
×
685
                        MetricName: "configuration_changes",
×
686
                        Data:       detectedChanges,
×
687
                        CustomTags: md.CustomTags,
×
688
                }}
×
689
        }
×
690

691
        return changeCounts
×
692
}
693

NEW
694
func CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme MonitoredDatabaseSettings, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) {
×
695
        sprocCounts := DetectSprocChanges(ctx, dbUnique, vme, storageCh, hostState) // TODO some of Detect*() code could be unified...
×
696
        tableCounts := DetectTableChanges(ctx, dbUnique, vme, storageCh, hostState)
×
697
        indexCounts := DetectIndexChanges(ctx, dbUnique, vme, storageCh, hostState)
×
698
        confCounts := DetectConfigurationChanges(ctx, dbUnique, vme, storageCh, hostState)
×
699
        privChangeCounts := DetectPrivilegeChanges(ctx, dbUnique, vme, storageCh, hostState)
×
700

×
701
        // need to send info on all object changes as one message as Grafana applies "last wins" for annotations with similar timestamp
×
702
        message := ""
×
703
        if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
×
704
                message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
×
705
        }
×
706
        if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
×
707
                message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
×
708
        }
×
709
        if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
×
710
                message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
×
711
        }
×
712
        if confCounts.Altered > 0 || confCounts.Created > 0 {
×
713
                message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
×
714
        }
×
715
        if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
×
716
                message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
×
717
        }
×
718

719
        if message > "" {
×
720
                message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
×
721
                log.GetLogger(ctx).Info(message)
×
722
                detectedChangesSummary := make(metrics.Measurements, 0)
×
723
                influxEntry := make(metrics.Measurement)
×
724
                influxEntry["details"] = message
×
725
                influxEntry["epoch_ns"] = time.Now().UnixNano()
×
726
                detectedChangesSummary = append(detectedChangesSummary, influxEntry)
×
727
                md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
×
728
                storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique,
×
729
                        SourceType: string(md.Kind),
×
730
                        MetricName: "object_changes",
×
731
                        Data:       detectedChangesSummary,
×
732
                        CustomTags: md.CustomTags,
×
733
                }}
×
734

×
735
        }
×
736
}
737

738
// some extra work needed as pgpool SHOW commands don't specify the return data types for some reason
//
// FetchMetricsPgpool executes the per-line SQL of a pgpool metric (uppercased,
// split on newlines) and post-processes the textual SHOW output into typed
// measurements. "SHOW POOL_NODES" produces one row per node; "SHOW POOL_PROCESSES"
// (which must come after POOL_NODES in the metric SQL) is summarized into
// processes_total / processes_active counters attached to every node row.
// Lines that are neither command are silently skipped.
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchConfig, vme MonitoredDatabaseSettings, mvp metrics.Metric) (metrics.Measurements, error) {
	var retData = make(metrics.Measurements, 0)
	// single timestamp shared by all emitted rows
	epochNs := time.Now().UnixNano()

	sqlLines := strings.Split(strings.ToUpper(mvp.GetSQL(int(vme.Version))), "\n")

	for _, sql := range sqlLines {
		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
			data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				return data, err
			}

			for _, row := range data {
				retRow := make(metrics.Measurement)
				retRow[epochColumnName] = epochNs
				for k, v := range row {
					// pgpool returns all columns as raw bytes; NOTE(review):
					// assumes every value is []byte — would panic otherwise
					vs := string(v.([]byte))
					// need 1 tag so that Influx would not merge rows
					if k == "node_id" {
						retRow["tag_node_id"] = vs
						continue
					}

					retRow[k] = vs
					if k == "status" { // was changed from numeric to string at some pgpool version so leave the string
						// but also add "status_num" field
						if vs == "up" {
							retRow["status_num"] = 1
						} else if vs == "down" {
							retRow["status_num"] = 0
						} else {
							i, err := strconv.ParseInt(vs, 10, 64)
							if err == nil {
								retRow["status_num"] = i
							}
						}
						continue
					}
					// everything is returned as text, so try to convert all numerics into ints / floats
					// (lb_weight is skipped for int parsing so it falls through to float)
					if k != "lb_weight" {
						i, err := strconv.ParseInt(vs, 10, 64)
						if err == nil {
							retRow[k] = i
							continue
						}
					}
					f, err := strconv.ParseFloat(vs, 64)
					if err == nil {
						retRow[k] = f
						continue
					}
				}
				retData = append(retData, retRow)
			}
		} else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
			// POOL_PROCESSES only augments rows already produced by POOL_NODES
			if len(retData) == 0 {
				log.GetLogger(ctx).Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES. ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
				continue
			}

			data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				continue
			}

			// summarize processesTotal / processes_active over all rows
			// (a process is considered active when its "database" column is non-empty)
			processesTotal := 0
			processesActive := 0
			for _, row := range data {
				processesTotal++
				v, ok := row["database"]
				if !ok {
					log.GetLogger(ctx).Infof("[%s][%s] column 'database' not found from data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
					continue
				}
				if len(v.([]byte)) > 0 {
					processesActive++
				}
			}

			// attach the aggregate counters to every node row
			for _, retRow := range retData {
				retRow["processes_total"] = processesTotal
				retRow["processes_active"] = processesActive
			}
		}
	}
	return retData, nil
}
830

831
func DoesFunctionExists(ctx context.Context, dbUnique, functionName string) bool {
×
832
        log.GetLogger(ctx).Debug("Checking for function existence", dbUnique, functionName)
×
833
        sql := fmt.Sprintf("select /* pgwatch3_generated */ 1 from pg_proc join pg_namespace n on pronamespace = n.oid where proname = '%s' and n.nspname = 'public'", functionName)
×
834
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
×
835
        if err != nil {
×
836
                log.GetLogger(ctx).Error("Failed to check for function existence", dbUnique, functionName, err)
×
837
                return false
×
838
        }
×
839
        if len(data) > 0 {
×
840
                log.GetLogger(ctx).Debugf("Function %s exists on %s", functionName, dbUnique)
×
841
                return true
×
842
        }
×
843
        return false
×
844
}
845

846
// Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
847
// With newer Postgres version can even succeed if the user is not a real superuser due to some cloud-specific
848
// whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
849
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
×
850
        sqlAvailable := `select name::text from pg_available_extensions`
×
851
        extsCreated := make([]string, 0)
×
852

×
853
        // For security reasons don't allow to execute random strings but check that it's an existing extension
×
854
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlAvailable)
×
855
        if err != nil {
×
856
                log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
×
857
                return extsCreated
×
858
        }
×
859

860
        availableExts := make(map[string]bool)
×
861
        for _, row := range data {
×
862
                availableExts[row["name"].(string)] = true
×
863
        }
×
864

865
        for _, extToCreate := range extensionNames {
×
866
                if _, ok := existingExtensions[extToCreate]; ok {
×
867
                        continue
×
868
                }
869
                _, ok := availableExts[extToCreate]
×
870
                if !ok {
×
871
                        log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
×
872
                } else {
×
873
                        sqlCreateExt := `create extension ` + extToCreate
×
874
                        _, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlCreateExt)
×
875
                        if err != nil {
×
876
                                log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
×
877
                        }
×
878
                        extsCreated = append(extsCreated, extToCreate)
×
879
                }
880
        }
881

882
        return extsCreated
×
883
}
884

885
// Called once on daemon startup to try to create "metric fething helper" functions automatically
886
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredDatabase) (err error) {
×
887
        metricConfig := func() map[string]float64 {
×
888
                if len(md.Metrics) > 0 {
×
889
                        return md.Metrics
×
890
                }
×
891
                if md.PresetMetrics > "" {
×
892
                        return metricDefinitionMap.PresetDefs[md.PresetMetrics].Metrics
×
893
                }
×
894
                return nil
×
895
        }()
896
        conf, err := pgx.ParseConfig(md.ConnStr)
×
897
        if err != nil {
×
898
                return err
×
899
        }
×
900
        conf.DefaultQueryExecMode = pgx.QueryExecModeExec
×
901
        c, err := pgx.ConnectConfig(ctx, conf)
×
902
        if err != nil {
×
903
                return nil
×
904
        }
×
905
        defer c.Close(ctx)
×
906

×
907
        for metricName := range metricConfig {
×
908
                Metric := metricDefinitionMap.MetricDefs[metricName]
×
909
                if Metric.InitSQL == "" {
×
910
                        continue
×
911
                }
912

913
                _, err = c.Exec(ctx, Metric.InitSQL)
×
914
                if err != nil {
×
915
                        log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", md.DBUniqueName, metricName, err)
×
916
                } else {
×
917
                        log.GetLogger(ctx).Info("Successfully created metric fetching helper for", md.DBUniqueName, metricName)
×
918
                }
×
919
        }
920
        return nil
×
921
}
922

923
// connects actually to the instance to determine PG relevant disk paths / mounts
924
func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measurements, error) {
×
925
        sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld, current_setting('server_version_num')::int as pgver`
×
926
        sqlTS := `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
×
927
        data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
×
928
        if err != nil || len(data) == 0 {
×
929
                log.GetLogger(ctx).Errorf("Failed to determine relevant PG disk paths via SQL: %v", err)
×
930
                return nil, err
×
931
        }
×
932
        dataTblsp, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlTS)
×
933
        if err != nil {
×
934
                log.GetLogger(ctx).Infof("Failed to determine relevant PG tablespace paths via SQL: %v", err)
×
935
        }
×
936
        return psutil.GetGoPsutilDiskPG(data, dataTblsp)
×
937
}
938

939
func CloseResourcesForRemovedMonitoredDBs(metricsWriter *sinks.MultiWriter, currentDBs, prevLoopDBs sources.MonitoredDatabases, shutDownDueToRoleChange map[string]bool) {
×
940
        var curDBsMap = make(map[string]bool)
×
941

×
942
        for _, curDB := range currentDBs {
×
943
                curDBsMap[curDB.DBUniqueName] = true
×
944
        }
×
945

946
        for _, prevDB := range prevLoopDBs {
×
947
                if _, ok := curDBsMap[prevDB.DBUniqueName]; !ok { // removed from config
×
948
                        prevDB.Conn.Close()
×
949
                        _ = metricsWriter.SyncMetrics(prevDB.DBUniqueName, "", "remove")
×
950
                }
×
951
        }
952

953
        // or to be ignored due to current instance state
954
        for roleChangedDB := range shutDownDueToRoleChange {
×
955
                if db := currentDBs.GetMonitoredDatabase(roleChangedDB); db != nil {
×
956
                        db.Conn.Close()
×
957
                }
×
958
                _ = metricsWriter.SyncMetrics(roleChangedDB, "", "remove")
×
959
        }
960
}
961

962
func SetDBUnreachableState(dbUnique string) {
×
963
        unreachableDBsLock.Lock()
×
964
        unreachableDB[dbUnique] = time.Now()
×
965
        unreachableDBsLock.Unlock()
×
966
}
×
967

968
func ClearDBUnreachableStateIfAny(dbUnique string) {
×
969
        unreachableDBsLock.Lock()
×
970
        delete(unreachableDB, dbUnique)
×
971
        unreachableDBsLock.Unlock()
×
972
}
×
973

974
func SetUndersizedDBState(dbUnique string, state bool) {
×
975
        undersizedDBsLock.Lock()
×
976
        undersizedDBs[dbUnique] = state
×
977
        undersizedDBsLock.Unlock()
×
978
}
×
979

980
func IsDBUndersized(dbUnique string) bool {
×
981
        undersizedDBsLock.RLock()
×
982
        defer undersizedDBsLock.RUnlock()
×
983
        undersized, ok := undersizedDBs[dbUnique]
×
984
        if ok {
×
985
                return undersized
×
986
        }
×
987
        return false
×
988
}
989

990
func SetRecoveryIgnoredDBState(dbUnique string, state bool) {
×
991
        recoveryIgnoredDBsLock.Lock()
×
992
        recoveryIgnoredDBs[dbUnique] = state
×
993
        recoveryIgnoredDBsLock.Unlock()
×
994
}
×
995

996
func IsDBIgnoredBasedOnRecoveryState(dbUnique string) bool {
×
997
        recoveryIgnoredDBsLock.RLock()
×
998
        defer recoveryIgnoredDBsLock.RUnlock()
×
999
        recoveryIgnored, ok := recoveryIgnoredDBs[dbUnique]
×
1000
        if ok {
×
1001
                return recoveryIgnored
×
1002
        }
×
1003
        return false
×
1004
}
1005

1006
func IsDBDormant(dbUnique string) bool {
×
1007
        return IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique)
×
1008
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc