cybertec-postgresql / pgwatch3 / build 10433111267

17 Aug 2024 02:49PM UTC coverage: 28.106% (-0.2%) from 28.289%

Pull Request #504: [!] add support for application commands (merge 24d0cebaa into 11ec15c1b)

24 of 130 new or added lines in 8 files covered (18.46%).
206 existing lines in 5 files now uncovered.
1285 of 4572 relevant lines covered (28.11%).
0.32 hits per line.

Source file: /src/sinks/postgres.go (21.68% of lines covered)
package sinks

import (
        "context"
        _ "embed"
        "encoding/json"
        "errors"
        "fmt"
        "regexp"
        "strings"
        "time"

        "github.com/cybertec-postgresql/pgwatch3/db"
        "github.com/cybertec-postgresql/pgwatch3/log"
        "github.com/cybertec-postgresql/pgwatch3/metrics"
        "github.com/jackc/pgx/v5"
)

var (
        cacheLimit      = 512
        highLoadTimeout = time.Second * 5
        deleterDelay    = time.Hour
)

// NewPostgresWriter connects to the measurements database given by connstr
// and hands the connection pool over to NewWriterFromPostgresConn.
func NewPostgresWriter(ctx context.Context, connstr string, opts *SinkCmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
        var conn db.PgxPoolIface
        if conn, err = db.New(ctx, connstr); err != nil {
                return
        }
        return NewWriterFromPostgresConn(ctx, conn, opts, metricDefs)
}

// NewWriterFromPostgresConn rolls out the admin schema on first use, detects
// the storage schema type and starts the background workers: partition
// cleanup, unique-source maintenance and the write loop.
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *SinkCmdOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
        pgw = &PostgresWriter{
                ctx:        ctx,
                metricDefs: metricDefs,
                opts:       opts,
                input:      make(chan []metrics.MeasurementMessage, cacheLimit),
                lastError:  make(chan error),
                sinkDb:     conn,
        }
        if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
                log.GetLogger(ctx).Info("initialising the measurement database...")
                exists, err := db.DoesSchemaExist(ctx, conn, "admin")
                if err != nil || exists {
                        return err
                }
                for _, sql := range metricSchemaSQLs {
                        if _, err = conn.Exec(ctx, sql); err != nil {
                                return err
                        }
                }
                return nil
        }); err != nil {
                return
        }
        if err = pgw.ReadMetricSchemaType(); err != nil {
                return
        }
        if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
                return
        }
        go pgw.deleteOldPartitions(deleterDelay)
        go pgw.maintainUniqueSources()
        go pgw.poll()
        return
}
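
// A minimal usage sketch (editor's illustration, not part of the original
// file). The DSN and option values are assumptions; the SinkCmdOpts fields
// shown are the ones this file actually reads (BatchingDelay, Retention):
//
//      ctx := context.Background()
//      opts := &SinkCmdOpts{BatchingDelay: time.Second, Retention: 14}
//      pgw, err := NewPostgresWriter(ctx, "postgresql://pgwatch3@localhost/measurements", opts, metricDefs)
//      if err != nil {
//              log.GetLogger(ctx).Fatal(err)
//      }
//      _ = pgw.Write(msgs) // msgs []metrics.MeasurementMessage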

//go:embed sql/admin_schema.sql
var sqlMetricAdminSchema string

//go:embed sql/admin_functions.sql
var sqlMetricAdminFunctions string

//go:embed sql/ensure_partition_postgres.sql
var sqlMetricEnsurePartitionPostgres string

//go:embed sql/ensure_partition_timescale.sql
var sqlMetricEnsurePartitionTimescale string

//go:embed sql/change_chunk_interval.sql
var sqlMetricChangeChunkIntervalTimescale string

//go:embed sql/change_compression_interval.sql
var sqlMetricChangeCompressionIntervalTimescale string

var (
        metricSchemaSQLs = []string{
                sqlMetricAdminSchema,
                sqlMetricAdminFunctions,
                sqlMetricEnsurePartitionPostgres,
                sqlMetricEnsurePartitionTimescale,
                sqlMetricChangeChunkIntervalTimescale,
                sqlMetricChangeCompressionIntervalTimescale,
        }
)

// PostgresWriter is a sink that writes metric measurements to a Postgres database.
// It supports both plain Postgres and TimescaleDB as storage backends; any other
// Postgres-compatible database (e.g. PGEE, Citus, Greenplum, CockroachDB) can be
// used as well.
type PostgresWriter struct {
        ctx          context.Context
        sinkDb       db.PgxPoolIface
        metricSchema DbStorageSchemaType
        metricDefs   *metrics.Metrics
        opts         *SinkCmdOpts
        input        chan []metrics.MeasurementMessage
        lastError    chan error
}

// ExistingPartitionInfo holds the time bounds of an already created partition.
type ExistingPartitionInfo struct {
        StartTime time.Time
        EndTime   time.Time
}

// MeasurementMessagePostgres is one measurement row ready to be COPY-ed to its metric table.
type MeasurementMessagePostgres struct {
        Time    time.Time
        DBName  string
        Metric  string
        Data    map[string]any
        TagData map[string]any
}

type DbStorageSchemaType int

const (
        DbStorageSchemaPostgres DbStorageSchemaType = iota
        DbStorageSchemaTimescale
)

// ReadMetricSchemaType reads admin.storage_schema_type to determine whether the
// measurement database uses the plain Postgres or the TimescaleDB storage schema.
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
        var isTs bool
        pgw.metricSchema = DbStorageSchemaPostgres
        sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
        if err = pgw.sinkDb.QueryRow(pgw.ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
                pgw.metricSchema = DbStorageSchemaTimescale
        }
        return
}

const (
        epochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
        tagPrefix       string = "tag_"
)

const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"

var (
        regexIsPgbouncerMetrics         = regexp.MustCompile(specialMetricPgbouncer)
        forceRecreatePGMetricPartitions = false                                             // to signal that the PG metrics partition cache must be rebuilt
        partitionMapMetric              = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
        partitionMapMetricDbname        = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] = min/max bounds
)

// SyncMetric ensures that tables exist for newly added metrics and/or sources
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error {
        if op == "add" {
                return errors.Join(
                        pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
                        pgw.EnsureMetricDummy(metricName), // ensure there is at least an empty top-level table, to avoid ugly Grafana notifications
                )
        }
        return nil
}
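
// Editor's note (illustrative, not from the original file): only the "add"
// operation does any work here; a caller registering a new source/metric pair
// would invoke it as:
//
//      _ = pgw.SyncMetric("mydb", "db_stats", "add") // listing row + empty top-level table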

// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
        for _, name := range metrics.GetDefaultBuiltInMetrics() {
                err = errors.Join(err, pgw.EnsureMetricDummy(name))
        }
        return
}

// EnsureMetricDummy creates an empty measurements table for a metric if it doesn't exist
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
        _, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
        return
}

// Write sends the measurements to the cache channel
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementMessage) error {
        if pgw.ctx.Err() != nil {
                return pgw.ctx.Err()
        }
        select {
        case pgw.input <- msgs:
                // msgs sent
        case <-time.After(highLoadTimeout):
                // msgs dropped due to high load; check the log for details
        }
        select {
        case err := <-pgw.lastError:
                return err
        default:
                return nil
        }
}
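
// Editor's note (illustrative, not from the original file): Write reports
// errors through a channel that the background flusher fills, so a nil return
// only means no error has surfaced yet; the batch just handed over may still
// be cached, or dropped after highLoadTimeout under sustained load.
//
//      if err := pgw.Write(batch); err != nil {
//              // the error belongs to an earlier flush attempt
//              log.GetLogger(ctx).Error("sink error: ", err)
//      }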

// poll is the main loop that reads from the input channel and flushes the data to the database
func (pgw *PostgresWriter) poll() {
        cache := make([]metrics.MeasurementMessage, 0, cacheLimit)
        cacheTimeout := pgw.opts.BatchingDelay
        tick := time.NewTicker(cacheTimeout)
        for {
                select {
                case <-pgw.ctx.Done(): // check context with high priority
                        return
                default:
                        select {
                        case entry := <-pgw.input:
                                cache = append(cache, entry...)
                                if len(cache) < cacheLimit {
                                        break
                                }
                                tick.Stop()
                                pgw.flush(cache)
                                cache = cache[:0]
                                tick = time.NewTicker(cacheTimeout)
                        case <-tick.C:
                                pgw.flush(cache)
                                cache = cache[:0]
                        case <-pgw.ctx.Done():
                                return
                        }
                }
        }
}
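
// Editor's note (illustrative, not from the original file): poll is a standard
// size-or-time batching loop; a flush happens when the cache reaches cacheLimit
// entries or when opts.BatchingDelay elapses, whichever comes first. Stripped
// to its skeleton, the pattern is:
//
//      for {
//              select {
//              case <-ctx.Done(): // cancellation is noticed before more input is consumed
//                      return
//              default:
//                      select {
//                      case batch := <-in:
//                              buf = append(buf, batch...)
//                              if len(buf) >= limit { // size trigger
//                                      flush(buf)
//                                      buf = buf[:0]
//                              }
//                      case <-tick.C: // time trigger
//                              flush(buf)
//                              buf = buf[:0]
//                      case <-ctx.Done():
//                              return
//                      }
//              }
//      }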

// flush sends the cached measurements to the database
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementMessage) {
        if len(msgs) == 0 {
                return
        }
        logger := log.GetLogger(pgw.ctx).
                WithField("sink", "postgres").
                WithField("db", pgw.sinkDb.Config().ConnConfig.Database)
        tsWarningPrinted := false
        metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
        rowsBatched := 0
        totalRows := 0
        pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric=min/max
        pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric=[dbname=min/max]
        var err error

        for _, msg := range msgs {
                if len(msg.Data) == 0 {
                        continue
                }
                logger.WithField("data", msg.Data).WithField("len", len(msg.Data)).Debug("Sending To Postgres")

                for _, dataRow := range msg.Data {
                        var epochTime time.Time
                        var epochNs int64

                        tags := make(map[string]any)
                        fields := make(map[string]any)

                        totalRows++

                        if msg.CustomTags != nil {
                                for k, v := range msg.CustomTags {
                                        tags[k] = fmt.Sprintf("%v", v)
                                }
                        }

                        for k, v := range dataRow {
                                if v == nil || v == "" {
                                        continue // not storing NULLs
                                }
                                if k == epochColumnName {
                                        epochNs = v.(int64)
                                } else if strings.HasPrefix(k, tagPrefix) {
                                        tag := k[len(tagPrefix):]
                                        tags[tag] = fmt.Sprintf("%v", v)
                                } else {
                                        fields[k] = v
                                }
                        }

                        if epochNs == 0 {
                                if !tsWarningPrinted && !regexIsPgbouncerMetrics.MatchString(msg.MetricName) {
                                        logger.Warning("No timestamp_ns found, server time will be used. measurement:", msg.MetricName)
                                        tsWarningPrinted = true
                                }
                                epochTime = time.Now()
                        } else {
                                epochTime = time.Unix(0, epochNs)
                        }

                        metricsToStorePerMetric[msg.MetricName] = append(metricsToStorePerMetric[msg.MetricName],
                                MeasurementMessagePostgres{Time: epochTime, DBName: msg.DBName,
                                        Metric: msg.MetricName, Data: fields, TagData: tags})

                        rowsBatched++

                        if pgw.metricSchema == DbStorageSchemaTimescale {
                                // set min/max timestamps to check/create partitions
                                bounds, ok := pgPartBounds[msg.MetricName]
                                if !ok || epochTime.Before(bounds.StartTime) {
                                        bounds.StartTime = epochTime
                                        pgPartBounds[msg.MetricName] = bounds
                                }
                                if !ok || epochTime.After(bounds.EndTime) {
                                        bounds.EndTime = epochTime
                                        pgPartBounds[msg.MetricName] = bounds
                                }
                        } else if pgw.metricSchema == DbStorageSchemaPostgres {
                                if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
                                        pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
                                }
                                bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
                                if !ok || epochTime.Before(bounds.StartTime) {
                                        bounds.StartTime = epochTime
                                        pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
                                }
                                if !ok || epochTime.After(bounds.EndTime) {
                                        bounds.EndTime = epochTime
                                        pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
                                }
                        }
                }
        }

        if pgw.metricSchema == DbStorageSchemaPostgres {
                err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePGMetricPartitions)
        } else if pgw.metricSchema == DbStorageSchemaTimescale {
                err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePGMetricPartitions)
        } else {
                logger.Fatal("should never happen...")
        }
        if forceRecreatePGMetricPartitions {
                forceRecreatePGMetricPartitions = false
        }
        if err != nil {
                pgw.lastError <- err
        }

        // send the data to Postgres; a separate COPY is issued for each measurement row
        logger.Debugf("COPY-ing %d metrics to Postgres metricsDB...", rowsBatched)
        t1 := time.Now()

        for metricName, metricRows := range metricsToStorePerMetric {
                getTargetTable := func() pgx.Identifier {
                        return pgx.Identifier{metricName}
                }

                getTargetColumns := func() []string {
                        return []string{"time", "dbname", "data", "tag_data"}
                }

                for _, m := range metricRows {
                        l := logger.WithField("db", m.DBName).WithField("metric", m.Metric)
                        jsonBytes, err := json.Marshal(m.Data)
                        if err != nil {
                                logger.Errorf("Skipping 1 metric for [%s:%s] due to JSON conversion error: %s", m.DBName, m.Metric, err)
                                continue
                        }

                        getTagData := func() any {
                                if len(m.TagData) > 0 {
                                        jsonBytesTags, err := json.Marshal(m.TagData)
                                        if err != nil {
                                                l.Error(err)
                                                return nil
                                        }
                                        return string(jsonBytesTags)
                                }
                                return nil
                        }

                        rows := [][]any{{m.Time, m.DBName, string(jsonBytes), getTagData()}}

                        if _, err = pgw.sinkDb.CopyFrom(context.Background(), getTargetTable(), getTargetColumns(), pgx.CopyFromRows(rows)); err != nil {
                                l.Error(err)
                                forceRecreatePGMetricPartitions = strings.Contains(err.Error(), "no partition")
                                if forceRecreatePGMetricPartitions {
                                        logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
                                }
                        }
                }
        }

        diff := time.Since(t1)
        if err == nil {
                logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
                return
        }
        pgw.lastError <- err
}
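
// Editor's note (illustrative, not from the original file): each measurement
// ends up as one COPY row in its per-metric table, with the columns listed in
// getTargetColumns above. A single row for a hypothetical "db_stats" metric:
//
//      rows := [][]any{{
//              time.Unix(0, 1723906190000000000), // time, from epoch_ns
//              "mydb",                            // dbname
//              `{"numbackends": 42}`,             // data, JSON text of the non-tag fields
//              `{"real_dbname": "mydb"}`,         // tag_data, or nil when there are no tags
//      }}
//      _, err := pgw.sinkDb.CopyFrom(context.Background(), pgx.Identifier{"db_stats"},
//              []string{"time", "dbname", "data", "tag_data"}, pgx.CopyFromRows(rows))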

// EnsureMetricTime creates special partitions when TimescaleDB is used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
        logger := log.GetLogger(pgw.ctx)
        sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
        for metric, pb := range pgPartBounds {
                if !strings.HasSuffix(metric, "_realtime") {
                        continue
                }
                if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
                        return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
                }

                partInfo, ok := partitionMapMetric[metric]
                if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
                        err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
                        if err != nil {
                                logger.Error("Failed to create partition on 'metrics':", err)
                                return err
                        }
                        partitionMapMetric[metric] = partInfo
                }
                if pb.EndTime.After(partInfo.EndTime) || force {
                        err := pgw.sinkDb.QueryRow(pgw.ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
                        if err != nil {
                                logger.Error("Failed to create partition on 'metrics':", err)
                                return err
                        }
                        partitionMapMetric[metric] = partInfo
                }
        }
        return nil
}

// EnsureMetricTimescale creates a table for each new (non-realtime) metric and
// delegates the realtime metrics to EnsureMetricTime.
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
        logger := log.GetLogger(pgw.ctx)
        sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
        for metric := range pgPartBounds {
                if strings.HasSuffix(metric, "_realtime") {
                        continue
                }
                if _, ok := partitionMapMetric[metric]; !ok {
                        if _, err = pgw.sinkDb.Exec(pgw.ctx, sqlEnsure, metric); err != nil {
                                logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
                                return err
                        }
                        partitionMapMetric[metric] = ExistingPartitionInfo{}
                }
        }
        return pgw.EnsureMetricTime(pgPartBounds, force)
}

// EnsureMetricDbnameTime creates the per-metric, per-dbname time partitions
// used by the plain Postgres storage schema.
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
        var rows pgx.Rows
        sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
        for metric, dbnameTimestampMap := range metricDbnamePartBounds {
                if _, ok := partitionMapMetricDbname[metric]; !ok {
                        partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
                }

                for dbname, pb := range dbnameTimestampMap {
                        if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
                                return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
                        }
                        partInfo, ok := partitionMapMetricDbname[metric][dbname]
                        if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
                                if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
                                        return
                                }
                                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
                                        return err
                                }
                                partitionMapMetricDbname[metric][dbname] = partInfo
                        }
                        if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
                                if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
                                        return
                                }
                                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
                                        return err
                                }
                                partitionMapMetricDbname[metric][dbname] = partInfo
                        }
                }
        }
        return nil
}

// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
        metricAgeDaysThreshold := pgw.opts.Retention
        if metricAgeDaysThreshold <= 0 {
                return
        }
        logger := log.GetLogger(pgw.ctx)
        select {
        case <-pgw.ctx.Done():
                return
        case <-time.After(delay):
                // to reduce distracting log messages at startup
        }

        for {
                if pgw.metricSchema == DbStorageSchemaTimescale {
                        partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
                        if err != nil {
                                logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
                                time.Sleep(time.Second * 300) // back off before retrying
                                continue
                        }
                        logger.Infof("Dropped %d old metric partitions...", partsDropped)
                } else if pgw.metricSchema == DbStorageSchemaPostgres {
                        partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
                        if err != nil {
                                logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
                                time.Sleep(time.Second * 300)
                                continue
                        }
                        if len(partsToDrop) > 0 {
                                logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
                                for _, toDrop := range partsToDrop {
                                        sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
                                        logger.Debugf("Dropping old metric data partition: %s", toDrop)

                                        if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
                                                logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %v", toDrop, err)
                                                time.Sleep(time.Second * 300)
                                        } else {
                                                time.Sleep(time.Second * 5)
                                        }
                                }
                        } else {
                                logger.Infof("No old metric partitions found to drop...")
                        }
                }
                select {
                case <-pgw.ctx.Done():
                        return
                case <-time.After(time.Hour * 12):
                }
        }
}
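
// Editor's note (illustrative, not from the original file): retention is driven
// by opts.Retention (in days) and enforced every 12 hours. The two storage
// schemas differ only in mechanics:
//
//      dropped, err := pgw.DropOldTimePartitions(14) // TimescaleDB: one server-side call
//
//      parts, err := pgw.GetOldTimePartitions(14)    // plain Postgres: list old partitions,
//                                                    // then DROP TABLE them one by one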

// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
        logger := log.GetLogger(pgw.ctx)
        // due to metrics deletion the listing can go out of sync (a trigger is not really wanted)
        sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
        sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
        sqlDistinct := `
        WITH RECURSIVE t(dbname) AS (
                SELECT MIN(dbname) AS dbname FROM %s
                UNION
                SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
        SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
        sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
        sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
        sqlAdd := `
                INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
                WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
                RETURNING *`

        for {
                select {
                case <-pgw.ctx.Done():
                        return
                case <-time.After(time.Hour * 24):
                }
                var lock bool
                logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to have only one "maintainer" in case of a "push" setup, as it can get costly
                if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
                        logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
                        continue
                }
                if !lock {
                        logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
                        continue
                }

                logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
                rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
                allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
                if err != nil {
                        logger.Error(err)
                        continue
                }

                for _, tableName := range allDistinctMetricTables {
                        foundDbnamesMap := make(map[string]bool)
                        foundDbnamesArr := make([]string, 0)
                        metricName := strings.Replace(tableName, "public.", "", 1)

                        logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
                        rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
                        ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
                        if err != nil {
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
                                break
                        }
                        for _, drDbname := range ret {
                                foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
                        }

                        // delete all that are not known and add all that are not there
                        for k := range foundDbnamesMap {
                                foundDbnamesArr = append(foundDbnamesArr, k)
                        }
                        if len(foundDbnamesArr) == 0 { // delete all entries for the given metric
                                logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

                                _, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
                                if err != nil {
                                        logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
                                }
                                continue
                        }
                        cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
                        if err != nil {
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
                        } else if cmdTag.RowsAffected() > 0 {
                                logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
                        }
                        cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
                        if err != nil {
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
                        } else if cmdTag.RowsAffected() > 0 {
                                logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
                        }
                        time.Sleep(time.Minute)
                }

        }
}
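
// Editor's note (illustrative, not from the original file): pg_try_advisory_lock
// is session-scoped, so the first writer instance to grab it keeps the
// maintainer role for as long as its connection lives, and the others skip the
// refresh. In isolation:
//
//      var lock bool
//      _ = conn.QueryRow(ctx, `SELECT pg_try_advisory_lock(1571543679778230000)`).Scan(&lock)
//      if !lock {
//              // another instance is already maintaining the listing
//      }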

// DropOldTimePartitions drops partitions older than the given number of days
// using the admin.drop_old_time_partitions() helper and returns the count of
// partitions dropped.
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
        sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
        err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
        return
}

// GetOldTimePartitions lists partitions older than the given number of days
// via the admin.get_old_time_partitions() helper.
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
        sqlGetOldParts := `select admin.get_old_time_partitions($1)`
        rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
        if err == nil {
                return pgx.CollectRows(rows, pgx.RowTo[string])
        }
        return nil, err
}

// AddDBUniqueMetricToListingTable adds a (dbname, metric) pair to the
// admin.all_distinct_dbname_metrics listing table if it is not present yet.
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
        sql := `insert into admin.all_distinct_dbname_metrics
                        select $1, $2
                        where not exists (
                                select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
                        )`
        _, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
        return err
}