• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pgwatch3 / 9208786291

23 May 2024 01:19PM UTC coverage: 12.682% (+3.0%) from 9.667%
9208786291

push

github

web-flow
[+] add tests for `sinks` (#449)

* [+] add tests for `multiwriter`
* [*] prepare postgres writer for tests
* [+] add `TestNewWriterFromPostgresConn()`
* [+] add tests for `JSONWriter`
* [+] add `TestSyncMetric` and `TestWrite`
* [*] remove obsolete `EnsureMetric()`
* [*] prettify code

38 of 84 new or added lines in 5 files covered. (45.24%)

156 existing lines in 5 files now uncovered.

584 of 4605 relevant lines covered (12.68%)

0.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.68
/src/sinks/postgres.go
1
package sinks
2

3
import (
4
        "context"
5
        _ "embed"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "regexp"
10
        "strings"
11
        "time"
12

13
        "github.com/cybertec-postgresql/pgwatch3/config"
14
        "github.com/cybertec-postgresql/pgwatch3/db"
15
        "github.com/cybertec-postgresql/pgwatch3/log"
16
        "github.com/cybertec-postgresql/pgwatch3/metrics"
17
        "github.com/jackc/pgx/v5"
18
)
19

20
var (
21
        cacheLimit      = 512
22
        highLoadTimeout = time.Second * 5
23
        deleterDelay    = time.Hour
24
)
25

NEW
26
func NewPostgresWriter(ctx context.Context, connstr string, opts *config.MeasurementOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
×
NEW
27
        var conn db.PgxPoolIface
×
NEW
28
        if conn, err = db.New(ctx, connstr); err != nil {
×
NEW
29
                return
×
NEW
30
        }
×
NEW
31
        return NewWriterFromPostgresConn(ctx, conn, opts, metricDefs)
×
32
}
33

34
func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *config.MeasurementOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
1✔
35
        pgw = &PostgresWriter{
1✔
36
                сtx:        ctx,
1✔
37
                metricDefs: metricDefs,
1✔
38
                opts:       opts,
1✔
39
                input:      make(chan []metrics.MeasurementMessage, cacheLimit),
1✔
40
                lastError:  make(chan error),
1✔
41
                sinkDb:     conn,
1✔
42
        }
1✔
43
        if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
2✔
44
                log.GetLogger(ctx).Info("initialising the measurement database...")
1✔
45
                exists, err := db.DoesSchemaExist(ctx, conn, "admin")
1✔
46
                if err != nil || exists {
2✔
47
                        return err
1✔
48
                }
1✔
49
                for _, sql := range metricSchemaSQLs {
×
50
                        if _, err = conn.Exec(ctx, sql); err != nil {
×
51
                                return err
×
52
                        }
×
53
                }
54
                return nil
×
55
        }); err != nil {
×
UNCOV
56
                return
×
UNCOV
57
        }
×
58
        if err = pgw.ReadMetricSchemaType(); err != nil {
1✔
UNCOV
59
                return
×
UNCOV
60
        }
×
61
        if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
1✔
UNCOV
62
                return
×
UNCOV
63
        }
×
64
        go pgw.deleteOldPartitions(deleterDelay)
1✔
65
        go pgw.maintainUniqueSources()
1✔
66
        go pgw.poll()
1✔
67
        return
1✔
68
}
69

70
//go:embed sql/admin_schema.sql
71
var sqlMetricAdminSchema string
72

73
//go:embed sql/admin_functions.sql
74
var sqlMetricAdminFunctions string
75

76
//go:embed sql/ensure_partition_postgres.sql
77
var sqlMetricEnsurePartitionPostgres string
78

79
//go:embed sql/ensure_partition_timescale.sql
80
var sqlMetricEnsurePartitionTimescale string
81

82
//go:embed sql/change_chunk_interval.sql
83
var sqlMetricChangeChunkIntervalTimescale string
84

