• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

goto / meteor / 5664540450

26 Jul 2023 04:08AM UTC coverage: 84.621% (+0.02%) from 84.599%
5664540450

push

github

web-flow
feat: instrument otel in sql based extractors (#24)

- Use *Context variants for executing queries.
- Upgrade go-ora version to fix failing test.

126 of 126 new or added lines in 8 files covered. (100.0%)

6394 of 7556 relevant lines covered (84.62%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/plugins/extractors/postgres/postgres.go
1
package postgres
2

3
import (
4
        "context"
5
        "database/sql"
6
        _ "embed" // // used to print the embedded assets
7
        "fmt"
8
        "net/url"
9
        "strings"
10

11
        // used to register the postgres driver
12

13
        "github.com/goto/meteor/models"
14
        v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
15
        "github.com/goto/meteor/plugins"
16
        "github.com/goto/meteor/plugins/sqlutil"
17
        "github.com/goto/meteor/registry"
18
        "github.com/goto/meteor/utils"
19
        "github.com/goto/salt/log"
20
        _ "github.com/lib/pq" // register postgres driver
21
        semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
22
        "google.golang.org/protobuf/types/known/anypb"
23
        "google.golang.org/protobuf/types/known/structpb"
24
)
25

26
//go:embed README.md
27
var summary string
28

29
var defaultDBList = []string{"information_schema", "root", "postgres"}
30

31
// Config holds the set of configuration options for the extractor
32
type Config struct {
33
        ConnectionURL string `mapstructure:"connection_url" validate:"required"`
34
        Exclude       string `mapstructure:"exclude"`
35
}
36

37
var sampleConfig = `
38
connection_url: "postgres://admin:pass123@localhost:3306/postgres?sslmode=disable"
39
exclude: testDB,secondaryDB`
40

41
var info = plugins.Info{
42
        Description:  "Table metadata and metrics from Postgres SQL sever.",
43
        SampleConfig: sampleConfig,
44
        Summary:      summary,
45
        Tags:         []string{"oss", "extractor"},
46
}
47

48
// Extractor manages the extraction of data from the extractor
49
type Extractor struct {
50
        plugins.BaseExtractor
51
        excludedDbs map[string]bool
52
        logger      log.Logger
53
        config      Config
54
        db          *sql.DB
55

56
        // These below values are used to recreate a connection for each database
57
        host     string
58
        username string
59
        password string
60
        sslmode  string
61
}
62

63
// New returns a pointer to an initialized Extractor Object
64
func New(logger log.Logger) *Extractor {
1✔
65
        e := &Extractor{
1✔
66
                logger: logger,
1✔
67
        }
1✔
68
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
1✔
69

1✔
70
        return e
1✔
71
}
1✔
72

73
// Init initializes the extractor
74
func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error) {
1✔
75
        err = e.BaseExtractor.Init(ctx, config)
1✔
76
        if err != nil {
2✔
77
                return err
1✔
78
        }
1✔
79

80
        // build excluded database list
81
        excludeList := append(defaultDBList, strings.Split(e.config.Exclude, ",")...)
1✔
82
        e.excludedDbs = sqlutil.BuildBoolMap(excludeList)
1✔
83

1✔
84
        // Create database connection
1✔
85
        e.db, err = sqlutil.OpenWithOtel("postgres", e.config.ConnectionURL, semconv.DBSystemPostgreSQL)
1✔
86
        if err != nil {
1✔
87
                return fmt.Errorf("create a client: %w", err)
×
88
        }
×
89

90
        if err := e.extractConnectionComponents(e.config.ConnectionURL); err != nil {
1✔
91
                return fmt.Errorf("split host from connection string: %w", err)
×
92
        }
×
93

94
        return nil
1✔
95
}
96

97
// Extract collects metadata from the source. Metadata is collected through the emitter
98
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
1✔
99
        defer e.db.Close()
1✔
100

1✔
101
        // Get list of databases
1✔
102
        dbs, err := sqlutil.FetchDBs(ctx, e.db, e.logger, "SELECT datname FROM pg_database WHERE datistemplate = false;")
1✔
103
        if err != nil {
1✔
104
                return fmt.Errorf("fetch databases: %w", err)
×
105
        }
×
106

107
        // Iterate through all tables and databases
108
        for _, database := range dbs {
2✔
109
                // skip dbs meant to be excluded
1✔
110
                if e.isExcludedDB(database) {
2✔
111
                        continue
1✔
112
                }
113
                // Open a new connection to the given database to collect
114
                // tables information without this default database
115
                // information will be returned
116

117
                db, err := e.connection(database)
1✔
118
                if err != nil {
1✔
119
                        e.logger.Error("failed to connect, skipping database", "error", err)
×
120
                        continue
×
121
                }
122
                query := `SELECT table_name
1✔
123
                FROM information_schema.tables
1✔
124
                WHERE table_schema = 'public'
1✔
125
                ORDER BY table_name;`
1✔
126

1✔
127
                _, err = db.Exec(fmt.Sprintf("SET search_path TO %s, public;", database))
1✔
128
                if err != nil {
1✔
129
                        e.logger.Error("failed to get tables, skipping database", "error", err)
×
130
                        continue
×
131
                }
132
                tables, err := sqlutil.FetchTablesInDB(ctx, db, database, query)
1✔
133
                if err != nil {
1✔
134
                        e.logger.Error("failed to get tables, skipping database", "error", err)
×
135
                        continue
×
136
                }
137

138
                for _, table := range tables {
2✔
139
                        result, err := e.getTableMetadata(ctx, db, database, table)
1✔
140
                        if err != nil {
1✔
141
                                e.logger.Error("failed to get table metadata, skipping table", "error", err)
×
142
                                continue
×
143
                        }
144
                        // Publish metadata to channel
145
                        emit(models.NewRecord(result))
1✔
146
                }
147
        }
148

149
        return nil
1✔
150
}
151

152
// Prepares the list of tables and the attached metadata
153
func (e *Extractor) getTableMetadata(ctx context.Context, db *sql.DB, dbName, tableName string) (*v1beta2.Asset, error) {
1✔
154
        columns, err := e.getColumnMetadata(ctx, db, tableName)
1✔
155
        if err != nil {
1✔
156
                return nil, err
×
157
        }
×
158

159
        usrPrivilegeInfo, err := e.userPrivilegesInfo(ctx, db, dbName, tableName)
1✔
160
        if err != nil {
1✔
161
                e.logger.Warn("unable to fetch user privileges info", "err", err, "table", fmt.Sprintf("%s.%s", dbName, tableName))
×
162
        }
×
163

164
        table, err := anypb.New(&v1beta2.Table{
1✔
165
                Columns:    columns,
1✔
166
                Attributes: usrPrivilegeInfo,
1✔
167
        })
1✔
168
        if err != nil {
1✔
169
                return nil, fmt.Errorf("create Any struct: %w", err)
×
170
        }
×
171
        return &v1beta2.Asset{
1✔
172
                Urn:     models.NewURN("postgres", e.UrnScope, "table", fmt.Sprintf("%s.%s", dbName, tableName)),
1✔
173
                Name:    tableName,
1✔
174
                Service: "postgres",
1✔
175
                Type:    "table",
1✔
176
                Data:    table,
1✔
177
        }, nil
1✔
178
}
179

180
// Prepares the list of columns and the attached metadata
181
func (e *Extractor) getColumnMetadata(ctx context.Context, db *sql.DB, tableName string) ([]*v1beta2.Column, error) {
1✔
182
        sqlStr := `SELECT COLUMN_NAME,DATA_TYPE,
1✔
183
                                IS_NULLABLE,coalesce(CHARACTER_MAXIMUM_LENGTH,0)
1✔
184
                                FROM information_schema.columns
1✔
185
                                WHERE TABLE_NAME = '%s' ORDER BY COLUMN_NAME ASC;`
1✔
186
        rows, err := db.QueryContext(ctx, fmt.Sprintf(sqlStr, tableName))
1✔
187
        if err != nil {
1✔
188
                return nil, fmt.Errorf("execute query: %w", err)
×
189
        }
×
190
        defer rows.Close()
1✔
191

1✔
192
        var result []*v1beta2.Column
1✔
193
        for rows.Next() {
2✔
194
                var fieldName, dataType, isNullableString string
1✔
195
                var length int
1✔
196
                if err = rows.Scan(&fieldName, &dataType, &isNullableString, &length); err != nil {
1✔
197
                        e.logger.Error("failed to get fields", "error", err)
×
198
                        continue
×
199
                }
200
                result = append(result, &v1beta2.Column{
1✔
201
                        Name:       fieldName,
1✔
202
                        DataType:   dataType,
1✔
203
                        IsNullable: isNullable(isNullableString),
1✔
204
                        Length:     int64(length),
1✔
205
                })
1✔
206
        }
207
        if err := rows.Err(); err != nil {
1✔
208
                return nil, fmt.Errorf("iterate over table columns: %w", err)
×
209
        }
×
210

211
        return result, nil
1✔
212
}
213

214
func (e *Extractor) userPrivilegesInfo(ctx context.Context, db *sql.DB, dbName, tableName string) (*structpb.Struct, error) {
1✔
215
        query := `SELECT grantee, string_agg(privilege_type, ',') 
1✔
216
        FROM information_schema.role_table_grants 
1✔
217
        WHERE table_name='%s' AND table_catalog='%s'
1✔
218
        GROUP BY grantee;`
1✔
219

1✔
220
        rows, err := db.QueryContext(ctx, fmt.Sprintf(query, tableName, dbName))
1✔
221
        if err != nil {
1✔
222
                return nil, fmt.Errorf("execute query: %w", err)
×
223
        }
×
224
        defer rows.Close()
1✔
225

1✔
226
        var usrs []interface{}
1✔
227
        for rows.Next() {
2✔
228
                var grantee, privilege_type string
1✔
229
                if err := rows.Scan(&grantee, &privilege_type); err != nil {
1✔
230
                        e.logger.Error("failed to get fields", "error", err)
×
231
                        continue
×
232
                }
233

234
                usrs = append(usrs, map[string]interface{}{
1✔
235
                        "user":            grantee,
1✔
236
                        "privilege_types": ConvertStringListToInterface(strings.Split(privilege_type, ",")),
1✔
237
                })
1✔
238
        }
239
        if err := rows.Err(); err != nil {
1✔
240
                return nil, fmt.Errorf("iterate over user privileges: %w", err)
×
241
        }
×
242

243
        grants := map[string]interface{}{
1✔
244
                "grants": usrs,
1✔
245
        }
1✔
246
        return utils.TryParseMapToProto(grants), nil
1✔
247
}
248

249
// Convert nullable string to a boolean
250
func isNullable(value string) bool {
1✔
251
        return value == "YES"
1✔
252
}
1✔
253

254
// connection generates a connection string
255
func (e *Extractor) connection(database string) (*sql.DB, error) {
1✔
256
        connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s", e.username, e.password, e.host, database, e.sslmode)
1✔
257
        return sql.Open("postgres", connStr)
1✔
258
}
1✔
259

260
// extractConnectionComponents extracts the components from the connection URL
261
func (e *Extractor) extractConnectionComponents(connectionURL string) error {
1✔
262
        connectionStr, err := url.Parse(connectionURL)
1✔
263
        if err != nil {
1✔
264
                return fmt.Errorf("parse connection URL: %w", err)
×
265
        }
×
266
        e.host = connectionStr.Host
1✔
267
        e.username = connectionStr.User.Username()
1✔
268
        e.password, _ = connectionStr.User.Password()
1✔
269
        e.sslmode = connectionStr.Query().Get("sslmode")
1✔
270

1✔
271
        return nil
1✔
272
}
273

274
// isExcludedDB checks if the given db is in the list of excluded databases
275
func (e *Extractor) isExcludedDB(database string) bool {
1✔
276
        _, ok := e.excludedDbs[database]
1✔
277
        return ok
1✔
278
}
1✔
279

280
// Register the extractor to catalog
281
func init() {
1✔
282
        if err := registry.Extractors.Register("postgres", func() plugins.Extractor {
1✔
283
                return New(plugins.GetLog())
×
284
        }); err != nil {
×
285
                panic(err)
×
286
        }
287
}
288

289
func ConvertStringListToInterface(s []string) []interface{} {
1✔
290
        out := make([]interface{}, len(s))
1✔
291
        for i, v := range s {
2✔
292
                out[i] = v
1✔
293
        }
1✔
294
        return out
1✔
295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc