
cybertec-postgresql / pgwatch3 / 9921437457

13 Jul 2024 04:44PM UTC coverage: 20.233% (+1.5%) from 18.699%
[!] differentiate the concepts of source and monitored database (#472)

A Source represents a configuration describing how to obtain the databases to monitor.
It can be a single database, a group of databases in a Postgres cluster,
or a group of databases in an HA Patroni cluster.
A MonitoredDatabase represents a single database to monitor. Unlike a source,
it stands for a single physical database connection. Continuous discovery
sources (postgres-continuous-discovery, patroni-continuous-discovery,
patroni-namespace-discovery) will produce multiple monitored databases.
pgwatch first reads the sources (single DBs, Postgres and Patroni clusters)
and then lists all databases found in them as monitored databases.

* [!] differentiate between the concepts of source and monitored database
* [*] rename api methods
* [+] add sources tests
* [-] fix postgres_test
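
To make the new model concrete, here is a minimal, self-contained Go sketch of
the source-to-monitored-database relationship. All type, field, and method names
are simplified stand-ins for illustration, not the actual pgwatch3 sources
package API:

	// Sketch only: one Source resolves to one or more MonitoredDatabases.
	package main

	import "fmt"

	type Kind string

	const (
		SourcePostgres           Kind = "postgres"
		SourcePostgresContinuous Kind = "postgres-continuous-discovery"
	)

	// Source is a configuration entry describing how to find databases to monitor.
	type Source struct {
		Name    string
		Kind    Kind
		ConnStr string
	}

	// MonitoredDatabase is one resolved, physical database connection.
	type MonitoredDatabase struct {
		DBUniqueName string
		ConnStr      string
	}

	// Resolve turns one source into one or more monitored databases. A plain
	// "postgres" source maps 1:1; a continuous-discovery source would list all
	// matching databases of the cluster (faked here with a static slice).
	func (s Source) Resolve() []MonitoredDatabase {
		if s.Kind == SourcePostgres {
			return []MonitoredDatabase{{DBUniqueName: s.Name, ConnStr: s.ConnStr}}
		}
		var mds []MonitoredDatabase
		for _, dbname := range []string{"app", "reporting"} { // stand-in for discovery
			mds = append(mds, MonitoredDatabase{
				DBUniqueName: s.Name + "_" + dbname,
				ConnStr:      s.ConnStr + " dbname=" + dbname,
			})
		}
		return mds
	}

	func main() {
		src := Source{Name: "prod", Kind: SourcePostgresContinuous, ConnStr: "host=10.0.0.1"}
		for _, md := range src.Resolve() {
			fmt.Println(md.DBUniqueName) // prod_app, prod_reporting
		}
	}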

62 of 118 new or added lines in 8 files covered. (52.54%)

9 existing lines in 5 files now uncovered.

939 of 4641 relevant lines covered (20.23%)

0.23 hits per line

Source File: /src/reaper/database.go
package reaper

import (
	"context"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/cybertec-postgresql/pgwatch3/db"
	"github.com/cybertec-postgresql/pgwatch3/log"
	"github.com/cybertec-postgresql/pgwatch3/metrics"
	"github.com/cybertec-postgresql/pgwatch3/metrics/psutil"
	"github.com/cybertec-postgresql/pgwatch3/sinks"
	"github.com/cybertec-postgresql/pgwatch3/sources"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// every DB under monitoring should have exactly one sql.DB connection assigned; it will internally limit parallel access
func InitSQLConnPoolForMonitoredDBIfNil(ctx context.Context, md *sources.MonitoredDatabase, maxConns int) (err error) {
	conn := md.Conn
	if conn != nil {
		return nil
	}

	md.Conn, err = db.New(ctx, md.ConnStr, func(conf *pgxpool.Config) error {
		conf.MaxConns = int32(maxConns)
		return nil
	})
	if err != nil {
		return err
	}

	return nil
}

func DBExecRead(ctx context.Context, conn db.PgxIface, sql string, args ...any) (metrics.Measurements, error) {
	rows, err := conn.Query(ctx, sql, args...)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowToMap)
	}
	return nil, err
}

func GetConnByUniqueName(dbUnique string) db.PgxIface {
	if md, err := GetMonitoredDatabaseByUniqueName(dbUnique); err == nil {
		return md.Conn
	}
	return nil
}

func DBExecReadByDbUniqueName(ctx context.Context, dbUnique string, sql string, args ...any) (metrics.Measurements, error) {
	var conn db.PgxIface
	var md *sources.MonitoredDatabase
	var err error
	var tx pgx.Tx
	if strings.TrimSpace(sql) == "" {
		return nil, errors.New("empty SQL")
	}
	if md, err = GetMonitoredDatabaseByUniqueName(dbUnique); err != nil {
		return nil, err
	}
	if conn = GetConnByUniqueName(dbUnique); conn == nil {
		log.GetLogger(ctx).Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ...
		return nil, errors.New("SQL connection not found or nil")
	}
	if tx, err = conn.Begin(ctx); err != nil {
		return nil, err
	}
	defer func() { _ = tx.Commit(ctx) }()
	if md.IsPostgresSource() {
		_, err = tx.Exec(ctx, "SET LOCAL lock_timeout TO '100ms'")
		if err != nil {
			return nil, err
		}
	}
	return DBExecRead(ctx, tx, sql, args...)
}

const (
	execEnvUnknown       = "UNKNOWN"
	execEnvAzureSingle   = "AZURE_SINGLE"
	execEnvAzureFlexible = "AZURE_FLEXIBLE"
	execEnvGoogle        = "GOOGLE"
)

func DBGetSizeMB(ctx context.Context, dbUnique string) (int64, error) {
	sqlDbSize := `select /* pgwatch3_generated */ pg_database_size(current_database());`
	var sizeMB int64

	lastDBSizeCheckLock.RLock()
	lastDBSizeCheckTime := lastDBSizeFetchTime[dbUnique]
	lastDBSize, ok := lastDBSizeMB[dbUnique]
	lastDBSizeCheckLock.RUnlock()

	if !ok || lastDBSizeCheckTime.Add(dbSizeCachingInterval).Before(time.Now()) {
		ver, err := DBGetPGVersion(ctx, dbUnique, sources.SourcePostgres, false, "")
		if err != nil || (ver.ExecEnv != execEnvAzureSingle) || (ver.ExecEnv == execEnvAzureSingle && ver.ApproxDBSizeB < 1e12) {
			log.GetLogger(ctx).Debugf("[%s] determining DB size ...", dbUnique)

			data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlDbSize) // can take some time on ancient FS, use 300s stmt timeout
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. err: %v ...", dbUnique, err)
				return 0, err
			}
			sizeMB = data[0]["pg_database_size"].(int64) / 1048576
		} else {
			log.GetLogger(ctx).Debugf("[%s] Using approx DB size for the --min-db-size-mb filter ...", dbUnique)
			sizeMB = ver.ApproxDBSizeB / 1048576
		}

		log.GetLogger(ctx).Debugf("[%s] DB size = %d MB, caching for %v ...", dbUnique, sizeMB, dbSizeCachingInterval)

		lastDBSizeCheckLock.Lock()
		lastDBSizeFetchTime[dbUnique] = time.Now()
		lastDBSizeMB[dbUnique] = sizeMB
		lastDBSizeCheckLock.Unlock()

		return sizeMB, nil
	}
	log.GetLogger(ctx).Debugf("[%s] using cached DB size %d MB for the --min-db-size-mb filter check", dbUnique, lastDBSize)
	return lastDBSize, nil
}

func TryDiscoverExecutionEnv(ctx context.Context, dbUnique string) string {
	sqlPGExecEnv := `select /* pgwatch3_generated */
	case
	  when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by Visual C' then 'AZURE_SINGLE'
	  when exists (select * from pg_settings where name = 'pg_qs.host_database' and setting = 'azure_sys') and version() ~* 'compiled by gcc' then 'AZURE_FLEXIBLE'
	  when exists (select * from pg_settings where name = 'cloudsql.supported_extensions') then 'GOOGLE'
	else
	  'UNKNOWN'
	end as exec_env;
  `
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlPGExecEnv)
	if err != nil {
		return ""
	}
	return data[0]["exec_env"].(string)
}

func GetDBTotalApproxSize(ctx context.Context, dbUnique string) (int64, error) {
	sqlApproxDBSize := `
	select /* pgwatch3_generated */
		current_setting('block_size')::int8 * sum(relpages) as db_size_approx
	from
		pg_class c
	where /* works only for v9.1+ */
		c.relpersistence != 't';
	`
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlApproxDBSize)
	if err != nil {
		return 0, err
	}
	return data[0]["db_size_approx"].(int64), nil
}

// VersionToInt parses a given version string and returns an integer encoding of it,
// or 0 if the version cannot be parsed. Only parses valid semantic versions.
// Examples: v1.2 -> 01_02_00, v9.6.3 -> 09_06_03, v11 -> 11_00_00
var regVer = regexp.MustCompile(`(\d+).?(\d*).?(\d*)`)

func VersionToInt(version string) (v int) {
	if matches := regVer.FindStringSubmatch(version); len(matches) > 1 {
		for i, match := range matches[1:] {
			v += func() (m int) { m, _ = strconv.Atoi(match); return }() * int(math.Pow10(4-i*2))
		}
	}
	return
}
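
// Illustrative examples of the encoding (major*10000 + minor*100 + patch);
// the values follow from the regex and loop logic above:
//
//	VersionToInt("v9.6.3") == 90603  // 09_06_03
//	VersionToInt("13.4")   == 130400 // 13_04_00
//	VersionToInt("v11")    == 110000 // 11_00_00
//	VersionToInt("junk")   == 0      // no digits found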

const MinExtensionInfoAvailable = 9_01_00

var rBouncerAndPgpoolVerMatch = regexp.MustCompile(`\d+\.+\d+`) // extract $major.minor from "4.1.2 (karasukiboshi)" or "PgBouncer 1.12.0"

func DBGetPGVersion(ctx context.Context, dbUnique string, srcType sources.Kind, noCache bool, SysID string) (DBVersionMapEntry, error) {
	var ver DBVersionMapEntry
	var verNew DBVersionMapEntry
	var ok bool
	sql := `
		select /* pgwatch3_generated */ (regexp_matches(
			regexp_replace(current_setting('server_version'), '(beta|devel).*', '', 'g'),
			E'\\d+\\.?\\d+?')
			)[1]::text as ver, pg_is_in_recovery(), current_database()::text;
	`
	sqlSysid := `select /* pgwatch3_generated */ system_identifier::text from pg_control_system();`
	sqlSu := `select /* pgwatch3_generated */ rolsuper
		   from pg_roles r where rolname = session_user;`
	sqlExtensions := `select /* pgwatch3_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`
	pgpoolVersion := `SHOW POOL_VERSION` // supported from pgpool2 v3.0

	dbPgVersionMapLock.Lock()
	getVerLock, ok := dbGetPgVersionMapLock[dbUnique]
	if !ok {
		dbGetPgVersionMapLock[dbUnique] = &sync.RWMutex{}
		getVerLock = dbGetPgVersionMapLock[dbUnique]
	}
	ver, ok = dbPgVersionMap[dbUnique]
	dbPgVersionMapLock.Unlock()

	if !noCache && ok && ver.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min
		//log.Debugf("using cached postgres version %s for %s", ver.Version.String(), dbUnique)
		return ver, nil
	}
	getVerLock.Lock() // limit to 1 concurrent version info fetch per DB
	defer getVerLock.Unlock()
	log.GetLogger(ctx).WithField("source", dbUnique).
		WithField("type", srcType).Debug("determining DB version and recovery status...")

	if verNew.Extensions == nil {
		verNew.Extensions = make(map[string]int)
	}

	if srcType == sources.SourcePgBouncer {
		data, err := DBExecReadByDbUniqueName(ctx, dbUnique, "show version")
		if err != nil {
			return verNew, err
		}
		if len(data) == 0 {
			// surprisingly, pre-v1.12 pgbouncer emits the 'show version' output as a NOTICE, which cannot be accessed from the Go driver
			verNew.Version = 0
			verNew.VersionStr = "0"
		} else {
			matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(data[0]["version"].(string))
			if len(matches) != 1 {
				log.GetLogger(ctx).Errorf("[%s] Unexpected PgBouncer version input: %s", dbUnique, data[0]["version"].(string))
				return ver, fmt.Errorf("Unexpected PgBouncer version input: %s", data[0]["version"].(string))
			}
			verNew.VersionStr = matches[0]
			verNew.Version = VersionToInt(matches[0])
		}
	} else if srcType == sources.SourcePgPool {
		data, err := DBExecReadByDbUniqueName(ctx, dbUnique, pgpoolVersion)
		if err != nil {
			return verNew, err
		}
		if len(data) == 0 {
			verNew.Version = 3_00_00
			verNew.VersionStr = "3.0"
		} else {
			matches := rBouncerAndPgpoolVerMatch.FindStringSubmatch(string(data[0]["pool_version"].([]byte)))
			if len(matches) != 1 {
				log.GetLogger(ctx).Errorf("[%s] Unexpected PgPool version input: %s", dbUnique, data[0]["pool_version"].([]byte))
				return ver, fmt.Errorf("Unexpected PgPool version input: %s", data[0]["pool_version"].([]byte))
			}
			verNew.VersionStr = matches[0]
			verNew.Version = VersionToInt(matches[0])
		}
	} else {
		data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
		if err != nil {
			if noCache {
				return ver, err
			}
			log.GetLogger(ctx).Infof("[%s] DBGetPGVersion failed, using old cached value. err: %v", dbUnique, err)
			return ver, nil
		}
		verNew.Version = VersionToInt(data[0]["ver"].(string))
		verNew.VersionStr = data[0]["ver"].(string)
		verNew.IsInRecovery = data[0]["pg_is_in_recovery"].(bool)
		verNew.RealDbname = data[0]["current_database"].(string)

		if verNew.Version > 100000 && SysID > "" {
			log.GetLogger(ctx).Debugf("[%s] determining system identifier version (pg ver: %v)", dbUnique, verNew.VersionStr)
			data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlSysid)
			if err == nil && len(data) > 0 {
				verNew.SystemIdentifier = data[0]["system_identifier"].(string)
			}
		}

		if ver.ExecEnv != "" {
			verNew.ExecEnv = ver.ExecEnv // carry over as not likely to ever change
		} else {
			log.GetLogger(ctx).Debugf("[%s] determining the execution env...", dbUnique)
			execEnv := TryDiscoverExecutionEnv(ctx, dbUnique)
			if execEnv != "" {
				log.GetLogger(ctx).Debugf("[%s] running on execution env: %s", dbUnique, execEnv)
				verNew.ExecEnv = execEnv
			}
		}

		// to work around poor Azure Single Server FS functions performance for some metrics + the --min-db-size-mb filter
		if verNew.ExecEnv == execEnvAzureSingle {
			approxSize, err := GetDBTotalApproxSize(ctx, dbUnique)
			if err == nil {
				verNew.ApproxDBSizeB = approxSize
			} else {
				verNew.ApproxDBSizeB = ver.ApproxDBSizeB
			}
		}

		log.GetLogger(ctx).Debugf("[%s] determining if monitoring user is a superuser...", dbUnique)
		data, err = DBExecReadByDbUniqueName(ctx, dbUnique, sqlSu)
		if err == nil {
			verNew.IsSuperuser = data[0]["rolsuper"].(bool)
		}
		log.GetLogger(ctx).Debugf("[%s] superuser=%v", dbUnique, verNew.IsSuperuser)

		if verNew.Version >= MinExtensionInfoAvailable {
			//log.Debugf("[%s] determining installed extensions info...", dbUnique)
			data, err = DBExecReadByDbUniqueName(ctx, dbUnique, sqlExtensions)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s] failed to determine installed extensions info: %v", dbUnique, err)
			} else {
				for _, dr := range data {
					extver := VersionToInt(dr["extversion"].(string))
					if extver == 0 {
						log.GetLogger(ctx).Errorf("[%s] failed to determine extension version info for extension %s", dbUnique, dr["extname"])
						continue
					}
					verNew.Extensions[dr["extname"].(string)] = extver
				}
				log.GetLogger(ctx).Debugf("[%s] installed extensions: %+v", dbUnique, verNew.Extensions)
			}
		}
	}

	verNew.LastCheckedOn = time.Now()
	dbPgVersionMapLock.Lock()
	dbPgVersionMap[dbUnique] = verNew
	dbPgVersionMapLock.Unlock()

	return verNew, nil
}

func DetectSprocChanges(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for sproc changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["sproc_hashes"]; !ok {
		firstRun = true
		hostState["sproc_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("sproc_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get sproc_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_sproc"].(string) + dbMetricJoinStr + dr["tag_oid"].(string)
		prevHash, ok := hostState["sproc_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected change in sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new sproc:", dr["tag_sproc"], ", oid:", dr["tag_oid"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["sproc_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["sproc_hashes"]) != len(data) {
		deletedSProcs := make([]string, 0)
		// turn resultset to map => [oid]=true for faster checks
		currentOidMap := make(map[string]bool)
		for _, dr := range data {
			currentOidMap[dr["tag_sproc"].(string)+dbMetricJoinStr+dr["tag_oid"].(string)] = true
		}
		for sprocIdent := range hostState["sproc_hashes"] {
			_, ok := currentOidMap[sprocIdent]
			if !ok {
				splits := strings.Split(sprocIdent, dbMetricJoinStr)
				log.GetLogger(ctx).Info("detected delete of sproc:", splits[0], ", oid:", splits[1])
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_sproc"] = splits[0]
				influxEntry["tag_oid"] = splits[1]
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedSProcs = append(deletedSProcs, sprocIdent)
				changeCounts.Dropped++
			}
		}
		for _, deletedSProc := range deletedSProcs {
			delete(hostState["sproc_hashes"], deletedSProc)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d sproc changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "sproc_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}

func DetectTableChanges(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for table changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["table_hashes"]; !ok {
		firstRun = true
		hostState["table_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("table_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get table_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_table"].(string)
		prevHash, ok := hostState["table_hashes"][objIdent]
		//log.Debug("inspecting table:", objIdent, "hash:", prev_hash)
		if ok { // we have existing state
			if prevHash != dr["md5"].(string) {
				log.GetLogger(ctx).Info("detected DDL change in table:", dr["tag_table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["table_hashes"][objIdent] = dr["md5"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new table:", dr["tag_table"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["table_hashes"][objIdent] = dr["md5"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["table_hashes"]) != len(data) {
		deletedTables := make([]string, 0)
		// turn resultset to map => [table]=true for faster checks
		currentTableMap := make(map[string]bool)
		for _, dr := range data {
			currentTableMap[dr["tag_table"].(string)] = true
		}
		for table := range hostState["table_hashes"] {
			_, ok := currentTableMap[table]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of table:", table)
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_table"] = table
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedTables = append(deletedTables, table)
				changeCounts.Dropped++
			}
		}
		for _, deletedTable := range deletedTables {
			delete(hostState["table_hashes"], deletedTable)
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d table changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "table_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}

func DetectIndexChanges(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for index changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["index_hashes"]; !ok {
		firstRun = true
		hostState["index_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("index_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Error("could not get index_hashes sql:", err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_index"].(string)
		prevHash, ok := hostState["index_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != (dr["md5"].(string) + dr["is_valid"].(string)) {
				log.GetLogger(ctx).Info("detected index change:", dr["tag_index"], ", table:", dr["table"])
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
				changeCounts.Altered++
			}
		} else { // check for new / delete
			if !firstRun {
				log.GetLogger(ctx).Info("detected new index:", dr["tag_index"])
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["index_hashes"][objIdent] = dr["md5"].(string) + dr["is_valid"].(string)
		}
	}
	// detect deletes
	if !firstRun && len(hostState["index_hashes"]) != len(data) {
		deletedIndexes := make([]string, 0)
		// turn resultset to map => [index]=true for faster checks
		currentIndexMap := make(map[string]bool)
		for _, dr := range data {
			currentIndexMap[dr["tag_index"].(string)] = true
		}
		for indexName := range hostState["index_hashes"] {
			_, ok := currentIndexMap[indexName]
			if !ok {
				log.GetLogger(ctx).Info("detected drop of index_name:", indexName)
				influxEntry := make(metrics.Measurement)
				influxEntry["event"] = "drop"
				influxEntry["tag_index"] = indexName
				if len(data) > 0 {
					influxEntry["epoch_ns"] = data[0]["epoch_ns"]
				} else {
					influxEntry["epoch_ns"] = time.Now().UnixNano()
				}
				detectedChanges = append(detectedChanges, influxEntry)
				deletedIndexes = append(deletedIndexes, indexName)
				changeCounts.Dropped++
			}
		}
		for _, deletedIndex := range deletedIndexes {
			delete(hostState["index_hashes"], deletedIndex)
		}
	}
	log.GetLogger(ctx).Debugf("[%s][%s] detected %d index changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique, MetricName: "index_changes", Data: detectedChanges, CustomTags: md.CustomTags}}
	}

	return changeCounts
}

func DetectPrivilegeChanges(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking object privilege changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["object_privileges"]; !ok {
		firstRun = true
		hostState["object_privileges"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("privilege_changes", vme, nil)
	if err != nil || mvp.GetSQL(int(vme.Version)) == "" {
		log.GetLogger(ctx).Warningf("[%s][%s] could not get SQL for 'privilege_changes'. cannot detect privilege changes", dbUnique, specialMetricChangeEvents)
		return changeCounts
	}

	// returns rows of: object_type, tag_role, tag_object, privilege_type
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, specialMetricChangeEvents, err)
		return changeCounts
	}

	currentState := make(map[string]bool)
	for _, dr := range data {
		objIdent := fmt.Sprintf("%s#:#%s#:#%s#:#%s", dr["object_type"], dr["tag_role"], dr["tag_object"], dr["privilege_type"])
		if firstRun {
			hostState["object_privileges"][objIdent] = ""
		} else {
			_, ok := hostState["object_privileges"][objIdent]
			if !ok {
				log.GetLogger(ctx).Infof("[%s][%s] detected new object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					dbUnique, specialMetricChangeEvents, dr["tag_role"], dr["object_type"], dr["tag_object"], dr["privilege_type"])
				dr["event"] = "GRANT"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
				hostState["object_privileges"][objIdent] = ""
			}
			currentState[objIdent] = true
		}
	}
	// check revokes - exists in old state only
	if !firstRun && len(currentState) > 0 {
		for objPrevRun := range hostState["object_privileges"] {
			if _, ok := currentState[objPrevRun]; !ok {
				splits := strings.Split(objPrevRun, "#:#")
				log.GetLogger(ctx).Infof("[%s][%s] detected removed object privileges: role=%s, object_type=%s, object=%s, privilege_type=%s",
					dbUnique, specialMetricChangeEvents, splits[1], splits[0], splits[2], splits[3])
				revokeEntry := make(metrics.Measurement)
				if epochNs, ok := data[0]["epoch_ns"]; ok {
					revokeEntry["epoch_ns"] = epochNs
				} else {
					revokeEntry["epoch_ns"] = time.Now().UnixNano()
				}
				revokeEntry["object_type"] = splits[0]
				revokeEntry["tag_role"] = splits[1]
				revokeEntry["tag_object"] = splits[2]
				revokeEntry["privilege_type"] = splits[3]
				revokeEntry["event"] = "REVOKE"
				detectedChanges = append(detectedChanges, revokeEntry)
				changeCounts.Dropped++
				delete(hostState["object_privileges"], objPrevRun)
			}
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d object privilege changes...", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{
			{
				DBName:     dbUnique,
				MetricName: "privilege_changes",
				Data:       detectedChanges,
				CustomTags: md.CustomTags,
			}}
	}

	return changeCounts
}

func DetectConfigurationChanges(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) ChangeDetectionResults {
	detectedChanges := make(metrics.Measurements, 0)
	var firstRun bool
	var changeCounts ChangeDetectionResults

	log.GetLogger(ctx).Debugf("[%s][%s] checking for configuration changes...", dbUnique, specialMetricChangeEvents)
	if _, ok := hostState["configuration_hashes"]; !ok {
		firstRun = true
		hostState["configuration_hashes"] = make(map[string]string)
	}

	mvp, err := GetMetricVersionProperties("configuration_hashes", vme, nil)
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] could not get configuration_hashes sql: %v", dbUnique, specialMetricChangeEvents, err)
		return changeCounts
	}

	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, mvp.GetSQL(int(vme.Version)))
	if err != nil {
		log.GetLogger(ctx).Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, specialMetricChangeEvents, err)
		return changeCounts
	}

	for _, dr := range data {
		objIdent := dr["tag_setting"].(string)
		objValue := dr["value"].(string)
		prevHash, ok := hostState["configuration_hashes"][objIdent]
		if ok { // we have existing state
			if prevHash != objValue {
				if objIdent == "connection_ID" {
					continue // ignore some weird Azure managed PG service setting
				}
				log.GetLogger(ctx).Warningf("[%s][%s] detected settings change: %s = %s (prev: %s)",
					dbUnique, specialMetricChangeEvents, objIdent, objValue, prevHash)
				dr["event"] = "alter"
				detectedChanges = append(detectedChanges, dr)
				hostState["configuration_hashes"][objIdent] = objValue
				changeCounts.Altered++
			}
		} else { // check for new, delete not relevant here (pg_upgrade)
			if !firstRun {
				log.GetLogger(ctx).Warningf("[%s][%s] detected new setting: %s", dbUnique, specialMetricChangeEvents, objIdent)
				dr["event"] = "create"
				detectedChanges = append(detectedChanges, dr)
				changeCounts.Created++
			}
			hostState["configuration_hashes"][objIdent] = objValue
		}
	}

	log.GetLogger(ctx).Debugf("[%s][%s] detected %d configuration changes", dbUnique, specialMetricChangeEvents, len(detectedChanges))
	if len(detectedChanges) > 0 {
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{
			DBName:     dbUnique,
			MetricName: "configuration_changes",
			Data:       detectedChanges,
			CustomTags: md.CustomTags,
		}}
	}

	return changeCounts
}

func CheckForPGObjectChangesAndStore(ctx context.Context, dbUnique string, vme DBVersionMapEntry, storageCh chan<- []metrics.MeasurementMessage, hostState map[string]map[string]string) {
	sprocCounts := DetectSprocChanges(ctx, dbUnique, vme, storageCh, hostState) // TODO some of Detect*() code could be unified...
	tableCounts := DetectTableChanges(ctx, dbUnique, vme, storageCh, hostState)
	indexCounts := DetectIndexChanges(ctx, dbUnique, vme, storageCh, hostState)
	confCounts := DetectConfigurationChanges(ctx, dbUnique, vme, storageCh, hostState)
	privChangeCounts := DetectPrivilegeChanges(ctx, dbUnique, vme, storageCh, hostState)

	// need to send info on all object changes as one message, as Grafana applies "last wins" for annotations with similar timestamps
	message := ""
	if sprocCounts.Altered > 0 || sprocCounts.Created > 0 || sprocCounts.Dropped > 0 {
		message += fmt.Sprintf(" sprocs %d/%d/%d", sprocCounts.Created, sprocCounts.Altered, sprocCounts.Dropped)
	}
	if tableCounts.Altered > 0 || tableCounts.Created > 0 || tableCounts.Dropped > 0 {
		message += fmt.Sprintf(" tables/views %d/%d/%d", tableCounts.Created, tableCounts.Altered, tableCounts.Dropped)
	}
	if indexCounts.Altered > 0 || indexCounts.Created > 0 || indexCounts.Dropped > 0 {
		message += fmt.Sprintf(" indexes %d/%d/%d", indexCounts.Created, indexCounts.Altered, indexCounts.Dropped)
	}
	if confCounts.Altered > 0 || confCounts.Created > 0 {
		message += fmt.Sprintf(" configuration %d/%d/%d", confCounts.Created, confCounts.Altered, confCounts.Dropped)
	}
	if privChangeCounts.Dropped > 0 || privChangeCounts.Created > 0 {
		message += fmt.Sprintf(" privileges %d/%d/%d", privChangeCounts.Created, privChangeCounts.Altered, privChangeCounts.Dropped)
	}

	if message > "" {
		message = "Detected changes for \"" + dbUnique + "\" [Created/Altered/Dropped]:" + message
		log.GetLogger(ctx).Info(message)
		detectedChangesSummary := make(metrics.Measurements, 0)
		influxEntry := make(metrics.Measurement)
		influxEntry["details"] = message
		influxEntry["epoch_ns"] = time.Now().UnixNano()
		detectedChangesSummary = append(detectedChangesSummary, influxEntry)
		md, _ := GetMonitoredDatabaseByUniqueName(dbUnique)
		storageCh <- []metrics.MeasurementMessage{{DBName: dbUnique,
			SourceType: string(md.Kind),
			MetricName: "object_changes",
			Data:       detectedChangesSummary,
			CustomTags: md.CustomTags,
		}}
	}
}

// some extra work is needed as pgpool SHOW commands don't specify the return data types for some reason
func FetchMetricsPgpool(ctx context.Context, msg MetricFetchMessage, vme DBVersionMapEntry, mvp metrics.Metric) (metrics.Measurements, error) {
	var retData = make(metrics.Measurements, 0)
	epochNs := time.Now().UnixNano()

	sqlLines := strings.Split(strings.ToUpper(mvp.GetSQL(int(vme.Version))), "\n")

	for _, sql := range sqlLines {
		if strings.HasPrefix(sql, "SHOW POOL_NODES") {
			data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				return data, err
			}

			for _, row := range data {
				retRow := make(metrics.Measurement)
				retRow[epochColumnName] = epochNs
				for k, v := range row {
					vs := string(v.([]byte))
					// need 1 tag so that Influx would not merge rows
					if k == "node_id" {
						retRow["tag_node_id"] = vs
						continue
					}

					retRow[k] = vs
					if k == "status" { // was changed from numeric to string at some pgpool version, so leave the string
						// but also add a "status_num" field
						if vs == "up" {
							retRow["status_num"] = 1
						} else if vs == "down" {
							retRow["status_num"] = 0
						} else {
							i, err := strconv.ParseInt(vs, 10, 64)
							if err == nil {
								retRow["status_num"] = i
							}
						}
						continue
					}
					// everything is returned as text, so try to convert all numerics into ints / floats
					if k != "lb_weight" {
						i, err := strconv.ParseInt(vs, 10, 64)
						if err == nil {
							retRow[k] = i
							continue
						}
					}
					f, err := strconv.ParseFloat(vs, 64)
					if err == nil {
						retRow[k] = f
						continue
					}
				}
				retData = append(retData, retRow)
			}
		} else if strings.HasPrefix(sql, "SHOW POOL_PROCESSES") {
			if len(retData) == 0 {
				log.GetLogger(ctx).Warningf("[%s][%s] SHOW POOL_NODES needs to be placed before SHOW POOL_PROCESSES. ignoring SHOW POOL_PROCESSES", msg.DBUniqueName, msg.MetricName)
				continue
			}

			data, err := DBExecReadByDbUniqueName(ctx, msg.DBUniqueName, sql)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err)
				continue
			}

			// summarize processes_total / processes_active over all rows
			processesTotal := 0
			processesActive := 0
			for _, row := range data {
				processesTotal++
				v, ok := row["database"]
				if !ok {
					log.GetLogger(ctx).Infof("[%s][%s] column 'database' not found from data returned by SHOW POOL_PROCESSES, check pool version / SQL definition", msg.DBUniqueName, msg.MetricName)
					continue
				}
				if len(v.([]byte)) > 0 {
					processesActive++
				}
			}

			for _, retRow := range retData {
				retRow["processes_total"] = processesTotal
				retRow["processes_active"] = processesActive
			}
		}
	}
	return retData, nil
}

func DoesFunctionExists(ctx context.Context, dbUnique, functionName string) bool {
	log.GetLogger(ctx).Debug("Checking for function existence", dbUnique, functionName)
	sql := fmt.Sprintf("select /* pgwatch3_generated */ 1 from pg_proc join pg_namespace n on pronamespace = n.oid where proname = '%s' and n.nspname = 'public'", functionName)
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
	if err != nil {
		log.GetLogger(ctx).Error("Failed to check for function existence", dbUnique, functionName, err)
		return false
	}
	if len(data) > 0 {
		log.GetLogger(ctx).Debugf("Function %s exists on %s", functionName, dbUnique)
		return true
	}
	return false
}

// Called once on daemon startup if some commonly wanted extension (most notably pg_stat_statements) is missing.
// With newer Postgres versions this can even succeed if the user is not a real superuser, due to some cloud-specific
// whitelisting or "trusted extensions" (a feature from v13). Ignores errors.
func TryCreateMissingExtensions(ctx context.Context, dbUnique string, extensionNames []string, existingExtensions map[string]int) []string {
	sqlAvailable := `select name::text from pg_available_extensions`
	extsCreated := make([]string, 0)

	// For security reasons don't execute arbitrary strings; check that it's an existing extension
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlAvailable)
	if err != nil {
		log.GetLogger(ctx).Infof("[%s] Failed to get a list of available extensions: %v", dbUnique, err)
		return extsCreated
	}

	availableExts := make(map[string]bool)
	for _, row := range data {
		availableExts[row["name"].(string)] = true
	}

	for _, extToCreate := range extensionNames {
		if _, ok := existingExtensions[extToCreate]; ok {
			continue
		}
		_, ok := availableExts[extToCreate]
		if !ok {
			log.GetLogger(ctx).Errorf("[%s] Requested extension %s not available on instance, cannot try to create...", dbUnique, extToCreate)
		} else {
			sqlCreateExt := `create extension ` + extToCreate
			_, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlCreateExt)
			if err != nil {
				log.GetLogger(ctx).Errorf("[%s] Failed to create extension %s (based on --try-create-listed-exts-if-missing input): %v", dbUnique, extToCreate, err)
			}
			extsCreated = append(extsCreated, extToCreate)
		}
	}

	return extsCreated
}

// Called once on daemon startup to try to create "metric fetching helper" functions automatically
func TryCreateMetricsFetchingHelpers(ctx context.Context, md *sources.MonitoredDatabase) (err error) {
	metricConfig := func() map[string]float64 {
		if len(md.Metrics) > 0 {
			return md.Metrics
		}
		if md.PresetMetrics > "" {
			return metricDefinitionMap.PresetDefs[md.PresetMetrics].Metrics
		}
		return nil
	}()
	conf, err := pgx.ParseConfig(md.ConnStr)
	if err != nil {
		return err
	}
	conf.DefaultQueryExecMode = pgx.QueryExecModeExec
	c, err := pgx.ConnectConfig(ctx, conf)
	if err != nil {
		return err
	}
	defer c.Close(ctx)

	for metricName := range metricConfig {
		Metric := metricDefinitionMap.MetricDefs[metricName]
		if Metric.InitSQL == "" {
			continue
		}

		_, err = c.Exec(ctx, Metric.InitSQL)
		if err != nil {
			log.GetLogger(ctx).Warningf("Failed to create a metric fetching helper for %s in %s: %v", md.DBUniqueName, metricName, err)
		} else {
			log.GetLogger(ctx).Info("Successfully created metric fetching helper for", md.DBUniqueName, metricName)
		}
	}
	return nil
}

// actually connects to the instance to determine PG-relevant disk paths / mounts
func GetGoPsutilDiskPG(ctx context.Context, dbUnique string) (metrics.Measurements, error) {
	sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld, current_setting('server_version_num')::int as pgver`
	sqlTS := `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])`
	data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sql)
	if err != nil || len(data) == 0 {
		log.GetLogger(ctx).Errorf("Failed to determine relevant PG disk paths via SQL: %v", err)
		return nil, err
	}
	dataTblsp, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlTS)
	if err != nil {
		log.GetLogger(ctx).Infof("Failed to determine relevant PG tablespace paths via SQL: %v", err)
	}
	return psutil.GetGoPsutilDiskPG(data, dataTblsp)
}

func CloseResourcesForRemovedMonitoredDBs(metricsWriter *sinks.MultiWriter, currentDBs, prevLoopDBs sources.MonitoredDatabases, shutDownDueToRoleChange map[string]bool) {
	var curDBsMap = make(map[string]bool)

	for _, curDB := range currentDBs {
		curDBsMap[curDB.DBUniqueName] = true
	}

	for _, prevDB := range prevLoopDBs {
		if _, ok := curDBsMap[prevDB.DBUniqueName]; !ok { // removed from config
			prevDB.Conn.Close()
			_ = metricsWriter.SyncMetrics(prevDB.DBUniqueName, "", "remove")
		}
	}

	// or to be ignored due to current instance state
	for roleChangedDB := range shutDownDueToRoleChange {
		if db := currentDBs.GetMonitoredDatabase(roleChangedDB); db != nil {
			db.Conn.Close()
		}
		_ = metricsWriter.SyncMetrics(roleChangedDB, "", "remove")
	}
}

func SetDBUnreachableState(dbUnique string) {
	unreachableDBsLock.Lock()
	unreachableDB[dbUnique] = time.Now()
	unreachableDBsLock.Unlock()
}

func ClearDBUnreachableStateIfAny(dbUnique string) {
	unreachableDBsLock.Lock()
	delete(unreachableDB, dbUnique)
	unreachableDBsLock.Unlock()
}

func SetUndersizedDBState(dbUnique string, state bool) {
	undersizedDBsLock.Lock()
	undersizedDBs[dbUnique] = state
	undersizedDBsLock.Unlock()
}

func IsDBUndersized(dbUnique string) bool {
	undersizedDBsLock.RLock()
	defer undersizedDBsLock.RUnlock()
	undersized, ok := undersizedDBs[dbUnique]
	if ok {
		return undersized
	}
	return false
}

func SetRecoveryIgnoredDBState(dbUnique string, state bool) {
	recoveryIgnoredDBsLock.Lock()
	recoveryIgnoredDBs[dbUnique] = state
	recoveryIgnoredDBsLock.Unlock()
}

func IsDBIgnoredBasedOnRecoveryState(dbUnique string) bool {
	recoveryIgnoredDBsLock.RLock()
	defer recoveryIgnoredDBsLock.RUnlock()
	recoveryIgnored, ok := recoveryIgnoredDBs[dbUnique]
	if ok {
		return recoveryIgnored
	}
	return false
}

func IsDBDormant(dbUnique string) bool {
	return IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique)
}