• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 5946451595

23 Aug 2023 02:51AM UTC coverage: 83.717% (+10.0%) from 73.754%
5946451595

Pull #482

github

Chief-Rishab
feat: instrument grpc interceptor with OpenTelemetry
Pull Request #482: Feat/auth service account

1886 of 1886 new or added lines in 57 files covered. (100.0%)

6658 of 7953 relevant lines covered (83.72%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.49
/plugins/extractors/mysql/mysql.go
1
package mysql
2

3
import (
4
        "context"
5
        "database/sql"
6
        _ "embed" // used to print the embedded assets
7
        "fmt"
8

9
        _ "github.com/go-sql-driver/mysql"
10
        "github.com/raystack/meteor/models"
11
        v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
12
        "github.com/raystack/meteor/plugins"
13
        "github.com/raystack/meteor/plugins/sqlutil"
14
        "github.com/raystack/meteor/registry"
15
        "github.com/raystack/salt/log"
16
        semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
17
        "google.golang.org/protobuf/types/known/anypb"
18
        "google.golang.org/protobuf/types/known/structpb"
19
)
20

21
//go:embed README.md
22
var summary string
23

24
var defaultDBList = []string{
25
        "information_schema",
26
        "mysql",
27
        "performance_schema",
28
        "sys",
29
}
30

31
// Config holds the connection URL for the extractor
32
type Config struct {
33
        ConnectionURL string  `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
34
        Exclude       Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
35
}
36

37
type Exclude struct {
38
        Databases []string `json:"databases" yaml:"databases" mapstructure:"databases"`
39
        Tables    []string `json:"tables" yaml:"tables" mapstructure:"tables"`
40
}
41

42
var sampleConfig = `
43
connection_url: "admin:pass123@tcp(localhost:3306)/"
44
exclude:
45
  databases:
46
        - database_a
47
        - database_b
48
  tables:
49
        - dataset_c.table_a`
50

51
var info = plugins.Info{
52
        Description:  "Table metadata from MySQL server.",
53
        SampleConfig: sampleConfig,
54
        Summary:      summary,
55
        Tags:         []string{"oss", "extractor"},
56
}
57

58
// Extractor manages the extraction of data from MySQL
59
type Extractor struct {
60
        plugins.BaseExtractor
61
        excludedDbs map[string]bool
62
        excludedTbl map[string]bool
63
        logger      log.Logger
64
        config      Config
65
        db          *sql.DB
66
        emit        plugins.Emit
67
}
68

69
// New returns a pointer to an initialized Extractor Object
70
func New(logger log.Logger) *Extractor {
1✔
71
        e := &Extractor{
1✔
72
                logger: logger,
1✔
73
        }
1✔
74
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
1✔
75

1✔
76
        return e
1✔
77
}
1✔
78

79
// Init initializes the extractor
80
func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error) {
1✔
81
        err = e.BaseExtractor.Init(ctx, config)
1✔
82
        if err != nil {
2✔
83
                return err
1✔
84
        }
1✔
85

86
        excludeDBList := append(defaultDBList, e.config.Exclude.Databases...)
1✔
87
        e.excludedDbs = sqlutil.BuildBoolMap(excludeDBList)
1✔
88
        e.excludedTbl = sqlutil.BuildBoolMap(e.config.Exclude.Tables)
1✔
89

1✔
90
        // create mysql client
1✔
91
        e.db, err = sqlutil.OpenWithOtel("mysql", e.config.ConnectionURL, semconv.DBSystemMySQL)
1✔
92
        if err != nil {
1✔
93
                return fmt.Errorf("create a client: %w", err)
×
94
        }
×
95

96
        return nil
1✔
97
}
98

99
// Extract extracts the data from the MySQL server
100
// and collected through the emitter
101
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
1✔
102
        defer e.db.Close()
1✔
103
        e.emit = emit
1✔
104

1✔
105
        dbs, err := sqlutil.FetchDBs(ctx, e.db, e.logger, "SHOW DATABASES;")
1✔
106
        if err != nil {
1✔
107
                return err
×
108
        }
×
109

110
        for _, db := range dbs {
2✔
111
                // skip excluded databases
1✔
112
                if e.isExcludedDB(db) {
2✔
113
                        continue
1✔
114
                }
115
                // extract tables
116
                err := e.extractTables(ctx, db)
1✔
117
                if err != nil {
1✔
118
                        e.logger.Error("failed to get tables, skipping database", "error", err)
×
119
                        continue
×
120
                }
121
        }
122

123
        return nil
1✔
124
}
125

126
// Extract tables from a given database
127
func (e *Extractor) extractTables(ctx context.Context, database string) error {
1✔
128
        // set database
1✔
129
        _, err := e.db.Exec(fmt.Sprintf("USE %s;", database))
1✔
130
        if err != nil {
1✔
131
                return fmt.Errorf("iterate over %s: %w", database, err)
×
132
        }
×
133

134
        // get list of tables
135
        tables, err := sqlutil.FetchTablesInDB(ctx, e.db, database, "SHOW TABLES;")
1✔
136
        for _, tableName := range tables {
2✔
137
                // skip excluded tables
1✔
138
                if e.isExcludedTable(database, tableName) {
1✔
139
                        continue
×
140
                }
141
                if err := e.processTable(ctx, database, tableName); err != nil {
1✔
142
                        return fmt.Errorf("process table: %w", err)
×
143
                }
×
144
        }
145

146
        return err
1✔
147
}
148

149
// processTable builds and push table to emitter
150
func (e *Extractor) processTable(ctx context.Context, database, tableName string) error {
1✔
151
        columns, err := e.extractColumns(ctx, tableName)
1✔
152
        if err != nil {
1✔
153
                return fmt.Errorf("extract columns: %w", err)
×
154
        }
×
155
        table, err := anypb.New(&v1beta2.Table{
1✔
156
                Columns:    columns,
1✔
157
                Attributes: &structpb.Struct{},
1✔
158
        })
1✔
159
        if err != nil {
1✔
160
                return fmt.Errorf("create Any struct: %w", err)
×
161
        }
×
162
        // push table to channel
163
        e.emit(models.NewRecord(&v1beta2.Asset{
1✔
164
                Urn:     models.NewURN("mysql", e.UrnScope, "table", fmt.Sprintf("%s.%s", database, tableName)),
1✔
165
                Name:    tableName,
1✔
166
                Type:    "table",
1✔
167
                Service: "mysql",
1✔
168
                Data:    table,
1✔
169
        }))
1✔
170

1✔
171
        return nil
1✔
172
}
173

174
// Extract columns from a given table
175
func (e *Extractor) extractColumns(ctx context.Context, tableName string) ([]*v1beta2.Column, error) {
1✔
176
        query := `SELECT COLUMN_NAME,column_comment,DATA_TYPE,
1✔
177
                                IS_NULLABLE,IFNULL(CHARACTER_MAXIMUM_LENGTH,0)
1✔
178
                                FROM information_schema.columns
1✔
179
                                WHERE table_name = ?
1✔
180
                                ORDER BY COLUMN_NAME ASC`
1✔
181
        rows, err := e.db.QueryContext(ctx, query, tableName)
1✔
182
        if err != nil {
1✔
183
                return nil, fmt.Errorf("execute query: %w", err)
×
184
        }
×
185
        defer rows.Close()
1✔
186

1✔
187
        var columns []*v1beta2.Column
1✔
188
        for rows.Next() {
2✔
189
                var fieldName, fieldDesc, dataType, isNullableString string
1✔
190
                var length int
1✔
191
                if err = rows.Scan(&fieldName, &fieldDesc, &dataType, &isNullableString, &length); err != nil {
1✔
192
                        e.logger.Error("failed to get fields", "error", err)
×
193
                        continue
×
194
                }
195

196
                columns = append(columns, &v1beta2.Column{
1✔
197
                        Name:        fieldName,
1✔
198
                        DataType:    dataType,
1✔
199
                        Description: fieldDesc,
1✔
200
                        IsNullable:  e.isNullable(isNullableString),
1✔
201
                        Length:      int64(length),
1✔
202
                })
1✔
203
        }
204
        if err := rows.Err(); err != nil {
1✔
205
                return nil, fmt.Errorf("iterate over columns: %w", err)
×
206
        }
×
207

208
        return columns, nil
1✔
209
}
210

211
// isExcludedDB checks if the given db is in the list of excluded databases
212
func (e *Extractor) isExcludedDB(database string) bool {
1✔
213
        _, ok := e.excludedDbs[database]
1✔
214
        return ok
1✔
215
}
1✔
216

217
// isExcludedTable checks if the given table is in the list of excluded tables
218
func (e *Extractor) isExcludedTable(database, tableName string) bool {
1✔
219
        tableName = fmt.Sprintf("%s.%s", database, tableName)
1✔
220
        _, ok := e.excludedTbl[tableName]
1✔
221
        return ok
1✔
222
}
1✔
223

224
// isNullable checks if the given string is null or not
225
func (e *Extractor) isNullable(value string) bool {
1✔
226
        return value == "YES"
1✔
227
}
1✔
228

229
// init register the extractor to the catalog
230
func init() {
1✔
231
        if err := registry.Extractors.Register("mysql", func() plugins.Extractor {
1✔
232
                return New(plugins.GetLog())
×
233
        }); err != nil {
×
234
                panic(err)
×
235
        }
236
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc