cybertec-postgresql / pgwatch3 / build 8298420684

15 Mar 2024 03:06PM UTC, coverage: 8.047% (-0.5%) from 8.559%
Triggered by a push via GitHub (web-flow): "update docker compose command to force recreate test database (#398)"

419 of 5207 relevant lines covered (8.05%), 0.09 hits per line

Source file: /src/sinks/postgres.go (1.75 hits per line)

package sinks

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cybertec-postgresql/pgwatch3/config"
	"github.com/cybertec-postgresql/pgwatch3/db"
	"github.com/cybertec-postgresql/pgwatch3/log"
	"github.com/cybertec-postgresql/pgwatch3/metrics"
	"github.com/jackc/pgx/v5"
)

const (
	cacheLimit      = 512
	highLoadTimeout = time.Second * 5
)

func NewPostgresWriter(ctx context.Context, connstr string, opts *config.Options, metricDefs metrics.MetricVersionDefs) (pgw *PostgresWriter, err error) {
	pgw = &PostgresWriter{
		Ctx:        ctx,
		MetricDefs: metricDefs,
		opts:       opts,
		input:      make(chan []metrics.MeasurementMessage, cacheLimit),
		lastError:  make(chan error),
	}

	if pgw.SinkDb, err = db.New(ctx, connstr); err != nil {
		return
	}
	if err = db.InitMeasurementDb(ctx, pgw.SinkDb); err != nil {
		return
	}
	if err = pgw.ReadMetricSchemaType(); err != nil {
		pgw.SinkDb.Close()
		return
	}
	if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
		return
	}
	go pgw.OldPostgresMetricsDeleter()
	go pgw.UniqueDbnamesListingMaintainer()
	go pgw.poll()
	return
}

type PostgresWriter struct {
	Ctx          context.Context
	SinkDb       db.PgxPoolIface
	MetricSchema DbStorageSchemaType
	MetricDefs   metrics.MetricVersionDefs
	opts         *config.Options
	input        chan []metrics.MeasurementMessage
	lastError    chan error
}

type ExistingPartitionInfo struct {
	StartTime time.Time
	EndTime   time.Time
}

type MeasurementMessagePostgres struct {
	Time    time.Time
	DBName  string
	Metric  string
	Data    map[string]any
	TagData map[string]any
}

type DbStorageSchemaType int

const (
	DbStorageSchemaPostgres DbStorageSchemaType = iota
	DbStorageSchemaTimescale
)

func (pgw *PostgresWriter) ReadMetricSchemaType() (err error) {
	var isTs bool
	pgw.MetricSchema = DbStorageSchemaPostgres
	sqlSchemaType := `SELECT schema_type = 'timescale' FROM admin.storage_schema_type`
	if err = pgw.SinkDb.QueryRow(pgw.Ctx, sqlSchemaType).Scan(&isTs); err == nil && isTs {
		pgw.MetricSchema = DbStorageSchemaTimescale
	}
	return
}

const (
	epochColumnName string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
	tagPrefix       string = "tag_"
)

const specialMetricPgbouncer = "^pgbouncer_(stats|pools)$"

var (
	regexIsPgbouncerMetrics         = regexp.MustCompile(specialMetricPgbouncer)
	forceRecreatePGMetricPartitions = false                                             // set to signal that the PG metrics storage cache must be overridden
	partitionMapMetric              = make(map[string]ExistingPartitionInfo)            // metric = min/max bounds
	partitionMapMetricDbname        = make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] = min/max bounds
)

func (pgw *PostgresWriter) SyncMetric(dbUnique, metricName, op string) error {
	if op == "add" {
		return errors.Join(
			pgw.AddDBUniqueMetricToListingTable(dbUnique, metricName),
			pgw.EnsureMetricDummy(metricName), // ensure that at least an empty top-level table exists, to avoid ugly Grafana notifications
		)
	}
	return nil
}

func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
	names := []string{"sproc_changes", "table_changes", "index_changes", "privilege_changes", "object_changes", "configuration_changes"}
	for _, name := range names {
		err = errors.Join(err, pgw.EnsureMetricDummy(name))
	}
	return
}

func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
	_, err = pgw.SinkDb.Exec(pgw.Ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
	return
}

func (pgw *PostgresWriter) Write(msgs []metrics.MeasurementMessage) error {
	if pgw.Ctx.Err() != nil {
		return nil
	}
	select {
	case pgw.input <- msgs:
		// msgs sent
	case <-time.After(highLoadTimeout):
		// msgs dropped due to high load; check stdout or the log file for details
	}
	select {
	case err := <-pgw.lastError:
		return err
	default:
		return nil
	}
}

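Write hands a batch to the poller without ever blocking the collector for long: the send either lands in the buffered input channel or is dropped after highLoadTimeout, and the error channel is polled non-blockingly, so a failure from an earlier flush surfaces on a later call. Below is a minimal standalone sketch of that pattern; the names enqueue, queue and report are illustrative only, not part of pgwatch3.

package main

import (
	"errors"
	"fmt"
	"time"
)

// enqueue hands an item to a consumer but gives up after timeout, so a slow
// consumer cannot stall producers; report is drained non-blockingly so the
// caller sees the last asynchronous failure, if any.
func enqueue(queue chan<- string, report <-chan error, item string, timeout time.Duration) error {
	select {
	case queue <- item:
		// item handed off
	case <-time.After(timeout):
		// consumer overloaded: drop the item rather than block
	}
	select {
	case err := <-report:
		return err // error from an earlier batch, surfaced on this call
	default:
		return nil
	}
}

func main() {
	queue := make(chan string, 1)
	report := make(chan error, 1)
	report <- errors.New("previous flush failed")
	fmt.Println(enqueue(queue, report, "measurement", 10*time.Millisecond))
}
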
func (pgw *PostgresWriter) poll() {
	cache := make([]metrics.MeasurementMessage, 0, cacheLimit)
	cacheTimeout := pgw.opts.Measurements.BatchingDelay
	tick := time.NewTicker(cacheTimeout)
	for {
		select {
		case <-pgw.Ctx.Done(): // check context with high priority
			return
		default:
			select {
			case entry := <-pgw.input:
				cache = append(cache, entry...)
				if len(cache) < cacheLimit {
					break
				}
				tick.Stop()
				pgw.write(cache)
				cache = cache[:0]
				tick = time.NewTicker(cacheTimeout)
			case <-tick.C:
				pgw.write(cache)
				cache = cache[:0]
			case <-pgw.Ctx.Done():
				return
			}
		}
	}
}

func (pgw *PostgresWriter) write(msgs []metrics.MeasurementMessage) {
	if len(msgs) == 0 {
		return
	}
	logger := log.GetLogger(pgw.Ctx).
		WithField("sink", "postgres").
		WithField("db", pgw.SinkDb.Config().ConnConfig.Database)
	tsWarningPrinted := false
	metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
	rowsBatched := 0
	totalRows := 0
	pgPartBounds := make(map[string]ExistingPartitionInfo)                  // metric = min/max bounds
	pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric[dbname] = min/max bounds
	var err error

	for _, msg := range msgs {
		if len(msg.Data) == 0 {
			continue
		}
		logger.WithField("data", msg.Data).WithField("len", len(msg.Data)).Debug("Sending to Postgres")

		for _, dataRow := range msg.Data {
			var epochTime time.Time
			var epochNs int64

			tags := make(map[string]any)
			fields := make(map[string]any)

			totalRows++

			if msg.CustomTags != nil {
				for k, v := range msg.CustomTags {
					tags[k] = fmt.Sprintf("%v", v)
				}
			}

			for k, v := range dataRow {
				if v == nil || v == "" {
					continue // not storing NULLs
				}
				if k == epochColumnName {
					epochNs = v.(int64) // metric queries are expected to deliver epoch_ns as int64
				} else if strings.HasPrefix(k, tagPrefix) {
					tag := k[len(tagPrefix):]
					tags[tag] = fmt.Sprintf("%v", v)
				} else {
					fields[k] = v
				}
			}

			if epochNs == 0 {
				if !tsWarningPrinted && !regexIsPgbouncerMetrics.MatchString(msg.MetricName) {
					logger.Warning("No epoch_ns found, server time will be used. measurement:", msg.MetricName)
					tsWarningPrinted = true
				}
				epochTime = time.Now()
			} else {
				epochTime = time.Unix(0, epochNs)
			}

			metricsToStorePerMetric[msg.MetricName] = append(metricsToStorePerMetric[msg.MetricName],
				MeasurementMessagePostgres{Time: epochTime, DBName: msg.DBName,
					Metric: msg.MetricName, Data: fields, TagData: tags})
			rowsBatched++

			if pgw.MetricSchema == DbStorageSchemaTimescale {
				// update min/max timestamps used to check/create partitions
				bounds, ok := pgPartBounds[msg.MetricName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBounds[msg.MetricName] = bounds
				}
			} else if pgw.MetricSchema == DbStorageSchemaPostgres {
				if _, ok := pgPartBoundsDbName[msg.MetricName]; !ok {
					pgPartBoundsDbName[msg.MetricName] = make(map[string]ExistingPartitionInfo)
				}
				bounds, ok := pgPartBoundsDbName[msg.MetricName][msg.DBName]
				if !ok || epochTime.Before(bounds.StartTime) {
					bounds.StartTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
				if !ok || epochTime.After(bounds.EndTime) {
					bounds.EndTime = epochTime
					pgPartBoundsDbName[msg.MetricName][msg.DBName] = bounds
				}
			}
		}
	}

	if pgw.MetricSchema == DbStorageSchemaPostgres {
		err = pgw.EnsureMetricDbnameTime(pgPartBoundsDbName, forceRecreatePGMetricPartitions)
	} else if pgw.MetricSchema == DbStorageSchemaTimescale {
		err = pgw.EnsureMetricTimescale(pgPartBounds, forceRecreatePGMetricPartitions)
	} else {
		logger.Fatal("should never happen...")
	}
	if forceRecreatePGMetricPartitions {
		forceRecreatePGMetricPartitions = false
	}
	if err != nil {
		atomic.AddUint64(&datastoreWriteFailuresCounter, 1)
		pgw.lastError <- err
	}

	// send the data to Postgres, with a separate COPY per measurement
	logger.Debugf("COPY-ing %d metrics to Postgres metricsDB...", rowsBatched)
	t1 := time.Now()

	for metricName, metrics := range metricsToStorePerMetric {
		targetTable := pgx.Identifier{metricName}
		targetColumns := []string{"time", "dbname", "data", "tag_data"}

		for _, m := range metrics {
			l := logger.WithField("db", m.DBName).WithField("metric", m.Metric)
			jsonBytes, err := json.Marshal(m.Data)
			if err != nil {
				logger.Errorf("Skipping 1 metric for [%s:%s] due to JSON conversion error: %s", m.DBName, m.Metric, err)
				atomic.AddUint64(&totalMetricsDroppedCounter, 1)
				continue
			}

			getTagData := func() any {
				if len(m.TagData) > 0 {
					jsonBytesTags, err := json.Marshal(m.TagData)
					if err != nil {
						l.Error(err)
						atomic.AddUint64(&datastoreWriteFailuresCounter, 1)
						return nil
					}
					return string(jsonBytesTags)
				}
				return nil
			}

			rows := [][]any{{m.Time, m.DBName, string(jsonBytes), getTagData()}}

			if _, err = pgw.SinkDb.CopyFrom(context.Background(), targetTable, targetColumns, pgx.CopyFromRows(rows)); err != nil {
				l.Error(err)
				atomic.AddUint64(&datastoreWriteFailuresCounter, 1)
				forceRecreatePGMetricPartitions = strings.Contains(err.Error(), "no partition")
				if forceRecreatePGMetricPartitions {
					logger.Warning("Some metric partitions might have been removed, halting all metric storage. Trying to re-create all needed partitions on next run")
				}
			}
		}
	}

	diff := time.Since(t1)
	if err == nil {
		logger.WithField("rows", rowsBatched).WithField("elapsed", diff).Info("measurements written")
		// atomic.StoreInt64(&lastSuccessfulDatastoreWriteTimeEpoch, t1.Unix())
		// atomic.AddUint64(&datastoreTotalWriteTimeMicroseconds, uint64(diff.Microseconds()))
		// atomic.AddUint64(&datastoreWriteSuccessCounter, 1)
		return
	}
	pgw.lastError <- err
}

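Each incoming row is thus split three ways before storage: the epoch_ns value becomes the row timestamp, keys carrying the tag_ prefix go into tag_data, and the remaining keys become the data JSON document. A minimal standalone sketch of that split follows; the helper name and the sample values are illustrative only.

package main

import (
	"fmt"
	"strings"
	"time"
)

// splitRow mirrors the storage convention used above: "epoch_ns" carries the
// timestamp, "tag_*" keys become tags, and the remaining keys become fields.
func splitRow(row map[string]any) (ts time.Time, tags, fields map[string]any) {
	tags, fields = map[string]any{}, map[string]any{}
	for k, v := range row {
		switch {
		case k == "epoch_ns":
			ts = time.Unix(0, v.(int64))
		case strings.HasPrefix(k, "tag_"):
			tags[strings.TrimPrefix(k, "tag_")] = fmt.Sprintf("%v", v)
		default:
			fields[k] = v
		}
	}
	return ts, tags, fields
}

func main() {
	ts, tags, fields := splitRow(map[string]any{
		"epoch_ns":    int64(1710514000000000000),
		"tag_datname": "mydb",
		"xact_commit": 4242,
	})
	fmt.Println(ts.UTC(), tags, fields)
}
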
func (pgw *PostgresWriter) EnsureMetric(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.Ctx)
	sqlEnsure := `select * from admin.ensure_partition_metric($1)`
	for metric := range pgPartBounds {
		if _, ok := partitionMapMetric[metric]; !ok || force {
			if _, err = pgw.SinkDb.Exec(pgw.Ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create partition on metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return nil
}

// EnsureMetricTime creates special partitions when TimescaleDB is used for realtime metrics
func (pgw *PostgresWriter) EnsureMetricTime(pgPartBounds map[string]ExistingPartitionInfo, force bool) error {
	logger := log.GetLogger(pgw.Ctx)
	sqlEnsure := `select * from admin.ensure_partition_metric_time($1, $2)`
	for metric, pb := range pgPartBounds {
		if !strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
			return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
		}

		partInfo, ok := partitionMapMetric[metric]
		if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
			err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlEnsure, metric, pb.StartTime).Scan(&partInfo)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics':", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
		if pb.EndTime.After(partInfo.EndTime) || force {
			err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlEnsure, metric, pb.EndTime).Scan(&partInfo.EndTime)
			if err != nil {
				logger.Error("Failed to create partition on 'metrics':", err)
				return err
			}
			partitionMapMetric[metric] = partInfo
		}
	}
	return nil
}

func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]ExistingPartitionInfo, force bool) (err error) {
	logger := log.GetLogger(pgw.Ctx)
	sqlEnsure := `select * from admin.ensure_partition_timescale($1)`
	for metric := range pgPartBounds {
		if strings.HasSuffix(metric, "_realtime") {
			continue
		}
		if _, ok := partitionMapMetric[metric]; !ok {
			if _, err = pgw.SinkDb.Exec(pgw.Ctx, sqlEnsure, metric); err != nil {
				logger.Errorf("Failed to create a TimescaleDB table for metric '%s': %v", metric, err)
				return err
			}
			partitionMapMetric[metric] = ExistingPartitionInfo{}
		}
	}
	return pgw.EnsureMetricTime(pgPartBounds, force)
}

func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
	var rows pgx.Rows
	sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
	for metric, dbnameTimestampMap := range metricDbnamePartBounds {
		if _, ok := partitionMapMetricDbname[metric]; !ok {
			partitionMapMetricDbname[metric] = make(map[string]ExistingPartitionInfo)
		}

		for dbname, pb := range dbnameTimestampMap {
			if pb.StartTime.IsZero() || pb.EndTime.IsZero() {
				return fmt.Errorf("zero StartTime/EndTime in partitioning request: [%s:%v]", metric, pb)
			}
			partInfo, ok := partitionMapMetricDbname[metric][dbname]
			if !ok || pb.StartTime.Before(partInfo.StartTime) || force {
				if rows, err = pgw.SinkDb.Query(pgw.Ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
			if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
				// ensure a partition covering the newest timestamp as well
				if rows, err = pgw.SinkDb.Query(pgw.Ctx, sqlEnsure, metric, dbname, pb.EndTime); err != nil {
					return
				}
				if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
					return err
				}
				partitionMapMetricDbname[metric][dbname] = partInfo
			}
		}
	}
	return nil
}

func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
	metricAgeDaysThreshold := pgw.opts.Measurements.Retention
	if metricAgeDaysThreshold <= 0 {
		return
	}
	logger := log.GetLogger(pgw.Ctx)
	select {
	case <-pgw.Ctx.Done():
		return
	case <-time.After(time.Hour):
		// to reduce distracting log messages at startup
	}

	for {
		if pgw.MetricSchema == DbStorageSchemaTimescale {
			partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
				continue
			}
			logger.Infof("Dropped %d old metric partitions...", partsDropped)
		} else if pgw.MetricSchema == DbStorageSchemaPostgres {
			partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
			if err != nil {
				logger.Errorf("Failed to get a listing of old (>%d days) time partitions from the Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
				time.Sleep(time.Second * 300)
				continue
			}
			if len(partsToDrop) > 0 {
				logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
				for _, toDrop := range partsToDrop {
					sqlDropTable := `DROP TABLE IF EXISTS ` + pgx.Identifier{toDrop}.Sanitize()
					logger.Debugf("Dropping old metric data partition: %s", toDrop)

					if _, err := pgw.SinkDb.Exec(pgw.Ctx, sqlDropTable); err != nil {
						logger.Errorf("Failed to drop old partition %s from the Postgres metrics DB: %v", toDrop, err)
						time.Sleep(time.Second * 300)
					} else {
						time.Sleep(time.Second * 5)
					}
				}
			} else {
				logger.Infof("No old metric partitions found to drop...")
			}
		}
		select {
		case <-pgw.Ctx.Done():
			return
		case <-time.After(time.Hour * 12):
		}
	}
}

func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
	logger := log.GetLogger(pgw.Ctx)
	// the listing can get out of sync due to metrics deletion (a trigger is not really wanted)
	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
	sqlDistinct := `
	WITH RECURSIVE t(dbname) AS (
		SELECT MIN(dbname) AS dbname FROM %s
		UNION
		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2 RETURNING *`
	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1 RETURNING *`
	sqlAdd := `
		INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
		RETURNING *`

	for {
		select {
		case <-pgw.Ctx.Done():
			return
		case <-time.After(time.Hour * 24):
		}
		var lock bool
		logger.Infof("Trying to get the metricsDb listing maintainer advisory lock...") // to have only one "maintainer" in a "push" setup, as the refresh can get costly
		if err := pgw.SinkDb.QueryRow(pgw.Ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
			logger.Error("Getting the metricsDb listing maintainer advisory lock failed:", err)
			continue
		}
		if !lock {
			logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
			continue
		}

		logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
		rows, _ := pgw.SinkDb.Query(pgw.Ctx, sqlTopLevelMetrics)
		allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
		if err != nil {
			logger.Error(err)
			continue
		}

		for _, tableName := range allDistinctMetricTables {
			foundDbnamesMap := make(map[string]bool)
			foundDbnamesArr := make([]string, 0)
			metricName := strings.Replace(tableName, "public.", "", 1)

			logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
			rows, _ := pgw.SinkDb.Query(pgw.Ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
			ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
			// ret, err := DBExecRead(mainContext, metricDb, fmt.Sprintf(sqlDistinct, tableName, tableName))
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for '%s': %s", metricName, err)
				break
			}
			for _, drDbname := range ret {
				foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
			}

			// delete all that are not known and add all that are not there
			for k := range foundDbnamesMap {
				foundDbnamesArr = append(foundDbnamesArr, k)
			}
			if len(foundDbnamesArr) == 0 { // delete all entries for the given metric
				logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)

				_, err = pgw.SinkDb.Exec(pgw.Ctx, sqlDeleteAll, metricName)
				if err != nil {
					logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
				}
				continue
			}
			cmdTag, err := pgw.SinkDb.Exec(pgw.Ctx, sqlDelete, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Removed %d stale entries from the all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			cmdTag, err = pgw.SinkDb.Exec(pgw.Ctx, sqlAdd, foundDbnamesArr, metricName)
			if err != nil {
				logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
			} else if cmdTag.RowsAffected() > 0 {
				logger.Infof("Added %d entries to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
			}
			time.Sleep(time.Minute)
		}
	}
}

func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
	sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
	err = pgw.SinkDb.QueryRow(pgw.Ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
	return
}

func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
	sqlGetOldParts := `select admin.get_old_time_partitions($1)`
	rows, err := pgw.SinkDb.Query(pgw.Ctx, sqlGetOldParts, metricAgeDaysThreshold)
	if err == nil {
		return pgx.CollectRows(rows, pgx.RowTo[string])
	}
	return nil, err
}

func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
×
619
        sql := `insert into admin.all_distinct_dbname_metrics
×
620
                        select $1, $2
×
621
                        where not exists (
×
622
                                select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
×
623
                        )`
×
624
        _, err := pgw.SinkDb.Exec(pgw.Ctx, sql, dbUnique, metric)
×
625
        return err
×
626
}
×
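To close, a rough sketch of how a caller might wire this sink up. It is a sketch only, under several stated assumptions: the sinks import path, a usable zero-value config.Options (with Measurements.BatchingDelay set, since poll() builds a ticker from it), and metrics.MeasurementMessage/metrics.Measurements having the field names and slice-of-row shape that the code above iterates over. The connection string and sample values are placeholders.

package main

import (
	"context"
	"log"

	"github.com/cybertec-postgresql/pgwatch3/config"
	"github.com/cybertec-postgresql/pgwatch3/metrics"
	"github.com/cybertec-postgresql/pgwatch3/sinks" // assumed import path for the package above
)

func main() {
	ctx := context.Background()
	// assumption: Measurements.BatchingDelay must be non-zero, as poll()
	// passes it to time.NewTicker, which panics on a non-positive duration
	opts := &config.Options{}
	pgw, err := sinks.NewPostgresWriter(ctx, "postgresql://pgwatch3@localhost/measurements", opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	// one message with a single data row; "epoch_ns" and "tag_*" keys are
	// split out by the writer as shown above (assumes metrics.Measurements
	// is a slice of map[string]any-style rows)
	msg := metrics.MeasurementMessage{
		DBName:     "mydb",
		MetricName: "db_stats",
		Data:       metrics.Measurements{{"epoch_ns": int64(1710514000000000000), "tag_datname": "mydb", "xact_commit": 4242}},
	}
	if err := pgw.Write([]metrics.MeasurementMessage{msg}); err != nil {
		log.Fatal(err)
	}
}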