85
//go:embed sql/change_compression_interval.sql
86
var sqlMetricChangeCompressionIntervalTimescale string
87

88
var (
89
        metricSchemaSQLs = []string{
90
                sqlMetricAdminSchema,
91
                sqlMetricAdminFunctions,
92
                sqlMetricEnsurePartitionPostgres,
93
                sqlMetricEnsurePartitionTimescale,
94
                sqlMetricChangeChunkIntervalTimescale,
95
                sqlMetricChangeCompressionIntervalTimescale,
96
        }
97
)
98

99
type PostgresWriter struct {
100
        сtx          context.Context
101
        sinkDb       db.PgxPoolIface
102
        metricSchema DbStorageSchemaType
103
        metricDefs   *metrics.Metrics
104
        opts         *config.MeasurementOpts
105
        input        chan []metrics.MeasurementMessage
106
        lastError    chan error
107
}
108

109
type ExistingPartitionInfo struct {
110
        StartTime time.Time
111
        EndTime   time.Time
112
}
113

114
type MeasurementMessagePostgres struct {
115
        Time    time.Time
116
        DBName  string
117
        Metric  string
118
        Data    map[string]any
119
        TagData map[string]any
120
}
121

122
type DbStorageSchemaType int
123

124
const (
125
        DbStorageSchemaPostgres DbStorageSchemaType = iota
126
        DbStorageSchemaTimescale
127
)
128

129
func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
1✔
130
        var isTs bool
1✔
131
        pgw.metricSchema = DbStorageSchemaPostgres
1✔
132
        sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
1✔
133
        if err = pgw.sinkDb.QueryRow(pgw.сtx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
2✔
134
                pgw.metricSchema = DbStorageSchemaTimescale
1✔
135
        }
1✔
136
        return
1✔
137
}
138

139
const (
140
        epochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
141
        tagPrefix       string = "tag_"
142
)
143

144
const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"
145

146
var (
147
        regexIsPgbouncerMetrics         = regexp.MustCompile(specialMetricPgbouncer)
148
        forceRecreatePGMetricPartitions = false                                             // to signal override PG metrics storage cache
149
        partitionMapMetric              = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
150
        partitionMapMetricDbname        = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname = min/max bounds]
151
)
152

153
// SyncMetric ensures that tables exist for newly added metrics and/or sources
154
func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error {
1✔
155
        if op == "add" {
2✔
156
                return errors.Join(
1✔
157
                        pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
1✔
158
                        pgw.EnsureMetricDummy(metricName), // ensure that there is at least an empty top-level table not to get ugly Grafana notifications
1✔
159
                )
1✔
160
        }
1✔
161
        return nil
1✔
162
}
163

164
// EnsureBuiltinMetricDummies creates empty tables for all built-in metrics if they don't exist
165
func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
1✔
166
        for _, name := range metrics.GetDefaultBuiltInMetrics() {
2✔
167
                err = errors.Join(err, pgw.EnsureMetricDummy(name))
1✔
168
        }
1✔
169
        return
1✔
170
}
171

172
// EnsureMetricDummy creates an empty table for a metric measurements if it doesn't exist
173
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
1✔
174
        _, err = pgw.sinkDb.Exec(pgw.сtx, "select admin.ensure_dummy_metrics_table($1)", metric)
1✔
175
        return
1✔
176
}
1✔
177

178
// Write send the measurements to the cache channel
179
func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementMessage) error {
1✔
180
        if pgw.сtx.Err() != nil {
2✔
181
                return pgw.сtx.Err()
1✔
182
        }
1✔
183
        select {
1✔
184
        case pgw.input <- msgs:
1✔
185
                // msgs sent
186
        case <-time.After(highLoadTimeout):
1✔
187
                // msgs dropped due to a huge load, check stdout or file for detailed log
188
        }
189
        select {
1✔
190
        case err := <-pgw.lastError:
×
191
                return err
×
192
        default:
1✔
193
                return nil
1✔
194
        }
195
}
196

197
// poll is the main loop that reads from the input channel and flushes the data to the database
198
func (pgw *PostgresWriter) poll() {
1✔
199
        cache := make([]metrics.MeasurementMessage, 0, cacheLimit)
1✔
200
        cacheTimeout := pgw.opts.BatchingDelay
1✔
201
        tick := time.NewTicker(cacheTimeout)
1✔
202
        for {
2✔
203
                select {
1✔
NEW
204
                case <-pgw.сtx.Done(): //check context with high priority
×
205
                        return
×
206
                default:
1✔
207
                        select {
1✔
208
                        case entry := <-pgw.input:
×
209
                                cache = append(cache, entry...)
×
210
                                if len(cache) < cacheLimit {
×
211
                                        break
×
212
                                }
213
                                tick.Stop()
×
NEW
214
                                pgw.flush(cache)
×
215
                                cache = cache[:0]
×
216
                                tick = time.NewTicker(cacheTimeout)
×
217
                        case <-tick.C:
×
NEW
218
                                pgw.flush(cache)
×
UNCOV
219
                                cache = cache[:0]
×
NEW
220
                        case <-pgw.сtx.Done():
×
221
                                return
×
222
                        }
223
                }
224
        }
225
}
226

227
// flush sends the cached measurements to the database
NEW
228
func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementMessage) {
×
229
        if len(msgs) == 0 {
×
230
                return
×
231
        }
×
NEW
232
        logger := log.GetLogger(pgw.сtx).
×
233
                WithField("sink", "postgres").
×
NEW
234
                WithField("db", pgw.sinkDb.Config().ConnConfig.Database)
×
UNCOV
235
        tsWarningPrinted := false
×
236
        metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
×
237
        rowsBatched := 0
×
238
        totalRows := 0
×
239
        pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric=min/max
×
240
        pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric=[dbname=min/max]
×
241
        var err error
×
242

×
243
        for _, msg := range msgs {
×
244
                if msg.Data == nil || len(msg.Data) == 0 {
×
UNCOV
245
                        continue
×
246
                }
247
                logger.WithField("data", msg.Data).WithField("len", len(msg.Data)).Debug("Sending To Postgres")
×
248

×
249
                for _, dataRow := range msg.Data {
×
250
                        var epochTime time.Time
×
251
                        var epochNs int64
×
252

×
253
                        tags := make(map[string]any)
×
254
                        fields := make(map[string]any)
×
255

×
256
                        totalRows++
×
257

×
258
                        if msg.CustomTags != nil {
×
259
                                for k, v := range msg.CustomTags {
×
260
                                        tags[k] = fmt.Sprintf("%v", v)
×
261
                                }
×
262
                        }
263

264
                        for k, v := range dataRow {
×
265
                                if v == nil || v == "" {
×
266
                                        continue // not storing NULLs
×
267
                                }
268
                                if k == epochColumnName {
×
269
                                        epochNs = v.(int64)
×
270
                                } else if strings.HasPrefix(k, tagPrefix) {
×
271
                                        tag := k[4:]
×
272
                                        tags[tag] = fmt.Sprintf("%v", v)
×
273
                                } else {
×
274
                                        fields[k] = v
×
275
                                }
×
276
                        }
277

278
                        if epochNs == 0 {
×
279
                                if !tsWarningPrinted && !regexIsPgbouncerMetrics.MatchString(msg.MetricName) {
×
280
                                        logger.Warning("No timestamp_ns found, server time will be used. measurement:", msg.MetricName)
×
281
                                        tsWarningPrinted = true
×
282
                                }
×
283
                                epochTime = time.Now()
×
284
                        } else {
×
285
                                epochTime = time.Unix(0, epochNs)
×
UNCOV
286
                        }
×
287

UNCOV
288
                        var metricsArr []MeasurementMessagePostgres
×
UNCOV
289
                        var ok bool
×
290

×
291
                        metricNameTemp := msg.MetricName
×
292

×
293
                        metricsArr, ok = metricsToStorePerMetric[metricNameTemp]
×
294
                        if !ok {
×
295
                                metricsToStorePerMetric[metricNameTemp] = make([]MeasurementMessagePostgres, 0)
×
296
                        }
×
297
                        metricsArr = append(metricsArr, MeasurementMessagePostgres{Time: epochTime, DBName: msg.DBName,
×
298
                                Metric: msg.MetricName, Data: fields, TagData: tags})
×
299
                        metricsToStorePerMetric[metricNameTemp] = metricsArr
×
300

×
301
                        rowsBatched++
×
302

×
NEW
303
                        if pgw.metricSchema == DbStorageSchemaTimescale {
×
UNCOV
304
                                // set min/max timestamps to check/create partitions
×
UNCOV
305
                                bounds, ok := pgPartBounds[msg.MetricName]
×
306
                                if !ok || (ok && epochTime.Before(bounds.StartTime)) {
×
307
                                        bounds.StartTime = epochTime
×
308
                                        pgPartBounds[msg.MetricName] = bounds
×
309
                                }
×
310
                                if !ok || (ok && epochTime.After(bounds.EndTime)) {
×
311
                                        bounds.EndTime = epochTime
×
312
                                        pgPartBounds[msg.MetricName] = bounds
×
313
                                }
×
NEW
314
                        } else if pgw.metricSchema == DbStorageSchemaPostgres {
×
315
                                _, ok := pgPartBoundsDbName[msg.MetricName]
×
316
                                if !ok {
×
317
                                        pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
×
UNCOV
318
                                }
×
319
                                bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
×
320
                                if !ok || (ok && epochTime.Before(bounds.StartTime)) {
×
321
                                        bounds.StartTime = epochTime
×
322
                                        pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
×
323
                                }
×
324
                                if !ok || (ok && epochTime.After(bounds.EndTime)) {
×
325
                                        bounds.EndTime = epochTime
×
UNCOV
326
                                        pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
×
UNCOV
327
                                }
×
328
                        }
329
                }
330
        }
331

NEW
332
        if pgw.metricSchema == DbStorageSchemaPostgres {
×
333
                err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePGMetricPartitions)
×
NEW
334
        } else if pgw.metricSchema == DbStorageSchemaTimescale {
×
335
                err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePGMetricPartitions)
×
336
        } else {
×
UNCOV
337
                logger.Fatal("should never happen...")
×
338
        }
×
UNCOV
339
        if forceRecreatePGMetricPartitions {
×
UNCOV
340
                forceRecreatePGMetricPartitions = false
×
341
        }
×
342
        if err != nil {
×
343
                pgw.lastError <- err
×
344
        }
×
345

346
        // send data to PG, with a separate COPY for all metrics
347
        logger.Debugf("COPY-ing %d metrics to Postgres metricsDB...", rowsBatched)
×
348
        t1 := time.Now()
×
UNCOV
349

×
UNCOV
350
        for metricName, metrics := range metricsToStorePerMetric {
×
UNCOV
351

×
UNCOV
352
                getTargetTable := func() pgx.Identifier {
×
353
                        return pgx.Identifier{metricName}
×
354
                }
×
355

356
                getTargetColumns := func() []string {
×
357
                        return []string{"time", "dbname", "data", "tag_data"}
×
358
                }
×
359

360
                for _, m := range metrics {
×
361
                        l := logger.WithField("db", m.DBName).WithField("metric", m.Metric)
×
UNCOV
362
                        jsonBytes, err := json.Marshal(m.Data)
×
UNCOV
363
                        if err != nil {
×
364
                                logger.Errorf("Skipping 1 metric for [%s:%s] due to JSON conversion error: %s", m.DBName, m.Metric, err)
×
365
                                continue
×
366
                        }
367

368
                        getTagData := func() any {
×
369
                                if len(m.TagData) > 0 {
×
370
                                        jsonBytesTags, err := json.Marshal(m.TagData)
×
371
                                        if err != nil {
×
372
                                                l.Error(err)
×
UNCOV
373
                                                return nil
×
UNCOV
374
                                        }
×
UNCOV
375
                                        return string(jsonBytesTags)
×
376
                                }
UNCOV
377
                                return nil
×
378
                        }
379

380
                        rows := [][]any{{m.Time, m.DBName, string(jsonBytes), getTagData()}}
×
381

×
NEW
382
                        if _, err = pgw.sinkDb.CopyFrom(context.Background(), getTargetTable(), getTargetColumns(), pgx.CopyFromRows(rows)); err != nil {
×
383
                                l.Error(err)
×
UNCOV
384
                                forceRecreatePGMetricPartitions = strings.Contains(err.Error(), "no partition")
×
385
                                if forceRecreatePGMetricPartitions {
×
386
                                        logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
×
UNCOV
387
                                }
×
388
                        }
389
                }
390
        }
391

392
        diff := time.Since(t1)
×
393
        if err == nil {
×
394
                logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
×
395
                return
×
UNCOV
396
        }
×
397
        pgw.lastError <- err
×
398
}
399

400
// EnsureMetricTime creates special partitions if Timescale used for realtime metrics
UNCOV
401
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
×
NEW
402
        logger := log.GetLogger(pgw.сtx)
×
403
        sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
×
UNCOV
404
        for metric, pb := range pgPartBounds {
×
UNCOV
405
                if !strings.HasSuffix(metric, "_realtime") {
×
406
                        continue
×
407
                }
UNCOV
408
                if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
×
409
                        return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
×
410
                }
×
411

412
                partInfo, ok := partitionMapMetric[metric]
×
413
                if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
×
NEW
414
                        err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
×
415
                        if err != nil {
×
416
                                logger.Error("Failed to create partition on 'metrics':", err)
×
UNCOV
417
                                return err
×
418
                        }
×
419
                        partitionMapMetric[metric] = partInfo
×
420
                }
421
                if pb.EndTime.After(partInfo.EndTime) || force {
×
NEW
422
                        err := pgw.sinkDb.QueryRow(pgw.сtx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
×
423
                        if err != nil {
×
424
                                logger.Error("Failed to create partition on 'metrics':", err)
×
425
                                return err
×
426
                        }
×
427
                        partitionMapMetric[metric] = partInfo
×
428
                }
429
        }
430
        return nil
×
431
}
432

433
func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
×
NEW
434
        logger := log.GetLogger(pgw.сtx)
×
435
        sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
×
436
        for metric := range pgPartBounds {
×
437
                if strings.HasSuffix(metric, "_realtime") {
×
438
                        continue
×
439
                }
UNCOV
440
                if _, ok := partitionMapMetric[metric]; !ok {
×
NEW
441
                        if _, err = pgw.sinkDb.Exec(pgw.сtx, sqlEnsure, metric); err != nil {
×
UNCOV
442
                                logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
×
443
                                return err
×
UNCOV
444
                        }
×
UNCOV
445
                        partitionMapMetric[metric] = ExistingPartitionInfo{}
×
446
                }
447
        }
448
        return pgw.EnsureMetricTime(pgPartBounds, force)
×
449
}
450

451
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
×
452
        var rows pgx.Rows
×
453
        sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
×
454
        for metric, dbnameTimestampMap := range metricDbnamePartBounds {
×
455
                _, ok := partitionMapMetricDbname[metric]
×
UNCOV
456
                if !ok {
×
UNCOV
457
                        partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
×
UNCOV
458
                }
×
459

460
                for dbname, pb := range dbnameTimestampMap {
×
461
                        if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
×
462
                                return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
×
463
                        }
×
464
                        partInfo, ok := partitionMapMetricDbname[metric][dbname]
×
UNCOV
465
                        if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
×
NEW
466
                                if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
×
467
                                        return
×
468
                                }
×
469
                                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
×
470
                                        return err
×
471
                                }
×
472
                                partitionMapMetricDbname[metric][dbname] = partInfo
×
473
                        }
474
                        if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
×
NEW
475
                                if rows, err = pgw.sinkDb.Query(pgw.сtx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
×
476
                                        return
×
477
                                }
×
478
                                if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
×
479
                                        return err
×
480
                                }
×
481
                                partitionMapMetricDbname[metric][dbname] = partInfo
×
482
                        }
483
                }
484
        }
485
        return nil
×
486
}
487

488
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
489
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
1✔
490
        metricAgeDaysThreshold := pgw.opts.Retention
1✔
491
        if metricAgeDaysThreshold <= 0 {
1✔
492
                return
×
493
        }
×
494
        logger := log.GetLogger(pgw.сtx)
1✔
495
        select {
1✔
NEW
496
        case <-pgw.сtx.Done():
×
UNCOV
497
                return
×
NEW
498
        case <-time.After(delay):
×
499
                // to reduce distracting log messages at startup
500
        }
501

502
        for {
×
NEW
503
                if pgw.metricSchema == DbStorageSchemaTimescale {
×
504
                        partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
×
505
                        if err != nil {
×
506
                                logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
×
507
                                continue
×
508
                        }
509
                        logger.Infof("Dropped %d old metric partitions...", partsDropped)
×
NEW
510
                } else if pgw.metricSchema == DbStorageSchemaPostgres {
×
511
                        partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
×
512
                        if err != nil {
×
513
                                logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
×
514
                                time.Sleep(time.Second * 300)
×
515
                                continue
×
516
                        }
517
                        if len(partsToDrop) > 0 {
×
518
                                logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
×
519
                                for _, toDrop := range partsToDrop {
×
NEW
520
                                        sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
×
521
                                        logger.Debugf("Dropping old metric data partition: %s", toDrop)
×
522

×
NEW
523
                                        if _, err := pgw.sinkDb.Exec(pgw.сtx, sqlDropTable); err != nil {
×
524
                                                logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err)
×
525
                                                time.Sleep(time.Second * 300)
×
526
                                        } else {
×
527
                                                time.Sleep(time.Second * 5)
×
528
                                        }
×
529
                                }
530
                        } else {
×
531
                                logger.Infof("No old metric partitions found to drop...")
×
532
                        }
×
533
                }
UNCOV
534
                select {
×
NEW
535
                case <-pgw.сtx.Done():
×
536
                        return
×
537
                case <-time.After(time.Hour * 12):
×
538
                }
539
        }
540
}
541

542
// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
543
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
544
func (pgw *PostgresWriter) maintainUniqueSources() {
1✔
545
        logger := log.GetLogger(pgw.сtx)
1✔
546
        // due to metrics deletion the listing can go out of sync (a trigger not really wanted)
1✔
547
        sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
1✔
548
        sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
1✔
549
        sqlDistinct := `
1✔
550
        WITH RECURSIVE t(dbname) AS (
1✔
551
                SELECT MIN(dbname) AS dbname FROM %s
1✔
552
                UNION
1✔
553
                SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
1✔
554
        SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
1✔
555
        sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
1✔
556
        sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
1✔
557
        sqlAdd := `
1✔
558
                INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
1✔
559
                WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
1✔
560
                RETURNING *`
1✔
561

1✔
562
        for {
2✔
563
                select {
1✔
NEW
564
                case <-pgw.сtx.Done():
×
565
                        return
×
566
                case <-time.After(time.Hour * 24):
×
567
                }
568
                var lock bool
×
569
                logger.Infof("Trying to get metricsDb listing maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
×
NEW
570
                if err := pgw.sinkDb.QueryRow(pgw.сtx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
×
571
                        logger.Error("Getting metricsDb listing maintainer advisory lock failed:", err)
×
572
                        continue
×
573
                }
UNCOV
574
                if !lock {
×
575
                        logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
×
576
                        continue
×
577
                }
578

579
                logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
×
NEW
580
                rows, _ := pgw.sinkDb.Query(pgw.сtx, sqlTopLevelMetrics)
×
581
                allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
×
582
                if err != nil {
×
583
                        logger.Error(err)
×
584
                        continue
×
585
                }
586

587
                for _, tableName := range allDistinctMetricTables {
×
UNCOV
588
                        foundDbnamesMap := make(map[string]bool)
×
UNCOV
589
                        foundDbnamesArr := make([]string, 0)
×
UNCOV
590
                        metricName := strings.Replace(tableName, "public.", "", 1)
×
UNCOV
591

×
UNCOV
592
                        logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
×
NEW
593
                        rows, _ := pgw.sinkDb.Query(pgw.сtx, fmt.Sprintf(sqlDistinct, tableName, tableName))
×
594
                        ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
×
595
                        // ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
×
596
                        if err != nil {
×
597
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
×
UNCOV
598
                                break
×
599
                        }
600
                        for _, drDbname := range ret {
×
601
                                foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
×
602
                        }
×
603

604
                        // delete all that are not known and add all that are not there
605
                        for k := range foundDbnamesMap {
×
UNCOV
606
                                foundDbnamesArr = append(foundDbnamesArr, k)
×
UNCOV
607
                        }
×
608
                        if len(foundDbnamesArr) == 0 { // delete all entries for given metric
×
609
                                logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
×
610

×
NEW
611
                                _, err = pgw.sinkDb.Exec(pgw.сtx, sqlDeleteAll, metricName)
×
612
                                if err != nil {
×
613
                                        logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
×
614
                                }
×
615
                                continue
×
616
                        }
NEW
617
                        cmdTag, err := pgw.sinkDb.Exec(pgw.сtx, sqlDelete, foundDbnamesArr, metricName)
×
UNCOV
618
                        if err != nil {
×
UNCOV
619
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
×
UNCOV
620
                        } else if cmdTag.RowsAffected() > 0 {
×
UNCOV
621
                                logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
×
UNCOV
622
                        }
×
NEW
623
                        cmdTag, err = pgw.sinkDb.Exec(pgw.сtx, sqlAdd, foundDbnamesArr, metricName)
×
UNCOV
624
                        if err != nil {
×
UNCOV
625
                                logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
×
UNCOV
626
                        } else if cmdTag.RowsAffected() > 0 {
×
UNCOV
627
                                logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
×
UNCOV
628
                        }
×
UNCOV
629
                        time.Sleep(time.Minute)
×
630
                }
631

632
        }
633
}
634

UNCOV
635
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
×
UNCOV
636
        sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
×
NEW
637
        err = pgw.sinkDb.QueryRow(pgw.сtx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
×
UNCOV
638
        return
×
UNCOV
639
}
×
640

UNCOV
641
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
×
UNCOV
642
        sqlGetOldParts := `select admin.get_old_time_partitions($1)`
×
NEW
643
        rows, err := pgw.sinkDb.Query(pgw.сtx, sqlGetOldParts, metricAgeDaysThreshold)
×
UNCOV
644
        if err == nil {
×
UNCOV
645
                return pgx.CollectRows(rows, pgx.RowTo[string])
×
UNCOV
646
        }
×
UNCOV
647
        return nil, err
×
648
}
649

650
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
1✔
651
        sql := `insert into admin.all_distinct_dbname_metrics
1✔
652
                        select $1, $2
1✔
653
                        where not exists (
1✔
654
                                select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
1✔
655
                        )`
1✔
656
        _, err := pgw.sinkDb.Exec(pgw.сtx, sql, dbUnique, metric)
1✔
657
        return err
1✔
658
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc