• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 17074625425

19 Aug 2025 03:40PM UTC coverage: 76.119% (-0.03%) from 76.151%
17074625425

Pull #3495

github

web-flow
Bump google-github-actions/setup-gcloud in /.github/workflows

Bumps [google-github-actions/setup-gcloud](https://github.com/google-github-actions/setup-gcloud) from 2.1.4 to 2.1.5.
- [Release notes](https://github.com/google-github-actions/setup-gcloud/releases)
- [Changelog](https://github.com/google-github-actions/setup-gcloud/blob/main/CHANGELOG.md)
- [Commits](https://github.com/google-github-actions/setup-gcloud/compare/77e7a554d...6a7c903a7)

---
updated-dependencies:
- dependency-name: google-github-actions/setup-gcloud
  dependency-version: 2.1.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #3495: Bump google-github-actions/setup-gcloud from 2.1.4 to 2.1.5 in /.github/workflows

9470 of 12441 relevant lines covered (76.12%)

16.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.74
/internal/resources/providers/gcplib/inventory/provider.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package inventory
19

20
import (
21
        "context"
22
        "strings"
23
        "sync"
24

25
        asset "cloud.google.com/go/asset/apiv1"
26
        "cloud.google.com/go/asset/apiv1/assetpb"
27
        "github.com/googleapis/gax-go/v2"
28
        "github.com/samber/lo"
29
        "google.golang.org/api/iterator"
30
        "google.golang.org/api/option"
31
        "google.golang.org/protobuf/types/known/structpb"
32

33
        "github.com/elastic/cloudbeat/internal/config"
34
        "github.com/elastic/cloudbeat/internal/infra/clog"
35
        "github.com/elastic/cloudbeat/internal/resources/fetching"
36
        "github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth"
37
)
38

39
type Provider struct {
40
        log       *clog.Logger
41
        config    auth.GcpFactoryConfig
42
        inventory *AssetsInventoryWrapper
43
        crm       *ResourceManagerWrapper
44
}
45

46
type AssetsInventoryWrapper struct {
47
        Close      func() error
48
        ListAssets func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator
49
}
50

51
type MonitoringAsset struct {
52
        CloudAccount *fetching.CloudAccountMetadata
53
        LogMetrics   []*ExtendedGcpAsset `json:"log_metrics,omitempty"`
54
        Alerts       []*ExtendedGcpAsset `json:"alerts,omitempty"`
55
}
56

57
type ProjectPoliciesAsset struct {
58
        CloudAccount *fetching.CloudAccountMetadata
59
        Policies     []*ExtendedGcpAsset `json:"policies,omitempty"`
60
}
61

62
type ProjectAssets struct {
63
        CloudAccount *fetching.CloudAccountMetadata
64
        Assets       []*ExtendedGcpAsset
65
}
66

67
type ExtendedGcpAsset struct {
68
        *assetpb.Asset
69
        CloudAccount *fetching.CloudAccountMetadata
70
}
71

72
type ProviderInitializer struct{}
73

74
type dnsPolicyFields struct {
75
        networks      []string
76
        enableLogging bool
77
}
78

79
type Iterator interface {
80
        Next() (*assetpb.Asset, error)
81
}
82

83
type ServiceAPI interface {
84
        ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset)
85
        ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset)
86
        ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset)
87
        ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets)
88
        ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset)
89
        Clear()
90
        Close() error
91
}
92

93
type ProviderInitializerAPI interface {
94
        Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (ServiceAPI, error)
95
}
96

97
func newAssetsInventoryWrapper(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (*AssetsInventoryWrapper, error) {
1✔
98
        limiter := NewAssetsInventoryRateLimiter(log)
1✔
99
        client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...)
1✔
100
        if err != nil {
2✔
101
                return nil, err
1✔
102
        }
1✔
103

104
        return &AssetsInventoryWrapper{
×
105
                Close: client.Close,
×
106
                ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator {
×
107
                        if req.PageSize == 0 {
×
108
                                req.PageSize = cfg.GcpCallOpt.ListAssetsPageSize
×
109
                        }
×
110
                        return client.ListAssets(ctx, req, append(opts, GAXCallOptionRetrier(log), gax.WithTimeout(cfg.GcpCallOpt.ListAssetsTimeout))...)
×
111
                },
112
        }, nil
113
}
114

115
func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (ServiceAPI, error) {
1✔
116
        inv, err := newAssetsInventoryWrapper(ctx, log, gcpConfig, cfg)
1✔
117
        if err != nil {
2✔
118
                return nil, err
1✔
119
        }
1✔
120

121
        crm, err := NewResourceManagerWrapper(ctx, log, gcpConfig)
×
122
        if err != nil {
×
123
                return nil, err
×
124
        }
×
125

126
        return &Provider{
×
127
                config:    gcpConfig,
×
128
                log:       log,
×
129
                inventory: inv,
×
130
                crm:       crm,
×
131
        }, nil
×
132
}
133

134
func (p *Provider) ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset) {
4✔
135
        defer close(out)
4✔
136

4✔
137
        for _, t := range assetTypes {
8✔
138
                if ctx.Err() != nil {
4✔
139
                        return
×
140
                }
×
141

142
                resCh := p.getAssets(ctx, p.config.Parent, []string{t}, assetpb.ContentType_RESOURCE)
4✔
143
                polCh := p.getAssets(ctx, p.config.Parent, []string{t}, assetpb.ContentType_IAM_POLICY)
4✔
144

4✔
145
                for a := range p.mergeAssets(ctx, resCh, polCh) {
7✔
146
                        out <- a
3✔
147
                }
3✔
148
        }
149
}
150

151
func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
1✔
152
        defer close(out)
1✔
153

1✔
154
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
155

1✔
156
        for project := range projectsCh {
3✔
157
                if ctx.Err() != nil {
2✔
158
                        return
×
159
                }
×
160
                logsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringLogMetricAssetType})
2✔
161
                alertsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringAlertPolicyAssetType})
2✔
162

2✔
163
                var logAssets, alertAssets []*ExtendedGcpAsset
2✔
164
                var wg sync.WaitGroup
2✔
165

2✔
166
                wg.Add(2)
2✔
167
                go func() {
4✔
168
                        defer wg.Done()
2✔
169
                        logAssets = collect(logsAssetCh)
2✔
170
                }()
2✔
171
                go func() {
4✔
172
                        defer wg.Done()
2✔
173
                        alertAssets = collect(alertsAssetCh)
2✔
174
                }()
2✔
175
                wg.Wait()
2✔
176

2✔
177
                p.log.Debugf("Listed %d log metrics and %d alert policies for %v\n", len(logAssets), len(alertAssets), project.Name)
2✔
178
                if len(logAssets) == 0 && len(alertAssets) == 0 {
2✔
179
                        continue
×
180
                }
181
                out <- &MonitoringAsset{
2✔
182
                        LogMetrics:   logAssets,
2✔
183
                        Alerts:       alertAssets,
2✔
184
                        CloudAccount: project.CloudAccount,
2✔
185
                }
2✔
186
        }
187
}
188

189
func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset) {
1✔
190
        defer close(out)
1✔
191

1✔
192
        projectsCh := make(chan *ExtendedGcpAsset)
1✔
193
        policiesCache := &sync.Map{}
1✔
194

1✔
195
        go p.getAllAssets(ctx, projectsCh, &assetpb.ListAssetsRequest{
1✔
196
                Parent:      p.config.Parent,
1✔
197
                AssetTypes:  []string{CrmProjectAssetType},
1✔
198
                ContentType: assetpb.ContentType_IAM_POLICY,
1✔
199
        })
1✔
200

1✔
201
        for asset := range projectsCh {
2✔
202
                select {
1✔
203
                case <-ctx.Done():
×
204
                        return
×
205
                case out <- &ProjectPoliciesAsset{
206
                        CloudAccount: asset.CloudAccount,
207
                        Policies:     append([]*ExtendedGcpAsset{asset}, p.getAssetAncestorsPolicies(ctx, asset.Ancestors[1:], policiesCache)...),
208
                }:
1✔
209
                }
210
        }
211
}
212

213
func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
1✔
214
        defer close(out)
1✔
215

1✔
216
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
217

1✔
218
        for project := range projectsCh {
3✔
219
                if ctx.Err() != nil {
2✔
220
                        return
×
221
                }
×
222
                assets := collect(p.getParentResources(ctx, project.Ancestors[0], assetTypes))
2✔
223
                p.log.Debugf("Listed %d resources of type: %v in %v\n", len(assets), assetTypes, project.Name)
2✔
224
                if len(assets) == 0 {
2✔
225
                        continue
×
226
                }
227
                out <- &ProjectAssets{
2✔
228
                        Assets:       assets,
2✔
229
                        CloudAccount: project.CloudAccount,
2✔
230
                }
2✔
231
        }
232
}
233

234
func (p *Provider) ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset) {
1✔
235
        defer close(out)
1✔
236

1✔
237
        dnsPolicyAssetCh := p.getParentResources(ctx, p.config.Parent, []string{DnsPolicyAssetType})
1✔
238
        networkAssetCh := p.getParentResources(ctx, p.config.Parent, []string{ComputeNetworkAssetType})
1✔
239

1✔
240
        dnsPoliciesFields := decodeDnsPolicies(lo.Map(collect(dnsPolicyAssetCh), func(asset *ExtendedGcpAsset, _ int) *assetpb.Asset { return asset.Asset }))
2✔
241
        for asset := range networkAssetCh {
3✔
242
                select {
2✔
243
                case <-ctx.Done():
×
244
                        return
×
245
                case out <- enrichNetworkAsset(asset, dnsPoliciesFields):
2✔
246
                }
247
        }
248
}
249

250
func (p *Provider) Close() error {
×
251
        return p.inventory.Close()
×
252
}
×
253

254
func (p *Provider) Clear() {
×
255
        p.crm.Clear()
×
256
}
×
257

258
func (p *Provider) getAssetAncestorsPolicies(ctx context.Context, ancestors []string, cache *sync.Map) []*ExtendedGcpAsset {
1✔
259
        wg := sync.WaitGroup{}
1✔
260
        var assets []*ExtendedGcpAsset
1✔
261
        mu := sync.Mutex{}
1✔
262
        for _, ancestor := range ancestors {
2✔
263
                if value, ok := cache.Load(ancestor); ok {
1✔
264
                        if v, ok := value.([]*ExtendedGcpAsset); ok {
×
265
                                mu.Lock()
×
266
                                assets = append(assets, v...)
×
267
                                mu.Unlock()
×
268
                        }
×
269
                        continue
×
270
                }
271
                prjAncestorPolicyCh := make(chan *ExtendedGcpAsset)
1✔
272
                var assetType string
1✔
273
                if isFolder(ancestor) {
1✔
274
                        assetType = CrmFolderAssetType
×
275
                }
×
276
                if isOrganization(ancestor) {
2✔
277
                        assetType = CrmOrgAssetType
1✔
278
                }
1✔
279
                go p.getAllAssets(ctx, prjAncestorPolicyCh, &assetpb.ListAssetsRequest{
1✔
280
                        Parent:      ancestor,
1✔
281
                        AssetTypes:  []string{assetType},
1✔
282
                        ContentType: assetpb.ContentType_IAM_POLICY,
1✔
283
                })
1✔
284

1✔
285
                wg.Add(1)
1✔
286
                go func() {
2✔
287
                        defer wg.Done()
1✔
288
                        ancestorPolicies := collect(prjAncestorPolicyCh)
1✔
289
                        cache.Store(ancestor, ancestorPolicies)
1✔
290
                        mu.Lock()
1✔
291
                        assets = append(assets, ancestorPolicies...)
1✔
292
                        mu.Unlock()
1✔
293
                }()
1✔
294
        }
295
        wg.Wait()
1✔
296
        p.log.Debugf("Listed %d policies for ancestors: %v\n", len(assets), ancestors)
1✔
297
        return assets
1✔
298
}
299

300
func (p *Provider) getParentResources(ctx context.Context, parent string, assetTypes []string) <-chan *ExtendedGcpAsset {
10✔
301
        return p.getAssets(ctx, parent, assetTypes, assetpb.ContentType_RESOURCE)
10✔
302
}
10✔
303

304
func (p *Provider) getAssets(ctx context.Context, parent string, assetTypes []string, contentType assetpb.ContentType) <-chan *ExtendedGcpAsset {
18✔
305
        ch := make(chan *ExtendedGcpAsset)
18✔
306
        go p.getAllAssets(ctx, ch, &assetpb.ListAssetsRequest{
18✔
307
                Parent:      parent,
18✔
308
                AssetTypes:  assetTypes,
18✔
309
                ContentType: contentType,
18✔
310
        })
18✔
311
        return ch
18✔
312
}
18✔
313

314
func (p *Provider) mergeAssets(ctx context.Context, resCh, polCh <-chan *ExtendedGcpAsset) <-chan *ExtendedGcpAsset {
4✔
315
        out := make(chan *ExtendedGcpAsset)
4✔
316
        store := make(map[string]*ExtendedGcpAsset)
4✔
317

4✔
318
        go func() {
8✔
319
                defer close(out)
4✔
320

4✔
321
                for polCh != nil || resCh != nil {
16✔
322
                        select {
12✔
323
                        case <-ctx.Done():
×
324
                                return
×
325

326
                        case a, ok := <-polCh:
6✔
327
                                if !ok {
10✔
328
                                        polCh = nil
4✔
329
                                        flushAssets(out, store, resCh, polCh)
4✔
330
                                        continue
4✔
331
                                }
332
                                mergeAsset(out, store, a, resCh, polCh)
2✔
333

334
                        case a, ok := <-resCh:
6✔
335
                                if !ok {
10✔
336
                                        resCh = nil
4✔
337
                                        flushAssets(out, store, resCh, polCh)
4✔
338
                                        continue
4✔
339
                                }
340
                                mergeAsset(out, store, a, resCh, polCh)
2✔
341
                        }
342
                }
343

344
                flushAssets(out, store, resCh, polCh)
4✔
345
        }()
346

347
        return out
4✔
348
}
349

350
func mergeAsset(
351
        out chan<- *ExtendedGcpAsset,
352
        store map[string]*ExtendedGcpAsset,
353
        a *ExtendedGcpAsset,
354
        resCh, polCh <-chan *ExtendedGcpAsset,
355
) {
4✔
356
        asset, found := store[a.Name]
4✔
357
        if !found {
7✔
358
                asset = a
3✔
359
                store[a.Name] = asset
3✔
360
        } else {
4✔
361
                if a.Resource != nil {
2✔
362
                        asset.Resource = a.Resource
1✔
363
                }
1✔
364
                if a.IamPolicy != nil {
1✔
365
                        asset.IamPolicy = a.IamPolicy
×
366
                }
×
367
        }
368

369
        if isAssetReady(asset, resCh, polCh) {
7✔
370
                out <- asset
3✔
371
                delete(store, a.Name)
3✔
372
        }
3✔
373
}
374

375
func isAssetReady(a *ExtendedGcpAsset, resCh, polCh <-chan *ExtendedGcpAsset) bool {
4✔
376
        resChOpen := resCh != nil
4✔
377
        polChOpen := polCh != nil
4✔
378
        return (a.Resource != nil && a.IamPolicy != nil) ||
4✔
379
                (!polChOpen && a.Resource != nil) ||
4✔
380
                (!resChOpen && a.IamPolicy != nil)
4✔
381
}
4✔
382

383
func flushAssets(
384
        out chan<- *ExtendedGcpAsset,
385
        store map[string]*ExtendedGcpAsset,
386
        resCh, polCh <-chan *ExtendedGcpAsset,
387
) {
12✔
388
        for name, a := range store {
12✔
389
                if isAssetReady(a, resCh, polCh) {
×
390
                        out <- a
×
391
                        delete(store, name)
×
392
                }
×
393
        }
394
}
395

396
func (p *Provider) getAllAssets(ctx context.Context, out chan<- *ExtendedGcpAsset, req *assetpb.ListAssetsRequest) {
20✔
397
        defer close(out)
20✔
398

20✔
399
        p.log.Infof("Listing %v assets of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
20✔
400
        it := p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
20✔
401
                Parent:      req.Parent,
20✔
402
                AssetTypes:  req.AssetTypes,
20✔
403
                ContentType: req.ContentType,
20✔
404
        })
20✔
405
        for {
59✔
406
                if ctx.Err() != nil {
39✔
407
                        return
×
408
                }
×
409
                response, err := it.Next()
39✔
410
                if err == iterator.Done {
55✔
411
                        p.log.Infof("Finished fetching GCP %v of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
16✔
412
                        return
16✔
413
                }
16✔
414

415
                if err != nil {
27✔
416
                        p.log.Errorf("Error fetching GCP %v of types: %v for %v: %v\n", req.ContentType, req.AssetTypes, req.Parent, err)
4✔
417
                        return
4✔
418
                }
4✔
419

420
                p.log.Debugf("Fetched GCP %v of type %v: %v\n", req.ContentType, response.AssetType, response.Name)
19✔
421
                out <- p.newGcpExtendedAsset(ctx, response)
19✔
422
        }
423
}
424

425
func (p *Provider) newGcpExtendedAsset(ctx context.Context, asset *assetpb.Asset) *ExtendedGcpAsset {
19✔
426
        return &ExtendedGcpAsset{
19✔
427
                Asset:        asset,
19✔
428
                CloudAccount: p.crm.GetCloudMetadata(ctx, asset),
19✔
429
        }
19✔
430
}
19✔
431

432
func enrichNetworkAsset(asset *ExtendedGcpAsset, dnsPoliciesFields []*dnsPolicyFields) *ExtendedGcpAsset {
2✔
433
        networkAssetFields := asset.GetResource().GetData().GetFields()
2✔
434
        networkIdentifier := strings.TrimPrefix(asset.GetName(), "//compute.googleapis.com")
2✔
435
        dnsPolicy := findDnsPolicyByNetwork(dnsPoliciesFields, networkIdentifier)
2✔
436

2✔
437
        if dnsPolicy != nil {
3✔
438
                networkAssetFields["enabledDnsLogging"] = &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: dnsPolicy.enableLogging}}
1✔
439
        }
1✔
440
        return asset
2✔
441
}
442

443
// findDnsPolicyByNetwork finds DNS policy by network identifier
444
func findDnsPolicyByNetwork(dnsPolicies []*dnsPolicyFields, networkIdentifier string) *dnsPolicyFields {
2✔
445
        for _, dnsPolicy := range dnsPolicies {
4✔
446
                if lo.SomeBy(dnsPolicy.networks, func(networkUrl string) bool {
4✔
447
                        return strings.HasSuffix(networkUrl, networkIdentifier)
2✔
448
                }) {
3✔
449
                        return dnsPolicy
1✔
450
                }
1✔
451
        }
452
        return nil
1✔
453
}
454

455
func decodeDnsPolicies(dnsPolicyAssets []*assetpb.Asset) []*dnsPolicyFields {
1✔
456
        dnsPolicies := make([]*dnsPolicyFields, 0)
1✔
457
        for _, dnsPolicyAsset := range dnsPolicyAssets {
2✔
458
                fields := new(dnsPolicyFields)
1✔
459
                dnsPolicyData := dnsPolicyAsset.GetResource().GetData().GetFields()
1✔
460

1✔
461
                if attachedNetworks, exist := dnsPolicyData["networks"]; exist {
2✔
462
                        networks := attachedNetworks.GetListValue().GetValues()
1✔
463
                        for _, network := range networks {
2✔
464
                                if networkUrl, found := network.GetStructValue().GetFields()["networkUrl"]; found {
2✔
465
                                        fields.networks = append(fields.networks, networkUrl.GetStringValue())
1✔
466
                                }
1✔
467
                        }
468
                }
469

470
                if enableLogging, exist := dnsPolicyData["enableLogging"]; exist {
2✔
471
                        fields.enableLogging = enableLogging.GetBoolValue()
1✔
472
                }
1✔
473

474
                dnsPolicies = append(dnsPolicies, fields)
1✔
475
        }
476

477
        return dnsPolicies
1✔
478
}
479

480
func isFolder(parent string) bool {
1✔
481
        return strings.HasPrefix(parent, "folders")
1✔
482
}
1✔
483

484
func isOrganization(parent string) bool {
20✔
485
        return strings.HasPrefix(parent, "organizations")
20✔
486
}
20✔
487

488
func collect[T any](ch <-chan T) []T {
8✔
489
        res := make([]T, 0)
8✔
490
        for item := range ch {
16✔
491
                res = append(res, item)
8✔
492
        }
8✔
493
        return res
8✔
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc