• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 15581407102

11 Jun 2025 09:37AM UTC coverage: 76.015%. Remained the same
15581407102

Pull #3359

github

cloudsecmachine
chore: Update version.asciidoc with Golang version 1.24.4

Made with ❤️️ by updatecli
Pull Request #3359: [updatecli] 8.19 - Update Golang version to 1.24.4

9178 of 12074 relevant lines covered (76.01%)

16.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.44
/internal/resources/providers/gcplib/inventory/provider.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package inventory
19

20
import (
21
        "context"
22
        "strings"
23
        "sync"
24

25
        asset "cloud.google.com/go/asset/apiv1"
26
        "cloud.google.com/go/asset/apiv1/assetpb"
27
        "github.com/googleapis/gax-go/v2"
28
        "github.com/samber/lo"
29
        "google.golang.org/api/iterator"
30
        "google.golang.org/api/option"
31
        "google.golang.org/protobuf/types/known/structpb"
32

33
        "github.com/elastic/cloudbeat/internal/infra/clog"
34
        "github.com/elastic/cloudbeat/internal/resources/fetching"
35
        "github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth"
36
)
37

38
type Provider struct {
39
        log       *clog.Logger
40
        config    auth.GcpFactoryConfig
41
        inventory *AssetsInventoryWrapper
42
        crm       *ResourceManagerWrapper
43
}
44

45
type AssetsInventoryWrapper struct {
46
        Close      func() error
47
        ListAssets func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator
48
}
49

50
type MonitoringAsset struct {
51
        CloudAccount *fetching.CloudAccountMetadata
52
        LogMetrics   []*ExtendedGcpAsset `json:"log_metrics,omitempty"`
53
        Alerts       []*ExtendedGcpAsset `json:"alerts,omitempty"`
54
}
55

56
type ProjectPoliciesAsset struct {
57
        CloudAccount *fetching.CloudAccountMetadata
58
        Policies     []*ExtendedGcpAsset `json:"policies,omitempty"`
59
}
60

61
type ProjectAssets struct {
62
        CloudAccount *fetching.CloudAccountMetadata
63
        Assets       []*ExtendedGcpAsset
64
}
65

66
type ExtendedGcpAsset struct {
67
        *assetpb.Asset
68
        CloudAccount *fetching.CloudAccountMetadata
69
}
70

71
type ProviderInitializer struct{}
72

73
type dnsPolicyFields struct {
74
        networks      []string
75
        enableLogging bool
76
}
77

78
type Iterator interface {
79
        Next() (*assetpb.Asset, error)
80
}
81

82
type ServiceAPI interface {
83
        ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset)
84
        ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset)
85
        ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset)
86
        ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets)
87
        ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset)
88
        Clear()
89
        Close() error
90
}
91

92
type ProviderInitializerAPI interface {
93
        Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error)
94
}
95

96
func newAssetsInventoryWrapper(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (*AssetsInventoryWrapper, error) {
1✔
97
        limiter := NewAssetsInventoryRateLimiter(log)
1✔
98
        client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...)
1✔
99
        if err != nil {
2✔
100
                return nil, err
1✔
101
        }
1✔
102

103
        return &AssetsInventoryWrapper{
×
104
                Close: client.Close,
×
105
                ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator {
×
106
                        return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...)
×
107
                },
×
108
        }, nil
109
}
110

111
func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
1✔
112
        assetsInventory, err := newAssetsInventoryWrapper(ctx, log, gcpConfig)
1✔
113
        if err != nil {
2✔
114
                return nil, err
1✔
115
        }
1✔
116

117
        cloudResourceManager, err := NewResourceManagerWrapper(ctx, log, gcpConfig)
×
118
        if err != nil {
×
119
                return nil, err
×
120
        }
×
121

122
        return &Provider{
×
123
                config:    gcpConfig,
×
124
                log:       log,
×
125
                inventory: assetsInventory,
×
126
                crm:       cloudResourceManager,
×
127
        }, nil
×
128
}
129

130
func (p *Provider) ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset) {
4✔
131
        defer close(out)
4✔
132

4✔
133
        resourceAssetsCh := p.fetchAssets(ctx, assetpb.ContentType_RESOURCE, assetTypes)
4✔
134
        policiesAssetsCh := p.fetchAssets(ctx, assetpb.ContentType_IAM_POLICY, assetTypes)
4✔
135
        assetsCh := p.mergeAssets(ctx, resourceAssetsCh, policiesAssetsCh)
4✔
136

4✔
137
        for asset := range assetsCh {
7✔
138
                select {
3✔
139
                case <-ctx.Done():
×
140
                        return
×
141
                case out <- asset:
3✔
142
                }
143
        }
144
}
145

146
func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
1✔
147
        defer close(out)
1✔
148

1✔
149
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
150

1✔
151
        for project := range projectsCh {
3✔
152
                if ctx.Err() != nil {
2✔
153
                        return
×
154
                }
×
155
                logsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringLogMetricAssetType})
2✔
156
                alertsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringAlertPolicyAssetType})
2✔
157

2✔
158
                var logAssets, alertAssets []*ExtendedGcpAsset
2✔
159
                var wg sync.WaitGroup
2✔
160

2✔
161
                wg.Add(2)
2✔
162
                go func() {
4✔
163
                        defer wg.Done()
2✔
164
                        logAssets = collect(logsAssetCh)
2✔
165
                }()
2✔
166
                go func() {
4✔
167
                        defer wg.Done()
2✔
168
                        alertAssets = collect(alertsAssetCh)
2✔
169
                }()
2✔
170
                wg.Wait()
2✔
171

2✔
172
                p.log.Debugf("Listed %d log metrics and %d alert policies for %v\n", len(logAssets), len(alertAssets), project.Name)
2✔
173
                if len(logAssets) == 0 && len(alertAssets) == 0 {
2✔
174
                        continue
×
175
                }
176
                out <- &MonitoringAsset{
2✔
177
                        LogMetrics:   logAssets,
2✔
178
                        Alerts:       alertAssets,
2✔
179
                        CloudAccount: project.CloudAccount,
2✔
180
                }
2✔
181
        }
182
}
183

184
func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset) {
1✔
185
        defer close(out)
1✔
186

1✔
187
        projectsCh := make(chan *ExtendedGcpAsset)
1✔
188
        policiesCache := &sync.Map{}
1✔
189

1✔
190
        go p.getAllAssets(ctx, projectsCh, &assetpb.ListAssetsRequest{
1✔
191
                Parent:      p.config.Parent,
1✔
192
                AssetTypes:  []string{CrmProjectAssetType},
1✔
193
                ContentType: assetpb.ContentType_IAM_POLICY,
1✔
194
        })
1✔
195

1✔
196
        for asset := range projectsCh {
2✔
197
                select {
1✔
198
                case <-ctx.Done():
×
199
                        return
×
200
                case out <- &ProjectPoliciesAsset{
201
                        CloudAccount: asset.CloudAccount,
202
                        Policies:     append([]*ExtendedGcpAsset{asset}, p.getAssetAncestorsPolicies(ctx, asset.Ancestors[1:], policiesCache)...),
203
                }:
1✔
204
                }
205
        }
206
}
207

208
func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
1✔
209
        defer close(out)
1✔
210

1✔
211
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
212

1✔
213
        for project := range projectsCh {
3✔
214
                if ctx.Err() != nil {
2✔
215
                        return
×
216
                }
×
217
                assets := collect(p.getParentResources(ctx, project.Ancestors[0], assetTypes))
2✔
218
                p.log.Debugf("Listed %d resources of type: %v in %v\n", len(assets), assetTypes, project.Name)
2✔
219
                if len(assets) == 0 {
2✔
220
                        continue
×
221
                }
222
                out <- &ProjectAssets{
2✔
223
                        Assets:       assets,
2✔
224
                        CloudAccount: project.CloudAccount,
2✔
225
                }
2✔
226
        }
227
}
228

229
func (p *Provider) ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset) {
1✔
230
        defer close(out)
1✔
231

1✔
232
        dnsPolicyAssetCh := p.getParentResources(ctx, p.config.Parent, []string{DnsPolicyAssetType})
1✔
233
        networkAssetCh := p.getParentResources(ctx, p.config.Parent, []string{ComputeNetworkAssetType})
1✔
234

1✔
235
        dnsPoliciesFields := decodeDnsPolicies(lo.Map(collect(dnsPolicyAssetCh), func(asset *ExtendedGcpAsset, _ int) *assetpb.Asset { return asset.Asset }))
2✔
236
        for asset := range networkAssetCh {
3✔
237
                select {
2✔
238
                case <-ctx.Done():
×
239
                        return
×
240
                case out <- enrichNetworkAsset(asset, dnsPoliciesFields):
2✔
241
                }
242
        }
243
}
244

245
func (p *Provider) Close() error {
×
246
        return p.inventory.Close()
×
247
}
×
248

249
func (p *Provider) Clear() {
×
250
        p.crm.Clear()
×
251
}
×
252

253
func (p *Provider) getAssetAncestorsPolicies(ctx context.Context, ancestors []string, cache *sync.Map) []*ExtendedGcpAsset {
1✔
254
        wg := sync.WaitGroup{}
1✔
255
        var assets []*ExtendedGcpAsset
1✔
256
        mu := sync.Mutex{}
1✔
257
        for _, ancestor := range ancestors {
2✔
258
                if value, ok := cache.Load(ancestor); ok {
1✔
259
                        if v, ok := value.([]*ExtendedGcpAsset); ok {
×
260
                                mu.Lock()
×
261
                                assets = append(assets, v...)
×
262
                                mu.Unlock()
×
263
                        }
×
264
                        continue
×
265
                }
266
                prjAncestorPolicyCh := make(chan *ExtendedGcpAsset)
1✔
267
                var assetType string
1✔
268
                if isFolder(ancestor) {
1✔
269
                        assetType = CrmFolderAssetType
×
270
                }
×
271
                if isOrganization(ancestor) {
2✔
272
                        assetType = CrmOrgAssetType
1✔
273
                }
1✔
274
                go p.getAllAssets(ctx, prjAncestorPolicyCh, &assetpb.ListAssetsRequest{
1✔
275
                        Parent:      ancestor,
1✔
276
                        AssetTypes:  []string{assetType},
1✔
277
                        ContentType: assetpb.ContentType_IAM_POLICY,
1✔
278
                })
1✔
279

1✔
280
                wg.Add(1)
1✔
281
                go func() {
2✔
282
                        defer wg.Done()
1✔
283
                        ancestorPolicies := collect(prjAncestorPolicyCh)
1✔
284
                        cache.Store(ancestor, ancestorPolicies)
1✔
285
                        mu.Lock()
1✔
286
                        assets = append(assets, ancestorPolicies...)
1✔
287
                        mu.Unlock()
1✔
288
                }()
1✔
289
        }
290
        wg.Wait()
1✔
291
        p.log.Debugf("Listed %d policies for ancestors: %v\n", len(assets), ancestors)
1✔
292
        return assets
1✔
293
}
294

295
func (p *Provider) getParentResources(ctx context.Context, parent string, assetTypes []string) <-chan *ExtendedGcpAsset {
10✔
296
        ch := make(chan *ExtendedGcpAsset)
10✔
297
        go p.getAllAssets(ctx, ch, &assetpb.ListAssetsRequest{
10✔
298
                Parent:      parent,
10✔
299
                AssetTypes:  assetTypes,
10✔
300
                ContentType: assetpb.ContentType_RESOURCE,
10✔
301
        })
10✔
302
        return ch
10✔
303
}
10✔
304

305
func (p *Provider) fetchAssets(ctx context.Context, contentType assetpb.ContentType, assetTypes []string) <-chan *ExtendedGcpAsset {
8✔
306
        out := make(chan *ExtendedGcpAsset)
8✔
307
        wg := sync.WaitGroup{}
8✔
308
        // Fetch each asset type separately to limit failures to a single type
8✔
309
        for _, assetType := range assetTypes {
16✔
310
                wg.Add(1)
8✔
311
                go func() {
16✔
312
                        defer wg.Done()
8✔
313
                        ch := make(chan *ExtendedGcpAsset)
8✔
314
                        go p.getAllAssets(ctx, ch, &assetpb.ListAssetsRequest{
8✔
315
                                Parent:      p.config.Parent,
8✔
316
                                AssetTypes:  []string{assetType},
8✔
317
                                ContentType: contentType,
8✔
318
                        })
8✔
319
                        for asset := range ch {
12✔
320
                                out <- asset
4✔
321
                        }
4✔
322
                }()
323
        }
324
        go func() {
16✔
325
                wg.Wait()
8✔
326
                close(out)
8✔
327
        }()
8✔
328
        return out
8✔
329
}
330

331
func (p *Provider) getAllAssets(ctx context.Context, out chan<- *ExtendedGcpAsset, req *assetpb.ListAssetsRequest) {
20✔
332
        defer close(out)
20✔
333

20✔
334
        p.log.Infof("Listing %v assets of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
20✔
335
        it := p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
20✔
336
                Parent:      req.Parent,
20✔
337
                AssetTypes:  req.AssetTypes,
20✔
338
                ContentType: req.ContentType,
20✔
339
        })
20✔
340
        for {
59✔
341
                response, err := it.Next()
39✔
342
                if err == iterator.Done {
55✔
343
                        p.log.Infof("Finished fetching GCP %v of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
16✔
344
                        return
16✔
345
                }
16✔
346

347
                if err != nil {
27✔
348
                        p.log.Errorf("Error fetching GCP %v of types: %v for %v: %v\n", req.ContentType, req.AssetTypes, req.Parent, err)
4✔
349
                        return
4✔
350
                }
4✔
351

352
                p.log.Debugf("Fetched GCP %v of type %v: %v\n", req.ContentType, response.AssetType, response.Name)
19✔
353
                out <- p.newGcpExtendedAsset(ctx, response)
19✔
354
        }
355
}
356

357
// merge by asset name. send assets with both resource & policy if both channels are open.
358
// if one channel closes, send remaining assets from the other. finally, flush remaining assets.
359
//
360
//revive:disable-next-line
361
func (p *Provider) mergeAssets(ctx context.Context, resourceCh, policyCh <-chan *ExtendedGcpAsset) <-chan *ExtendedGcpAsset {
4✔
362
        out := make(chan *ExtendedGcpAsset)
4✔
363

4✔
364
        go func() {
8✔
365
                defer close(out)
4✔
366
                assetStore := make(map[string]*ExtendedGcpAsset)
4✔
367
                rch, pch := resourceCh, policyCh
4✔
368
                for rch != nil || pch != nil || len(assetStore) > 0 {
16✔
369
                        select {
12✔
370
                        case <-ctx.Done():
×
371
                                return
×
372
                        case asset, ok := <-rch:
6✔
373
                                if ok {
8✔
374
                                        mergeAssetContentType(assetStore, asset)
2✔
375
                                } else {
6✔
376
                                        rch = nil
4✔
377
                                }
4✔
378
                        case asset, ok := <-pch:
6✔
379
                                if ok {
8✔
380
                                        mergeAssetContentType(assetStore, asset)
2✔
381
                                } else {
6✔
382
                                        pch = nil
4✔
383
                                }
4✔
384
                        }
385
                        for id, a := range assetStore {
16✔
386
                                hasPolicy := a.IamPolicy != nil
4✔
387
                                hasResource := a.Resource != nil
4✔
388
                                hasBoth := hasPolicy && hasResource
4✔
389
                                if hasBoth || (rch == nil && hasPolicy) || (pch == nil && hasResource) {
7✔
390
                                        out <- a
3✔
391
                                        delete(assetStore, id)
3✔
392
                                }
3✔
393
                        }
394
                }
395
        }()
396

397
        return out
4✔
398
}
399

400
func (p *Provider) newGcpExtendedAsset(ctx context.Context, asset *assetpb.Asset) *ExtendedGcpAsset {
19✔
401
        return &ExtendedGcpAsset{
19✔
402
                Asset:        asset,
19✔
403
                CloudAccount: p.crm.GetCloudMetadata(ctx, asset),
19✔
404
        }
19✔
405
}
19✔
406

407
func mergeAssetContentType(store map[string]*ExtendedGcpAsset, asset *ExtendedGcpAsset) {
4✔
408
        if existing, ok := store[asset.Name]; ok {
5✔
409
                if asset.Resource != nil {
2✔
410
                        existing.Resource = asset.Resource
1✔
411
                }
1✔
412
                if asset.IamPolicy != nil {
1✔
413
                        existing.IamPolicy = asset.IamPolicy
×
414
                }
×
415
        } else {
3✔
416
                store[asset.Name] = asset
3✔
417
        }
3✔
418
}
419

420
func enrichNetworkAsset(asset *ExtendedGcpAsset, dnsPoliciesFields []*dnsPolicyFields) *ExtendedGcpAsset {
2✔
421
        networkAssetFields := asset.GetResource().GetData().GetFields()
2✔
422
        networkIdentifier := strings.TrimPrefix(asset.GetName(), "//compute.googleapis.com")
2✔
423
        dnsPolicy := findDnsPolicyByNetwork(dnsPoliciesFields, networkIdentifier)
2✔
424

2✔
425
        if dnsPolicy != nil {
3✔
426
                networkAssetFields["enabledDnsLogging"] = &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: dnsPolicy.enableLogging}}
1✔
427
        }
1✔
428
        return asset
2✔
429
}
430

431
// findDnsPolicyByNetwork finds DNS policy by network identifier
432
func findDnsPolicyByNetwork(dnsPolicies []*dnsPolicyFields, networkIdentifier string) *dnsPolicyFields {
2✔
433
        for _, dnsPolicy := range dnsPolicies {
4✔
434
                if lo.SomeBy(dnsPolicy.networks, func(networkUrl string) bool {
4✔
435
                        return strings.HasSuffix(networkUrl, networkIdentifier)
2✔
436
                }) {
3✔
437
                        return dnsPolicy
1✔
438
                }
1✔
439
        }
440
        return nil
1✔
441
}
442

443
func decodeDnsPolicies(dnsPolicyAssets []*assetpb.Asset) []*dnsPolicyFields {
1✔
444
        dnsPolicies := make([]*dnsPolicyFields, 0)
1✔
445
        for _, dnsPolicyAsset := range dnsPolicyAssets {
2✔
446
                fields := new(dnsPolicyFields)
1✔
447
                dnsPolicyData := dnsPolicyAsset.GetResource().GetData().GetFields()
1✔
448

1✔
449
                if attachedNetworks, exist := dnsPolicyData["networks"]; exist {
2✔
450
                        networks := attachedNetworks.GetListValue().GetValues()
1✔
451
                        for _, network := range networks {
2✔
452
                                if networkUrl, found := network.GetStructValue().GetFields()["networkUrl"]; found {
2✔
453
                                        fields.networks = append(fields.networks, networkUrl.GetStringValue())
1✔
454
                                }
1✔
455
                        }
456
                }
457

458
                if enableLogging, exist := dnsPolicyData["enableLogging"]; exist {
2✔
459
                        fields.enableLogging = enableLogging.GetBoolValue()
1✔
460
                }
1✔
461

462
                dnsPolicies = append(dnsPolicies, fields)
1✔
463
        }
464

465
        return dnsPolicies
1✔
466
}
467

468
func isFolder(parent string) bool {
1✔
469
        return strings.HasPrefix(parent, "folders")
1✔
470
}
1✔
471

472
func isOrganization(parent string) bool {
19✔
473
        return strings.HasPrefix(parent, "organizations")
19✔
474
}
19✔
475

476
func collect[T any](ch <-chan T) []T {
8✔
477
        res := make([]T, 0)
8✔
478
        for item := range ch {
16✔
479
                res = append(res, item)
8✔
480
        }
8✔
481
        return res
8✔
482
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc