• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

raystack / meteor / 24614713551

18 Apr 2026 09:49PM UTC coverage: 74.086% (+0.09%) from 73.994%
24614713551

push

github

ravisuhag
feat(bigquery): add label-based table exclusion

Tables matching any configured exclude label (key-value pair) are
skipped during extraction. The check runs after table metadata is
fetched since labels are part of TableMetadata.

Closes #460

10 of 17 new or added lines in 1 file covered. (58.82%)

62 existing lines in 2 files now uncovered.

6727 of 9080 relevant lines covered (74.09%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.61
/plugins/extractors/github/github.go
1
package github
2

3
import (
4
        "context"
5
        _ "embed"
6
        "fmt"
7
        "path/filepath"
8
        "strings"
9

10
        gh "github.com/google/go-github/v68/github"
11
        "github.com/raystack/meteor/models"
12
        meteorv1beta1 "github.com/raystack/meteor/models/raystack/meteor/v1beta1"
13
        "github.com/raystack/meteor/plugins"
14
        "github.com/raystack/meteor/registry"
15
        log "github.com/raystack/salt/observability/logger"
16
        "golang.org/x/oauth2"
17
        "google.golang.org/protobuf/types/known/structpb"
18
)
19

20
//go:embed README.md
21
var summary string
22

23
type Config struct {
24
        Org     string     `json:"org" yaml:"org" mapstructure:"org" validate:"required"`
25
        Token   string     `json:"token" yaml:"token" mapstructure:"token" validate:"required"`
26
        Extract []string   `json:"extract" yaml:"extract" mapstructure:"extract"`
27
        Docs    DocsConfig `json:"docs" yaml:"docs" mapstructure:"docs"`
28
}
29

30
// DocsConfig narrows which repositories, directories and files are scanned
// when extracting documents.
type DocsConfig struct {
	// Repos limits the scan to the named repositories. When empty, every
	// repository of the organisation is scanned.
	Repos []string `json:"repos" yaml:"repos" mapstructure:"repos"`
	// Paths lists the directories to scan. Defaults to ["docs"] when empty.
	Paths []string `json:"paths" yaml:"paths" mapstructure:"paths"`
	// Pattern is the glob matched against file names. Defaults to "*.md" when
	// empty.
	Pattern string `json:"pattern" yaml:"pattern" mapstructure:"pattern"`
}
35

36
// sampleConfig is the example YAML configuration exposed through the plugin
// info.
var sampleConfig = `
org: raystack
token: github_token
# extract specifies which entity types to extract.
# Defaults to all: ["users", "repositories", "teams", "documents", "collaborators"]
extract:
  - users
  - repositories
  - teams
  - documents
  - collaborators
# docs configures document extraction (only used when "documents" is in extract).
docs:
  # repos limits which repositories to scan. If empty, scans all org repos.
  repos: []
  # paths specifies directory paths to scan for documents. Defaults to ["docs"].
  paths:
    - docs
  # pattern is a glob pattern to match files. Defaults to "*.md".
  pattern: "*.md"`
56

57
var info = plugins.Info{
58
        Description:  "Extract metadata from a GitHub organisation including users, repositories, teams, documents, and collaborator permissions.",
59
        SampleConfig: sampleConfig,
60
        Summary:      summary,
61
        Tags:         []string{"platform", "extractor"},
62
}
63

64
type Extractor struct {
65
        plugins.BaseExtractor
66
        logger  log.Logger
67
        config  Config
68
        client  *gh.Client
69
        extract map[string]bool
70
}
71

72
func New(logger log.Logger) *Extractor {
1✔
73
        e := &Extractor{logger: logger}
1✔
74
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
1✔
75
        return e
1✔
76
}
1✔
77

78
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
1✔
79
        if err := e.BaseExtractor.Init(ctx, config); err != nil {
2✔
80
                return err
1✔
81
        }
1✔
82

83
        ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: e.config.Token})
1✔
84
        tc := oauth2.NewClient(ctx, ts)
1✔
85
        e.client = gh.NewClient(tc)
1✔
86

1✔
87
        e.extract = map[string]bool{
1✔
88
                "users":         true,
1✔
89
                "repositories":  true,
1✔
90
                "teams":         true,
1✔
91
                "documents":     true,
1✔
92
                "collaborators": true,
1✔
93
        }
1✔
94
        if len(e.config.Extract) > 0 {
2✔
95
                e.extract = make(map[string]bool, len(e.config.Extract))
1✔
96
                for _, v := range e.config.Extract {
2✔
97
                        e.extract[v] = true
1✔
98
                }
1✔
99
        }
100

101
        return nil
1✔
102
}
103

104
// SetBaseURL overrides the GitHub API base URL (used for testing).
105
func (e *Extractor) SetBaseURL(url string) {
1✔
106
        e.client.BaseURL, _ = e.client.BaseURL.Parse(url + "/api/v3/")
1✔
107
}
1✔
108

109
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
1✔
110
        if e.extract["users"] {
2✔
111
                if err := e.extractUsers(ctx, emit); err != nil {
2✔
112
                        return fmt.Errorf("extract users: %w", err)
1✔
113
                }
1✔
114
        }
115
        if e.extract["repositories"] {
2✔
116
                if err := e.extractRepositories(ctx, emit); err != nil {
1✔
UNCOV
117
                        return fmt.Errorf("extract repositories: %w", err)
×
UNCOV
118
                }
×
119
        }
120
        if e.extract["teams"] {
2✔
121
                if err := e.extractTeams(ctx, emit); err != nil {
1✔
UNCOV
122
                        return fmt.Errorf("extract teams: %w", err)
×
UNCOV
123
                }
×
124
        }
125
        if e.extract["documents"] {
2✔
126
                if err := e.extractDocuments(ctx, emit); err != nil {
1✔
UNCOV
127
                        return fmt.Errorf("extract documents: %w", err)
×
UNCOV
128
                }
×
129
        }
130
        if e.extract["collaborators"] {
2✔
131
                if err := e.extractCollaborators(ctx, emit); err != nil {
1✔
UNCOV
132
                        return fmt.Errorf("extract collaborators: %w", err)
×
UNCOV
133
                }
×
134
        }
135
        return nil
1✔
136
}
137

138
func (e *Extractor) extractUsers(ctx context.Context, emit plugins.Emit) error {
1✔
139
        opts := &gh.ListMembersOptions{
1✔
140
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
141
        }
1✔
142
        for {
2✔
143
                members, resp, err := e.client.Organizations.ListMembers(ctx, e.config.Org, opts)
1✔
144
                if err != nil {
2✔
145
                        return fmt.Errorf("list members: %w", err)
1✔
146
                }
1✔
147

148
                for _, member := range members {
2✔
149
                        usr, _, err := e.client.Users.Get(ctx, member.GetLogin())
1✔
150
                        if err != nil {
2✔
151
                                e.logger.Warn("failed to fetch user, skipping", "login", member.GetLogin(), "error", err)
1✔
152
                                continue
1✔
153
                        }
154
                        emit(e.buildUserRecord(usr))
1✔
155
                }
156

157
                if resp.NextPage == 0 {
2✔
158
                        break
1✔
159
                }
160
                opts.Page = resp.NextPage
1✔
161
        }
162
        return nil
1✔
163
}
164

165
func (e *Extractor) buildUserRecord(usr *gh.User) models.Record {
1✔
166
        urn := models.NewURN("github", e.UrnScope, "user", usr.GetNodeID())
1✔
167
        props := map[string]any{
1✔
168
                "email":      usr.GetEmail(),
1✔
169
                "username":   usr.GetLogin(),
1✔
170
                "full_name":  usr.GetName(),
1✔
171
                "company":    usr.GetCompany(),
1✔
172
                "location":   usr.GetLocation(),
1✔
173
                "bio":        usr.GetBio(),
1✔
174
                "avatar_url": usr.GetAvatarURL(),
1✔
175
                "html_url":   usr.GetHTMLURL(),
1✔
176
                "status":     "active",
1✔
177
        }
1✔
178

1✔
179
        entity := models.NewEntity(urn, "user", usr.GetName(), "github", props)
1✔
180
        var edges []*meteorv1beta1.Edge
1✔
181
        edges = append(edges, &meteorv1beta1.Edge{
1✔
182
                SourceUrn: urn,
1✔
183
                TargetUrn: models.NewURN("github", e.UrnScope, "org", e.config.Org),
1✔
184
                Type:      "member_of",
1✔
185
                Source:    "github",
1✔
186
        })
1✔
187
        return models.NewRecord(entity, edges...)
1✔
188
}
1✔
189

190
func (e *Extractor) extractRepositories(ctx context.Context, emit plugins.Emit) error {
1✔
191
        opts := &gh.RepositoryListByOrgOptions{
1✔
192
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
193
        }
1✔
194
        for {
2✔
195
                repos, resp, err := e.client.Repositories.ListByOrg(ctx, e.config.Org, opts)
1✔
196
                if err != nil {
1✔
UNCOV
197
                        return fmt.Errorf("list repositories: %w", err)
×
UNCOV
198
                }
×
199

200
                for _, repo := range repos {
2✔
201
                        emit(e.buildRepoRecord(repo))
1✔
202
                }
1✔
203

204
                if resp.NextPage == 0 {
2✔
205
                        break
1✔
206
                }
UNCOV
207
                opts.Page = resp.NextPage
×
208
        }
209
        return nil
1✔
210
}
211

212
func (e *Extractor) buildRepoRecord(repo *gh.Repository) models.Record {
1✔
213
        urn := models.NewURN("github", e.UrnScope, "repository", repo.GetNodeID())
1✔
214
        props := map[string]any{
1✔
215
                "full_name":     repo.GetFullName(),
1✔
216
                "description":   repo.GetDescription(),
1✔
217
                "html_url":      repo.GetHTMLURL(),
1✔
218
                "language":      repo.GetLanguage(),
1✔
219
                "visibility":    repo.GetVisibility(),
1✔
220
                "default_branch": repo.GetDefaultBranch(),
1✔
221
                "archived":      repo.GetArchived(),
1✔
222
                "fork":          repo.GetFork(),
1✔
223
                "stargazers":    repo.GetStargazersCount(),
1✔
224
                "forks":         repo.GetForksCount(),
1✔
225
                "open_issues":   repo.GetOpenIssuesCount(),
1✔
226
        }
1✔
227
        if len(repo.Topics) > 0 {
2✔
228
                props["topics"] = repo.Topics
1✔
229
        }
1✔
230

231
        entity := models.NewEntity(urn, "repository", repo.GetName(), "github", props)
1✔
232

1✔
233
        var edges []*meteorv1beta1.Edge
1✔
234
        if owner := repo.GetOwner(); owner != nil {
2✔
235
                edges = append(edges, models.OwnerEdge(
1✔
236
                        urn,
1✔
237
                        models.NewURN("github", e.UrnScope, "user", owner.GetNodeID()),
1✔
238
                        "github",
1✔
239
                ))
1✔
240
        }
1✔
241

242
        return models.NewRecord(entity, edges...)
1✔
243
}
244

245
func (e *Extractor) extractTeams(ctx context.Context, emit plugins.Emit) error {
1✔
246
        opts := &gh.ListOptions{PerPage: 100}
1✔
247
        for {
2✔
248
                teams, resp, err := e.client.Teams.ListTeams(ctx, e.config.Org, opts)
1✔
249
                if err != nil {
1✔
UNCOV
250
                        return fmt.Errorf("list teams: %w", err)
×
UNCOV
251
                }
×
252

253
                for _, team := range teams {
2✔
254
                        record, err := e.buildTeamRecord(ctx, team)
1✔
255
                        if err != nil {
1✔
UNCOV
256
                                e.logger.Warn("failed to build team record, skipping", "team", team.GetSlug(), "error", err)
×
257
                                continue
×
258
                        }
259
                        emit(record)
1✔
260
                }
261

262
                if resp.NextPage == 0 {
2✔
263
                        break
1✔
264
                }
UNCOV
265
                opts.Page = resp.NextPage
×
266
        }
267
        return nil
1✔
268
}
269

270
func (e *Extractor) buildTeamRecord(ctx context.Context, team *gh.Team) (models.Record, error) {
1✔
271
        urn := models.NewURN("github", e.UrnScope, "team", team.GetNodeID())
1✔
272
        props := map[string]any{
1✔
273
                "slug":        team.GetSlug(),
1✔
274
                "description": team.GetDescription(),
1✔
275
                "privacy":     team.GetPrivacy(),
1✔
276
                "permission":  team.GetPermission(),
1✔
277
                "html_url":    fmt.Sprintf("https://github.com/orgs/%s/teams/%s", e.config.Org, team.GetSlug()),
1✔
278
        }
1✔
279

1✔
280
        entity := models.NewEntity(urn, "team", team.GetName(), "github", props)
1✔
281

1✔
282
        var edges []*meteorv1beta1.Edge
1✔
283

1✔
284
        // Fetch team members and create member_of edges.
1✔
285
        memberOpts := &gh.TeamListTeamMembersOptions{
1✔
286
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
287
        }
1✔
288
        for {
2✔
289
                members, resp, err := e.client.Teams.ListTeamMembersBySlug(ctx, e.config.Org, team.GetSlug(), memberOpts)
1✔
290
                if err != nil {
1✔
UNCOV
291
                        return models.Record{}, fmt.Errorf("list team members for %s: %w", team.GetSlug(), err)
×
UNCOV
292
                }
×
293

294
                for _, member := range members {
2✔
295
                        edges = append(edges, &meteorv1beta1.Edge{
1✔
296
                                SourceUrn: models.NewURN("github", e.UrnScope, "user", member.GetNodeID()),
1✔
297
                                TargetUrn: urn,
1✔
298
                                Type:      "member_of",
1✔
299
                                Source:    "github",
1✔
300
                        })
1✔
301
                }
1✔
302

303
                if resp.NextPage == 0 {
2✔
304
                        break
1✔
305
                }
UNCOV
306
                memberOpts.Page = resp.NextPage
×
307
        }
308

309
        return models.NewRecord(entity, edges...), nil
1✔
310
}
311

312
func (e *Extractor) extractDocuments(ctx context.Context, emit plugins.Emit) error {
1✔
313
        paths := e.config.Docs.Paths
1✔
314
        if len(paths) == 0 {
2✔
315
                paths = []string{"docs"}
1✔
316
        }
1✔
317
        pattern := e.config.Docs.Pattern
1✔
318
        if pattern == "" {
2✔
319
                pattern = "*.md"
1✔
320
        }
1✔
321

322
        repos, err := e.listDocRepos(ctx)
1✔
323
        if err != nil {
1✔
UNCOV
324
                return err
×
UNCOV
325
        }
×
326

327
        for _, repo := range repos {
2✔
328
                repoURN := models.NewURN("github", e.UrnScope, "repository", repo.GetNodeID())
1✔
329
                for _, dir := range paths {
2✔
330
                        if err := e.extractDocsFromPath(ctx, emit, repo, repoURN, dir, pattern); err != nil {
2✔
331
                                e.logger.Warn("failed to extract docs from path, skipping",
1✔
332
                                        "repo", repo.GetFullName(), "path", dir, "error", err)
1✔
333
                        }
1✔
334
                }
335
        }
336
        return nil
1✔
337
}
338

339
func (e *Extractor) listDocRepos(ctx context.Context) ([]*gh.Repository, error) {
1✔
340
        if len(e.config.Docs.Repos) > 0 {
2✔
341
                var repos []*gh.Repository
1✔
342
                for _, name := range e.config.Docs.Repos {
2✔
343
                        repo, _, err := e.client.Repositories.Get(ctx, e.config.Org, name)
1✔
344
                        if err != nil {
1✔
UNCOV
345
                                e.logger.Warn("failed to get repo for docs, skipping", "repo", name, "error", err)
×
UNCOV
346
                                continue
×
347
                        }
348
                        repos = append(repos, repo)
1✔
349
                }
350
                return repos, nil
1✔
351
        }
352

353
        // Fall back to all org repos.
354
        var all []*gh.Repository
1✔
355
        opts := &gh.RepositoryListByOrgOptions{
1✔
356
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
357
        }
1✔
358
        for {
2✔
359
                repos, resp, err := e.client.Repositories.ListByOrg(ctx, e.config.Org, opts)
1✔
360
                if err != nil {
1✔
UNCOV
361
                        return nil, fmt.Errorf("list repositories for docs: %w", err)
×
UNCOV
362
                }
×
363
                all = append(all, repos...)
1✔
364
                if resp.NextPage == 0 {
2✔
365
                        break
1✔
366
                }
UNCOV
367
                opts.Page = resp.NextPage
×
368
        }
369
        return all, nil
1✔
370
}
371

372
func (e *Extractor) extractDocsFromPath(ctx context.Context, emit plugins.Emit, repo *gh.Repository, repoURN, dir, pattern string) error {
1✔
373
        _, dirContents, _, err := e.client.Repositories.GetContents(ctx, e.config.Org, repo.GetName(), dir, nil)
1✔
374
        if err != nil {
2✔
375
                return fmt.Errorf("get contents of %s: %w", dir, err)
1✔
376
        }
1✔
377

378
        for _, entry := range dirContents {
2✔
379
                switch entry.GetType() {
1✔
380
                case "file":
1✔
381
                        matched, _ := filepath.Match(pattern, entry.GetName())
1✔
382
                        if !matched {
2✔
383
                                continue
1✔
384
                        }
385
                        if err := e.emitDocument(ctx, emit, repo, repoURN, entry); err != nil {
1✔
UNCOV
386
                                e.logger.Warn("failed to emit document, skipping",
×
UNCOV
387
                                        "repo", repo.GetFullName(), "path", entry.GetPath(), "error", err)
×
UNCOV
388
                        }
×
389
                case "dir":
1✔
390
                        if err := e.extractDocsFromPath(ctx, emit, repo, repoURN, entry.GetPath(), pattern); err != nil {
1✔
UNCOV
391
                                e.logger.Warn("failed to recurse into directory, skipping",
×
UNCOV
392
                                        "repo", repo.GetFullName(), "path", entry.GetPath(), "error", err)
×
UNCOV
393
                        }
×
394
                }
395
        }
396
        return nil
1✔
397
}
398

399
func (e *Extractor) emitDocument(ctx context.Context, emit plugins.Emit, repo *gh.Repository, repoURN string, entry *gh.RepositoryContent) error {
1✔
400
        // Fetch full file content (the directory listing doesn't include content).
1✔
401
        file, _, _, err := e.client.Repositories.GetContents(ctx, e.config.Org, repo.GetName(), entry.GetPath(), nil)
1✔
402
        if err != nil {
1✔
UNCOV
403
                return fmt.Errorf("get file %s: %w", entry.GetPath(), err)
×
UNCOV
404
        }
×
405

406
        content, err := file.GetContent()
1✔
407
        if err != nil {
1✔
UNCOV
408
                return fmt.Errorf("decode content of %s: %w", entry.GetPath(), err)
×
UNCOV
409
        }
×
410

411
        name := strings.TrimSuffix(entry.GetName(), filepath.Ext(entry.GetName()))
1✔
412
        urn := models.NewURN("github", e.UrnScope, "document", file.GetSHA())
1✔
413

1✔
414
        props := map[string]any{
1✔
415
                "path":      file.GetPath(),
1✔
416
                "file_name": file.GetName(),
1✔
417
                "content":   content,
1✔
418
                "html_url":  file.GetHTMLURL(),
1✔
419
                "repo":      repo.GetFullName(),
1✔
420
                "size":      file.GetSize(),
1✔
421
                "sha":       file.GetSHA(),
1✔
422
        }
1✔
423

1✔
424
        entity := models.NewEntity(urn, "document", name, "github", props)
1✔
425

1✔
426
        edges := []*meteorv1beta1.Edge{
1✔
427
                {
1✔
428
                        SourceUrn: urn,
1✔
429
                        TargetUrn: repoURN,
1✔
430
                        Type:      "belongs_to",
1✔
431
                        Source:    "github",
1✔
432
                },
1✔
433
        }
1✔
434

1✔
435
        emit(models.NewRecord(entity, edges...))
1✔
436
        return nil
1✔
437
}
438

439
func (e *Extractor) extractCollaborators(ctx context.Context, emit plugins.Emit) error {
1✔
440
        repoOpts := &gh.RepositoryListByOrgOptions{
1✔
441
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
442
        }
1✔
443
        for {
2✔
444
                repos, resp, err := e.client.Repositories.ListByOrg(ctx, e.config.Org, repoOpts)
1✔
445
                if err != nil {
1✔
UNCOV
446
                        return fmt.Errorf("list repositories: %w", err)
×
UNCOV
447
                }
×
448

449
                for _, repo := range repos {
2✔
450
                        if err := e.extractRepoCollaborators(ctx, emit, repo); err != nil {
2✔
451
                                e.logger.Warn("failed to extract collaborators, skipping",
1✔
452
                                        "repo", repo.GetFullName(), "error", err)
1✔
453
                        }
1✔
454
                }
455

456
                if resp.NextPage == 0 {
2✔
457
                        break
1✔
458
                }
UNCOV
459
                repoOpts.Page = resp.NextPage
×
460
        }
461
        return nil
1✔
462
}
463

464
func (e *Extractor) extractRepoCollaborators(ctx context.Context, emit plugins.Emit, repo *gh.Repository) error {
1✔
465
        repoURN := models.NewURN("github", e.UrnScope, "repository", repo.GetNodeID())
1✔
466
        opts := &gh.ListCollaboratorsOptions{
1✔
467
                ListOptions: gh.ListOptions{PerPage: 100},
1✔
468
        }
1✔
469

1✔
470
        var edges []*meteorv1beta1.Edge
1✔
471
        for {
2✔
472
                collaborators, resp, err := e.client.Repositories.ListCollaborators(ctx, e.config.Org, repo.GetName(), opts)
1✔
473
                if err != nil {
2✔
474
                        return fmt.Errorf("list collaborators for %s: %w", repo.GetName(), err)
1✔
475
                }
1✔
476

477
                for _, collab := range collaborators {
2✔
478
                        userURN := models.NewURN("github", e.UrnScope, "user", collab.GetNodeID())
1✔
479
                        props, _ := structpb.NewStruct(map[string]any{
1✔
480
                                "permission": resolvePermission(collab.GetPermissions()),
1✔
481
                        })
1✔
482
                        edges = append(edges, &meteorv1beta1.Edge{
1✔
483
                                SourceUrn:  userURN,
1✔
484
                                TargetUrn:  repoURN,
1✔
485
                                Type:       "has_access_to",
1✔
486
                                Source:     "github",
1✔
487
                                Properties: props,
1✔
488
                        })
1✔
489
                }
1✔
490

491
                if resp.NextPage == 0 {
2✔
492
                        break
1✔
493
                }
UNCOV
494
                opts.Page = resp.NextPage
×
495
        }
496

497
        if len(edges) > 0 {
2✔
498
                entity := models.NewEntity(repoURN, "repository", repo.GetName(), "github", nil)
1✔
499
                emit(models.NewRecord(entity, edges...))
1✔
500
        }
1✔
501
        return nil
1✔
502
}
503

504
// resolvePermission returns the highest permission level set in perms. Levels
// rank, from highest to lowest: admin, maintain, push, triage, pull. "pull" is
// also the result when no level is set, including for a nil map.
func resolvePermission(perms map[string]bool) string {
	// "pull" is both the lowest level and the fallback, so only the levels
	// above it need an explicit check.
	higher := [...]string{"admin", "maintain", "push", "triage"}
	for i := range higher {
		if perms[higher[i]] {
			return higher[i]
		}
	}
	return "pull"
}
513

514
func init() {
1✔
515
        if err := registry.Extractors.Register("github", func() plugins.Extractor {
1✔
UNCOV
516
                return New(plugins.GetLog())
×
UNCOV
517
        }); err != nil {
×
UNCOV
518
                panic(err)
×
519
        }
520
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc