• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

goto / meteor / 12372470055

17 Dec 2024 11:49AM UTC coverage: 81.903% (-0.02%) from 81.926%
12372470055

Pull #77

github

haveiss
fix(run): return correct exit code on recipe failure
Pull Request #77: fix(run): return correct exit code on recipe failure

7146 of 8725 relevant lines covered (81.9%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.26
/plugins/extractors/couchdb/couchdb.go
1
package couchdb
2

3
import (
4
        "context"
5
        _ "embed" // used to print the embedded assets
6
        "fmt"
7
        "reflect"
8

9
        _ "github.com/go-kivik/couchdb"
10
        "github.com/go-kivik/kivik"
11
        "github.com/goto/meteor/models"
12
        v1beta2 "github.com/goto/meteor/models/gotocompany/assets/v1beta2"
13
        "github.com/goto/meteor/plugins"
14
        "github.com/goto/meteor/registry"
15
        "github.com/goto/salt/log"
16
        "google.golang.org/protobuf/types/known/anypb"
17
        "google.golang.org/protobuf/types/known/structpb"
18
)
19

20
//go:embed README.md
21
var summary string
22

23
var defaultDBList = []string{
24
        "_global_changes",
25
        "_replicator",
26
        "_users",
27
}
28

29
// Config holds the connection URL for the extractor
30
type Config struct {
31
        ConnectionURL string `mapstructure:"connection_url" validate:"required"`
32
}
33

34
var sampleConfig = `connection_url: http://admin:pass123@localhost:3306/`
35

36
var info = plugins.Info{
37
        Description:  "Table metadata from CouchDB server,",
38
        SampleConfig: sampleConfig,
39
        Summary:      summary,
40
        Tags:         []string{"oss", "extractor"},
41
}
42

43
// Extractor manages the extraction of data from CouchDB
44
type Extractor struct {
45
        plugins.BaseExtractor
46
        client      *kivik.Client
47
        db          *kivik.DB
48
        excludedDbs map[string]bool
49
        logger      log.Logger
50
        config      Config
51
        emit        plugins.Emit
52
}
53

54
// New returns a pointer to an initialized Extractor Object
55
func New(logger log.Logger) *Extractor {
1✔
56
        e := &Extractor{
1✔
57
                logger: logger,
1✔
58
        }
1✔
59
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
1✔
60

1✔
61
        return e
1✔
62
}
1✔
63

64
// Initialise the Extractor with Configurations
65
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
1✔
66
        if err := e.BaseExtractor.Init(ctx, config); err != nil {
2✔
67
                return err
1✔
68
        }
1✔
69

70
        // build excluded database list
71
        e.buildExcludedDBs()
1✔
72

1✔
73
        // create client
1✔
74
        var err error
1✔
75
        e.client, err = kivik.New("couch", e.config.ConnectionURL)
1✔
76
        if err != nil {
1✔
77
                return err
×
78
        }
×
79

80
        return nil
1✔
81
}
82

83
// Extract extracts the data from the CouchDB server
84
// and collected through the out channel
85
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
1✔
86
        defer e.client.Close(ctx)
1✔
87
        e.emit = emit
1✔
88

1✔
89
        res, err := e.client.AllDBs(ctx)
1✔
90
        if err != nil {
1✔
91
                return err
×
92
        }
×
93

94
        for _, dbName := range res {
2✔
95
                if err := e.extractTables(ctx, dbName); err != nil {
1✔
96
                        return err
×
97
                }
×
98
        }
99
        return nil
1✔
100
}
101

102
// Extract tables from a given database
103
func (e *Extractor) extractTables(ctx context.Context, dbName string) error {
1✔
104
        // skip if database is default
1✔
105
        if e.isExcludedDB(dbName) {
1✔
106
                return nil
×
107
        }
×
108
        e.db = e.client.DB(ctx, dbName)
1✔
109

1✔
110
        // extract documents
1✔
111
        rows, err := e.db.AllDocs(ctx)
1✔
112
        if err != nil {
1✔
113
                return err
×
114
        }
×
115
        defer rows.Close()
1✔
116

1✔
117
        // process each rows
1✔
118
        for rows.Next() {
2✔
119
                docID := rows.ID()
1✔
120
                if err := e.processTable(ctx, dbName, docID); err != nil {
1✔
121
                        return err
×
122
                }
×
123
        }
124
        if err := rows.Err(); err != nil {
1✔
125
                return fmt.Errorf("iterate over tables: %w", err)
×
126
        }
×
127

128
        return nil
1✔
129
}
130

131
// Build and push document to output channel
132
func (e *Extractor) processTable(ctx context.Context, dbName, docID string) error {
1✔
133
        columns, err := e.extractColumns(ctx, docID)
1✔
134
        if err != nil {
1✔
135
                return err
×
136
        }
×
137
        table, err := anypb.New(&v1beta2.Table{
1✔
138
                Columns:    columns,
1✔
139
                Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present
1✔
140
        })
1✔
141
        if err != nil {
1✔
142
                return fmt.Errorf("create Any struct: %w", err)
×
143
        }
×
144
        // push table to channel
145
        e.emit(models.NewRecord(&v1beta2.Asset{
1✔
146
                Urn:     models.NewURN("couchdb", e.UrnScope, "table", fmt.Sprintf("%s.%s", dbName, docID)),
1✔
147
                Name:    docID,
1✔
148
                Type:    "table",
1✔
149
                Service: "couchdb",
1✔
150
                Data:    table,
1✔
151
        }))
1✔
152

1✔
153
        return nil
1✔
154
}
155

156
// Extract columns from a given table
157
func (e *Extractor) extractColumns(ctx context.Context, docID string) ([]*v1beta2.Column, error) {
1✔
158
        size, rev, err := e.db.GetMeta(ctx, docID)
1✔
159
        if err != nil {
1✔
160
                return nil, err
×
161
        }
×
162
        row := e.db.Get(ctx, docID)
1✔
163
        var fields map[string]interface{}
1✔
164
        if err := row.ScanDoc(&fields); err != nil {
1✔
165
                return nil, err
×
166
        }
×
167

168
        var columns []*v1beta2.Column
1✔
169
        for k := range fields {
2✔
170
                if k == "_id" || k == "_rev" {
2✔
171
                        continue
1✔
172
                }
173

174
                columns = append(columns, &v1beta2.Column{
1✔
175
                        Name:        k,
1✔
176
                        DataType:    reflect.ValueOf(fields[k]).Kind().String(),
1✔
177
                        Description: rev,
1✔
178
                        Length:      size,
1✔
179
                })
1✔
180
        }
181
        return columns, nil
1✔
182
}
183

184
func (e *Extractor) buildExcludedDBs() {
1✔
185
        excludedMap := make(map[string]bool)
1✔
186
        for _, db := range defaultDBList {
2✔
187
                excludedMap[db] = true
1✔
188
        }
1✔
189

190
        e.excludedDbs = excludedMap
1✔
191
}
192

193
func (e *Extractor) isExcludedDB(database string) bool {
1✔
194
        _, ok := e.excludedDbs[database]
1✔
195
        return ok
1✔
196
}
1✔
197

198
// Register the extractor to catalog
199
func init() {
1✔
200
        if err := registry.Extractors.Register("couchdb", func() plugins.Extractor {
1✔
201
                return New(plugins.GetLog())
×
202
        }); err != nil {
×
203
                panic(err)
×
204
        }
205
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc