• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 6291986723

24 Sep 2023 07:30PM UTC coverage: 83.215% (+9.5%) from 73.754%
6291986723

push

github

web-flow
chore: fix bigquery plugin test (#484)

6782 of 8150 relevant lines covered (83.21%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.74
/plugins/extractors/postgres/postgres.go
1
package postgres
2

3
import (
4
        "context"
5
        "database/sql"
6
        _ "embed" // // used to print the embedded assets
7
        "fmt"
8
        "net/url"
9
        "strings"
10

11
        _ "github.com/lib/pq" // register postgres driver
12
        "github.com/raystack/meteor/models"
13
        v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
14
        "github.com/raystack/meteor/plugins"
15
        "github.com/raystack/meteor/plugins/sqlutil"
16
        "github.com/raystack/meteor/registry"
17
        "github.com/raystack/meteor/utils"
18
        "github.com/raystack/salt/log"
19
        semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
20
        "google.golang.org/protobuf/types/known/anypb"
21
        "google.golang.org/protobuf/types/known/structpb"
22
)
23

24
//go:embed README.md
25
var summary string
26

27
var defaultDBList = []string{"information_schema", "root", "postgres"}
28

29
// Config holds the set of configuration options for the extractor
30
type Config struct {
31
        ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
32
        Exclude       string `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
33
}
34

35
var sampleConfig = `
36
connection_url: "postgres://admin:pass123@localhost:3306/postgres?sslmode=disable"
37
exclude: testDB,secondaryDB`
38

39
var info = plugins.Info{
40
        Description:  "Table metadata and metrics from Postgres SQL sever.",
41
        SampleConfig: sampleConfig,
42
        Summary:      summary,
43
        Tags:         []string{"oss", "extractor"},
44
}
45

46
// Extractor manages the extraction of data from the extractor
47
type Extractor struct {
48
        plugins.BaseExtractor
49
        excludedDbs map[string]bool
50
        logger      log.Logger
51
        config      Config
52
        db          *sql.DB
53

54
        // These below values are used to recreate a connection for each database
55
        host     string
56
        username string
57
        password string
58
        sslmode  string
59
}
60

61
// New returns a pointer to an initialized Extractor Object
62
func New(logger log.Logger) *Extractor {
1✔
63
        e := &Extractor{
1✔
64
                logger: logger,
1✔
65
        }
1✔
66
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
1✔
67

1✔
68
        return e
1✔
69
}
1✔
70

71
// Init initializes the extractor
72
func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error) {
1✔
73
        err = e.BaseExtractor.Init(ctx, config)
1✔
74
        if err != nil {
2✔
75
                return err
1✔
76
        }
1✔
77

78
        // build excluded database list
79
        excludeList := append(defaultDBList, strings.Split(e.config.Exclude, ",")...)
1✔
80
        e.excludedDbs = sqlutil.BuildBoolMap(excludeList)
1✔
81

1✔
82
        // Create database connection
1✔
83
        e.db, err = sqlutil.OpenWithOtel("postgres", e.config.ConnectionURL, semconv.DBSystemPostgreSQL)
1✔
84
        if err != nil {
1✔
85
                return fmt.Errorf("create a client: %w", err)
×
86
        }
×
87

88
        if err := e.extractConnectionComponents(e.config.ConnectionURL); err != nil {
1✔
89
                return fmt.Errorf("split host from connection string: %w", err)
×
90
        }
×
91

92
        return nil
1✔
93
}
94

95
// Extract collects metadata from the source. Metadata is collected through the emitter
96
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
1✔
97
        defer e.db.Close()
1✔
98

1✔
99
        // Get list of databases
1✔
100
        dbs, err := sqlutil.FetchDBs(ctx, e.db, e.logger, "SELECT datname FROM pg_database WHERE datistemplate = false;")
1✔
101
        if err != nil {
1✔
102
                return fmt.Errorf("fetch databases: %w", err)
×
103
        }
×
104

105
        // Iterate through all tables and databases
106
        for _, database := range dbs {
2✔
107
                // skip dbs meant to be excluded
1✔
108
                if e.isExcludedDB(database) {
2✔
109
                        continue
1✔
110
                }
111
                // Open a new connection to the given database to collect
112
                // tables information without this default database
113
                // information will be returned
114

115
                db, err := e.connection(database)
1✔
116
                if err != nil {
1✔
117
                        e.logger.Error("failed to connect, skipping database", "error", err)
×
118
                        continue
×
119
                }
120
                query := `SELECT table_name
1✔
121
                FROM information_schema.tables
1✔
122
                WHERE table_schema = 'public'
1✔
123
                ORDER BY table_name;`
1✔
124

1✔
125
                _, err = db.Exec(fmt.Sprintf("SET search_path TO %s, public;", database))
1✔
126
                if err != nil {
1✔
127
                        e.logger.Error("failed to get tables, skipping database", "error", err)
×
128
                        continue
×
129
                }
130
                tables, err := sqlutil.FetchTablesInDB(ctx, db, database, query)
1✔
131
                if err != nil {
1✔
132
                        e.logger.Error("failed to get tables, skipping database", "error", err)
×
133
                        continue
×
134
                }
135

136
                for _, table := range tables {
2✔
137
                        result, err := e.getTableMetadata(ctx, db, database, table)
1✔
138
                        if err != nil {
1✔
139
                                e.logger.Error("failed to get table metadata, skipping table", "error", err)
×
140
                                continue
×
141
                        }
142
                        // Publish metadata to channel
143
                        emit(models.NewRecord(result))
1✔
144
                }
145
        }
146

147
        return nil
1✔
148
}
149

150
// Prepares the list of tables and the attached metadata
151
func (e *Extractor) getTableMetadata(ctx context.Context, db *sql.DB, dbName, tableName string) (*v1beta2.Asset, error) {
1✔
152
        columns, err := e.getColumnMetadata(ctx, db, tableName)
1✔
153
        if err != nil {
1✔
154
                return nil, err
×
155
        }
×
156

157
        usrPrivilegeInfo, err := e.userPrivilegesInfo(ctx, db, dbName, tableName)
1✔
158
        if err != nil {
1✔
159
                e.logger.Warn("unable to fetch user privileges info", "err", err, "table", fmt.Sprintf("%s.%s", dbName, tableName))
×
160
        }
×
161

162
        table, err := anypb.New(&v1beta2.Table{
1✔
163
                Columns:    columns,
1✔
164
                Attributes: usrPrivilegeInfo,
1✔
165
        })
1✔
166
        if err != nil {
1✔
167
                return nil, fmt.Errorf("create Any struct: %w", err)
×
168
        }
×
169
        return &v1beta2.Asset{
1✔
170
                Urn:     models.NewURN("postgres", e.UrnScope, "table", fmt.Sprintf("%s.%s", dbName, tableName)),
1✔
171
                Name:    tableName,
1✔
172
                Service: "postgres",
1✔
173
                Type:    "table",
1✔
174
                Data:    table,
1✔
175
        }, nil
1✔
176
}
177

178
// Prepares the list of columns and the attached metadata
179
func (e *Extractor) getColumnMetadata(ctx context.Context, db *sql.DB, tableName string) ([]*v1beta2.Column, error) {
1✔
180
        sqlStr := `SELECT COLUMN_NAME,DATA_TYPE,
1✔
181
                                IS_NULLABLE,coalesce(CHARACTER_MAXIMUM_LENGTH,0)
1✔
182
                                FROM information_schema.columns
1✔
183
                                WHERE TABLE_NAME = '%s' ORDER BY COLUMN_NAME ASC;`
1✔
184
        rows, err := db.QueryContext(ctx, fmt.Sprintf(sqlStr, tableName))
1✔
185
        if err != nil {
1✔
186
                return nil, fmt.Errorf("execute query: %w", err)
×
187
        }
×
188
        defer rows.Close()
1✔
189

1✔
190
        var result []*v1beta2.Column
1✔
191
        for rows.Next() {
2✔
192
                var fieldName, dataType, isNullableString string
1✔
193
                var length int
1✔
194
                if err = rows.Scan(&fieldName, &dataType, &isNullableString, &length); err != nil {
1✔
195
                        e.logger.Error("failed to get fields", "error", err)
×
196
                        continue
×
197
                }
198
                result = append(result, &v1beta2.Column{
1✔
199
                        Name:       fieldName,
1✔
200
                        DataType:   dataType,
1✔
201
                        IsNullable: isNullable(isNullableString),
1✔
202
                        Length:     int64(length),
1✔
203
                })
1✔
204
        }
205
        if err := rows.Err(); err != nil {
1✔
206
                return nil, fmt.Errorf("iterate over table columns: %w", err)
×
207
        }
×
208

209
        return result, nil
1✔
210
}
211

212
func (e *Extractor) userPrivilegesInfo(ctx context.Context, db *sql.DB, dbName, tableName string) (*structpb.Struct, error) {
1✔
213
        query := `SELECT grantee, string_agg(privilege_type, ',') 
1✔
214
        FROM information_schema.role_table_grants 
1✔
215
        WHERE table_name='%s' AND table_catalog='%s'
1✔
216
        GROUP BY grantee;`
1✔
217

1✔
218
        rows, err := db.QueryContext(ctx, fmt.Sprintf(query, tableName, dbName))
1✔
219
        if err != nil {
1✔
220
                return nil, fmt.Errorf("execute query: %w", err)
×
221
        }
×
222
        defer rows.Close()
1✔
223

1✔
224
        var usrs []interface{}
1✔
225
        for rows.Next() {
2✔
226
                var grantee, privilege_type string
1✔
227
                if err := rows.Scan(&grantee, &privilege_type); err != nil {
1✔
228
                        e.logger.Error("failed to get fields", "error", err)
×
229
                        continue
×
230
                }
231

232
                usrs = append(usrs, map[string]interface{}{
1✔
233
                        "user":            grantee,
1✔
234
                        "privilege_types": ConvertStringListToInterface(strings.Split(privilege_type, ",")),
1✔
235
                })
1✔
236
        }
237
        if err := rows.Err(); err != nil {
1✔
238
                return nil, fmt.Errorf("iterate over user privileges: %w", err)
×
239
        }
×
240

241
        grants := map[string]interface{}{
1✔
242
                "grants": usrs,
1✔
243
        }
1✔
244
        return utils.TryParseMapToProto(grants), nil
1✔
245
}
246

247
// Convert nullable string to a boolean
248
func isNullable(value string) bool {
1✔
249
        return value == "YES"
1✔
250
}
1✔
251

252
// connection generates a connection string
253
func (e *Extractor) connection(database string) (*sql.DB, error) {
1✔
254
        connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=%s", e.username, e.password, e.host, database, e.sslmode)
1✔
255
        return sql.Open("postgres", connStr)
1✔
256
}
1✔
257

258
// extractConnectionComponents extracts the components from the connection URL
259
func (e *Extractor) extractConnectionComponents(connectionURL string) error {
1✔
260
        connectionStr, err := url.Parse(connectionURL)
1✔
261
        if err != nil {
1✔
262
                return fmt.Errorf("parse connection URL: %w", err)
×
263
        }
×
264
        e.host = connectionStr.Host
1✔
265
        e.username = connectionStr.User.Username()
1✔
266
        e.password, _ = connectionStr.User.Password()
1✔
267
        e.sslmode = connectionStr.Query().Get("sslmode")
1✔
268

1✔
269
        return nil
1✔
270
}
271

272
// isExcludedDB checks if the given db is in the list of excluded databases
273
func (e *Extractor) isExcludedDB(database string) bool {
1✔
274
        _, ok := e.excludedDbs[database]
1✔
275
        return ok
1✔
276
}
1✔
277

278
// Register the extractor to catalog
279
func init() {
1✔
280
        if err := registry.Extractors.Register("postgres", func() plugins.Extractor {
1✔
281
                return New(plugins.GetLog())
×
282
        }); err != nil {
×
283
                panic(err)
×
284
        }
285
}
286

287
func ConvertStringListToInterface(s []string) []interface{} {
1✔
288
        out := make([]interface{}, len(s))
1✔
289
        for i, v := range s {
2✔
290
                out[i] = v
1✔
291
        }
1✔
292
        return out
1✔
293
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc