• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 16057793696

03 Jul 2025 06:19PM UTC coverage: 76.193%. Remained the same
16057793696

Pull #3424

github

seanrathier
skip agentless again
Pull Request #3424: [Cloud Security] DO NOT MERGE: temporary skip agentless in test env

9326 of 12240 relevant lines covered (76.19%)

16.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.89
/internal/resources/providers/gcplib/inventory/provider.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package inventory
19

20
import (
21
        "context"
22
        "strings"
23
        "sync"
24

25
        asset "cloud.google.com/go/asset/apiv1"
26
        "cloud.google.com/go/asset/apiv1/assetpb"
27
        "github.com/googleapis/gax-go/v2"
28
        "github.com/samber/lo"
29
        "google.golang.org/api/iterator"
30
        "google.golang.org/api/option"
31
        "google.golang.org/protobuf/types/known/structpb"
32

33
        "github.com/elastic/cloudbeat/internal/config"
34
        "github.com/elastic/cloudbeat/internal/infra/clog"
35
        "github.com/elastic/cloudbeat/internal/resources/fetching"
36
        "github.com/elastic/cloudbeat/internal/resources/providers/gcplib/auth"
37
)
38

39
type Provider struct {
40
        log       *clog.Logger
41
        config    auth.GcpFactoryConfig
42
        inventory *AssetsInventoryWrapper
43
        crm       *ResourceManagerWrapper
44
}
45

46
type AssetsInventoryWrapper struct {
47
        Close      func() error
48
        ListAssets func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator
49
}
50

51
type MonitoringAsset struct {
52
        CloudAccount *fetching.CloudAccountMetadata
53
        LogMetrics   []*ExtendedGcpAsset `json:"log_metrics,omitempty"`
54
        Alerts       []*ExtendedGcpAsset `json:"alerts,omitempty"`
55
}
56

57
type ProjectPoliciesAsset struct {
58
        CloudAccount *fetching.CloudAccountMetadata
59
        Policies     []*ExtendedGcpAsset `json:"policies,omitempty"`
60
}
61

62
type ProjectAssets struct {
63
        CloudAccount *fetching.CloudAccountMetadata
64
        Assets       []*ExtendedGcpAsset
65
}
66

67
type ExtendedGcpAsset struct {
68
        *assetpb.Asset
69
        CloudAccount *fetching.CloudAccountMetadata
70
}
71

72
// ProviderInitializer is the stateless ProviderInitializerAPI implementation
// whose Init method builds a Provider.
type ProviderInitializer struct{}
73

74
// dnsPolicyFields is the subset of a DNS policy asset that network enrichment
// needs, as extracted by decodeDnsPolicies.
type dnsPolicyFields struct {
	// networks lists the "networkUrl" values of the policy's networks.
	networks []string
	// enableLogging mirrors the policy's "enableLogging" field.
	enableLogging bool
}
78

79
type Iterator interface {
80
        Next() (*assetpb.Asset, error)
81
}
82

83
type ServiceAPI interface {
84
        ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset)
85
        ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset)
86
        ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset)
87
        ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets)
88
        ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset)
89
        Clear()
90
        Close() error
91
}
92

93
type ProviderInitializerAPI interface {
94
        Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (ServiceAPI, error)
95
}
96

97
func newAssetsInventoryWrapper(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (*AssetsInventoryWrapper, error) {
1✔
98
        limiter := NewAssetsInventoryRateLimiter(log)
1✔
99
        client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...)
1✔
100
        if err != nil {
2✔
101
                return nil, err
1✔
102
        }
1✔
103

104
        return &AssetsInventoryWrapper{
×
105
                Close: client.Close,
×
106
                ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator {
×
107
                        if req.PageSize == 0 {
×
108
                                req.PageSize = cfg.GcpCallOpt.ListAssetsPageSize
×
109
                        }
×
110
                        return client.ListAssets(ctx, req, append(opts, GAXCallOptionRetrier(log), gax.WithTimeout(cfg.GcpCallOpt.ListAssetsTimeout))...)
×
111
                },
112
        }, nil
113
}
114

115
func (p *ProviderInitializer) Init(ctx context.Context, log *clog.Logger, gcpConfig auth.GcpFactoryConfig, cfg config.GcpConfig) (ServiceAPI, error) {
1✔
116
        assetsInventory, err := newAssetsInventoryWrapper(ctx, log, gcpConfig, cfg)
1✔
117
        if err != nil {
2✔
118
                return nil, err
1✔
119
        }
1✔
120

121
        cloudResourceManager, err := NewResourceManagerWrapper(ctx, log, gcpConfig)
×
122
        if err != nil {
×
123
                return nil, err
×
124
        }
×
125

126
        return &Provider{
×
127
                config:    gcpConfig,
×
128
                log:       log,
×
129
                inventory: assetsInventory,
×
130
                crm:       cloudResourceManager,
×
131
        }, nil
×
132
}
133

134
func (p *Provider) ListAssetTypes(ctx context.Context, assetTypes []string, out chan<- *ExtendedGcpAsset) {
4✔
135
        defer close(out)
4✔
136

4✔
137
        resourceAssetsCh := p.fetchAssets(ctx, assetpb.ContentType_RESOURCE, assetTypes)
4✔
138
        policiesAssetsCh := p.fetchAssets(ctx, assetpb.ContentType_IAM_POLICY, assetTypes)
4✔
139
        assetsCh := p.mergeAssets(ctx, resourceAssetsCh, policiesAssetsCh)
4✔
140

4✔
141
        for asset := range assetsCh {
7✔
142
                select {
3✔
143
                case <-ctx.Done():
×
144
                        return
×
145
                case out <- asset:
3✔
146
                }
147
        }
148
}
149

150
func (p *Provider) ListMonitoringAssets(ctx context.Context, out chan<- *MonitoringAsset) {
1✔
151
        defer close(out)
1✔
152

1✔
153
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
154

1✔
155
        for project := range projectsCh {
3✔
156
                if ctx.Err() != nil {
2✔
157
                        return
×
158
                }
×
159
                logsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringLogMetricAssetType})
2✔
160
                alertsAssetCh := p.getParentResources(ctx, project.Ancestors[0], []string{MonitoringAlertPolicyAssetType})
2✔
161

2✔
162
                var logAssets, alertAssets []*ExtendedGcpAsset
2✔
163
                var wg sync.WaitGroup
2✔
164

2✔
165
                wg.Add(2)
2✔
166
                go func() {
4✔
167
                        defer wg.Done()
2✔
168
                        logAssets = collect(logsAssetCh)
2✔
169
                }()
2✔
170
                go func() {
4✔
171
                        defer wg.Done()
2✔
172
                        alertAssets = collect(alertsAssetCh)
2✔
173
                }()
2✔
174
                wg.Wait()
2✔
175

2✔
176
                p.log.Debugf("Listed %d log metrics and %d alert policies for %v\n", len(logAssets), len(alertAssets), project.Name)
2✔
177
                if len(logAssets) == 0 && len(alertAssets) == 0 {
2✔
178
                        continue
×
179
                }
180
                out <- &MonitoringAsset{
2✔
181
                        LogMetrics:   logAssets,
2✔
182
                        Alerts:       alertAssets,
2✔
183
                        CloudAccount: project.CloudAccount,
2✔
184
                }
2✔
185
        }
186
}
187

188
func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context, out chan<- *ProjectPoliciesAsset) {
1✔
189
        defer close(out)
1✔
190

1✔
191
        projectsCh := make(chan *ExtendedGcpAsset)
1✔
192
        policiesCache := &sync.Map{}
1✔
193

1✔
194
        go p.getAllAssets(ctx, projectsCh, &assetpb.ListAssetsRequest{
1✔
195
                Parent:      p.config.Parent,
1✔
196
                AssetTypes:  []string{CrmProjectAssetType},
1✔
197
                ContentType: assetpb.ContentType_IAM_POLICY,
1✔
198
        })
1✔
199

1✔
200
        for asset := range projectsCh {
2✔
201
                select {
1✔
202
                case <-ctx.Done():
×
203
                        return
×
204
                case out <- &ProjectPoliciesAsset{
205
                        CloudAccount: asset.CloudAccount,
206
                        Policies:     append([]*ExtendedGcpAsset{asset}, p.getAssetAncestorsPolicies(ctx, asset.Ancestors[1:], policiesCache)...),
207
                }:
1✔
208
                }
209
        }
210
}
211

212
func (p *Provider) ListProjectAssets(ctx context.Context, assetTypes []string, out chan<- *ProjectAssets) {
1✔
213
        defer close(out)
1✔
214

1✔
215
        projectsCh := p.getParentResources(ctx, p.config.Parent, []string{CrmProjectAssetType})
1✔
216

1✔
217
        for project := range projectsCh {
3✔
218
                if ctx.Err() != nil {
2✔
219
                        return
×
220
                }
×
221
                assets := collect(p.getParentResources(ctx, project.Ancestors[0], assetTypes))
2✔
222
                p.log.Debugf("Listed %d resources of type: %v in %v\n", len(assets), assetTypes, project.Name)
2✔
223
                if len(assets) == 0 {
2✔
224
                        continue
×
225
                }
226
                out <- &ProjectAssets{
2✔
227
                        Assets:       assets,
2✔
228
                        CloudAccount: project.CloudAccount,
2✔
229
                }
2✔
230
        }
231
}
232

233
func (p *Provider) ListNetworkAssets(ctx context.Context, out chan<- *ExtendedGcpAsset) {
1✔
234
        defer close(out)
1✔
235

1✔
236
        dnsPolicyAssetCh := p.getParentResources(ctx, p.config.Parent, []string{DnsPolicyAssetType})
1✔
237
        networkAssetCh := p.getParentResources(ctx, p.config.Parent, []string{ComputeNetworkAssetType})
1✔
238

1✔
239
        dnsPoliciesFields := decodeDnsPolicies(lo.Map(collect(dnsPolicyAssetCh), func(asset *ExtendedGcpAsset, _ int) *assetpb.Asset { return asset.Asset }))
2✔
240
        for asset := range networkAssetCh {
3✔
241
                select {
2✔
242
                case <-ctx.Done():
×
243
                        return
×
244
                case out <- enrichNetworkAsset(asset, dnsPoliciesFields):
2✔
245
                }
246
        }
247
}
248

249
func (p *Provider) Close() error {
×
250
        return p.inventory.Close()
×
251
}
×
252

253
func (p *Provider) Clear() {
×
254
        p.crm.Clear()
×
255
}
×
256

257
func (p *Provider) getAssetAncestorsPolicies(ctx context.Context, ancestors []string, cache *sync.Map) []*ExtendedGcpAsset {
1✔
258
        wg := sync.WaitGroup{}
1✔
259
        var assets []*ExtendedGcpAsset
1✔
260
        mu := sync.Mutex{}
1✔
261
        for _, ancestor := range ancestors {
2✔
262
                if value, ok := cache.Load(ancestor); ok {
1✔
263
                        if v, ok := value.([]*ExtendedGcpAsset); ok {
×
264
                                mu.Lock()
×
265
                                assets = append(assets, v...)
×
266
                                mu.Unlock()
×
267
                        }
×
268
                        continue
×
269
                }
270
                prjAncestorPolicyCh := make(chan *ExtendedGcpAsset)
1✔
271
                var assetType string
1✔
272
                if isFolder(ancestor) {
1✔
273
                        assetType = CrmFolderAssetType
×
274
                }
×
275
                if isOrganization(ancestor) {
2✔
276
                        assetType = CrmOrgAssetType
1✔
277
                }
1✔
278
                go p.getAllAssets(ctx, prjAncestorPolicyCh, &assetpb.ListAssetsRequest{
1✔
279
                        Parent:      ancestor,
1✔
280
                        AssetTypes:  []string{assetType},
1✔
281
                        ContentType: assetpb.ContentType_IAM_POLICY,
1✔
282
                })
1✔
283

1✔
284
                wg.Add(1)
1✔
285
                go func() {
2✔
286
                        defer wg.Done()
1✔
287
                        ancestorPolicies := collect(prjAncestorPolicyCh)
1✔
288
                        cache.Store(ancestor, ancestorPolicies)
1✔
289
                        mu.Lock()
1✔
290
                        assets = append(assets, ancestorPolicies...)
1✔
291
                        mu.Unlock()
1✔
292
                }()
1✔
293
        }
294
        wg.Wait()
1✔
295
        p.log.Debugf("Listed %d policies for ancestors: %v\n", len(assets), ancestors)
1✔
296
        return assets
1✔
297
}
298

299
func (p *Provider) getParentResources(ctx context.Context, parent string, assetTypes []string) <-chan *ExtendedGcpAsset {
10✔
300
        ch := make(chan *ExtendedGcpAsset)
10✔
301
        go p.getAllAssets(ctx, ch, &assetpb.ListAssetsRequest{
10✔
302
                Parent:      parent,
10✔
303
                AssetTypes:  assetTypes,
10✔
304
                ContentType: assetpb.ContentType_RESOURCE,
10✔
305
        })
10✔
306
        return ch
10✔
307
}
10✔
308

309
func (p *Provider) fetchAssets(ctx context.Context, contentType assetpb.ContentType, assetTypes []string) <-chan *ExtendedGcpAsset {
8✔
310
        out := make(chan *ExtendedGcpAsset)
8✔
311
        wg := sync.WaitGroup{}
8✔
312
        // Fetch each asset type separately to limit failures to a single type
8✔
313
        for _, assetType := range assetTypes {
16✔
314
                wg.Add(1)
8✔
315
                go func() {
16✔
316
                        defer wg.Done()
8✔
317
                        ch := make(chan *ExtendedGcpAsset)
8✔
318
                        go p.getAllAssets(ctx, ch, &assetpb.ListAssetsRequest{
8✔
319
                                Parent:      p.config.Parent,
8✔
320
                                AssetTypes:  []string{assetType},
8✔
321
                                ContentType: contentType,
8✔
322
                        })
8✔
323
                        for asset := range ch {
12✔
324
                                out <- asset
4✔
325
                        }
4✔
326
                }()
327
        }
328
        go func() {
16✔
329
                wg.Wait()
8✔
330
                close(out)
8✔
331
        }()
8✔
332
        return out
8✔
333
}
334

335
func (p *Provider) getAllAssets(ctx context.Context, out chan<- *ExtendedGcpAsset, req *assetpb.ListAssetsRequest) {
20✔
336
        defer close(out)
20✔
337

20✔
338
        p.log.Infof("Listing %v assets of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
20✔
339
        it := p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
20✔
340
                Parent:      req.Parent,
20✔
341
                AssetTypes:  req.AssetTypes,
20✔
342
                ContentType: req.ContentType,
20✔
343
        })
20✔
344
        for {
59✔
345
                response, err := it.Next()
39✔
346
                if err == iterator.Done {
55✔
347
                        p.log.Infof("Finished fetching GCP %v of types: %v for %v\n", req.ContentType, req.AssetTypes, req.Parent)
16✔
348
                        return
16✔
349
                }
16✔
350

351
                if err != nil {
27✔
352
                        p.log.Errorf("Error fetching GCP %v of types: %v for %v: %v\n", req.ContentType, req.AssetTypes, req.Parent, err)
4✔
353
                        return
4✔
354
                }
4✔
355

356
                p.log.Debugf("Fetched GCP %v of type %v: %v\n", req.ContentType, response.AssetType, response.Name)
19✔
357
                out <- p.newGcpExtendedAsset(ctx, response)
19✔
358
        }
359
}
360

361
// merge by asset name. send assets with both resource & policy if both channels are open.
362
// if one channel closes, send remaining assets from the other. finally, flush remaining assets.
363
//
364
//revive:disable-next-line
365
func (p *Provider) mergeAssets(ctx context.Context, resourceCh, policyCh <-chan *ExtendedGcpAsset) <-chan *ExtendedGcpAsset {
4✔
366
        out := make(chan *ExtendedGcpAsset)
4✔
367

4✔
368
        go func() {
8✔
369
                defer close(out)
4✔
370
                assetStore := make(map[string]*ExtendedGcpAsset)
4✔
371
                rch, pch := resourceCh, policyCh
4✔
372
                for rch != nil || pch != nil || len(assetStore) > 0 {
16✔
373
                        select {
12✔
374
                        case <-ctx.Done():
×
375
                                return
×
376
                        case asset, ok := <-rch:
6✔
377
                                if ok {
8✔
378
                                        mergeAssetContentType(assetStore, asset)
2✔
379
                                } else {
6✔
380
                                        rch = nil
4✔
381
                                }
4✔
382
                        case asset, ok := <-pch:
6✔
383
                                if ok {
8✔
384
                                        mergeAssetContentType(assetStore, asset)
2✔
385
                                } else {
6✔
386
                                        pch = nil
4✔
387
                                }
4✔
388
                        }
389
                        for id, a := range assetStore {
16✔
390
                                hasPolicy := a.IamPolicy != nil
4✔
391
                                hasResource := a.Resource != nil
4✔
392
                                hasBoth := hasPolicy && hasResource
4✔
393
                                if hasBoth || (rch == nil && hasPolicy) || (pch == nil && hasResource) {
7✔
394
                                        out <- a
3✔
395
                                        delete(assetStore, id)
3✔
396
                                }
3✔
397
                        }
398
                }
399
        }()
400

401
        return out
4✔
402
}
403

404
func (p *Provider) newGcpExtendedAsset(ctx context.Context, asset *assetpb.Asset) *ExtendedGcpAsset {
19✔
405
        return &ExtendedGcpAsset{
19✔
406
                Asset:        asset,
19✔
407
                CloudAccount: p.crm.GetCloudMetadata(ctx, asset),
19✔
408
        }
19✔
409
}
19✔
410

411
func mergeAssetContentType(store map[string]*ExtendedGcpAsset, asset *ExtendedGcpAsset) {
4✔
412
        if existing, ok := store[asset.Name]; ok {
5✔
413
                if asset.Resource != nil {
1✔
414
                        existing.Resource = asset.Resource
×
415
                }
×
416
                if asset.IamPolicy != nil {
2✔
417
                        existing.IamPolicy = asset.IamPolicy
1✔
418
                }
1✔
419
        } else {
3✔
420
                store[asset.Name] = asset
3✔
421
        }
3✔
422
}
423

424
func enrichNetworkAsset(asset *ExtendedGcpAsset, dnsPoliciesFields []*dnsPolicyFields) *ExtendedGcpAsset {
2✔
425
        networkAssetFields := asset.GetResource().GetData().GetFields()
2✔
426
        networkIdentifier := strings.TrimPrefix(asset.GetName(), "//compute.googleapis.com")
2✔
427
        dnsPolicy := findDnsPolicyByNetwork(dnsPoliciesFields, networkIdentifier)
2✔
428

2✔
429
        if dnsPolicy != nil {
3✔
430
                networkAssetFields["enabledDnsLogging"] = &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: dnsPolicy.enableLogging}}
1✔
431
        }
1✔
432
        return asset
2✔
433
}
434

435
// findDnsPolicyByNetwork finds DNS policy by network identifier
436
func findDnsPolicyByNetwork(dnsPolicies []*dnsPolicyFields, networkIdentifier string) *dnsPolicyFields {
2✔
437
        for _, dnsPolicy := range dnsPolicies {
4✔
438
                if lo.SomeBy(dnsPolicy.networks, func(networkUrl string) bool {
4✔
439
                        return strings.HasSuffix(networkUrl, networkIdentifier)
2✔
440
                }) {
3✔
441
                        return dnsPolicy
1✔
442
                }
1✔
443
        }
444
        return nil
1✔
445
}
446

447
func decodeDnsPolicies(dnsPolicyAssets []*assetpb.Asset) []*dnsPolicyFields {
1✔
448
        dnsPolicies := make([]*dnsPolicyFields, 0)
1✔
449
        for _, dnsPolicyAsset := range dnsPolicyAssets {
2✔
450
                fields := new(dnsPolicyFields)
1✔
451
                dnsPolicyData := dnsPolicyAsset.GetResource().GetData().GetFields()
1✔
452

1✔
453
                if attachedNetworks, exist := dnsPolicyData["networks"]; exist {
2✔
454
                        networks := attachedNetworks.GetListValue().GetValues()
1✔
455
                        for _, network := range networks {
2✔
456
                                if networkUrl, found := network.GetStructValue().GetFields()["networkUrl"]; found {
2✔
457
                                        fields.networks = append(fields.networks, networkUrl.GetStringValue())
1✔
458
                                }
1✔
459
                        }
460
                }
461

462
                if enableLogging, exist := dnsPolicyData["enableLogging"]; exist {
2✔
463
                        fields.enableLogging = enableLogging.GetBoolValue()
1✔
464
                }
1✔
465

466
                dnsPolicies = append(dnsPolicies, fields)
1✔
467
        }
468

469
        return dnsPolicies
1✔
470
}
471

472
// isFolder reports whether parent starts with "folders".
func isFolder(parent string) bool {
	const folderPrefix = "folders"
	return strings.HasPrefix(parent, folderPrefix)
}
1✔
475

476
// isOrganization reports whether parent starts with "organizations".
func isOrganization(parent string) bool {
	const organizationPrefix = "organizations"
	return strings.HasPrefix(parent, organizationPrefix)
}
20✔
479

480
// collect receives from ch until it is closed and returns everything received,
// in order. The result is an empty, non-nil slice when ch yields nothing.
func collect[T any](ch <-chan T) []T {
	items := make([]T, 0)
	for {
		next, open := <-ch
		if !open {
			return items
		}
		items = append(items, next)
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc