• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 12692202887

09 Jan 2025 02:42PM UTC coverage: 76.015% (+0.5%) from 75.552%
12692202887

Pull #2879

github

web-flow
Merge branch 'main' into refactor-asset-inventory
Pull Request #2879: Refactor asset inventory

181 of 190 new or added lines in 17 files covered. (95.26%)

2 existing lines in 2 files now uncovered.

8595 of 11307 relevant lines covered (76.01%)

16.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.25
/internal/inventory/inventory.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package inventory
19

20
import (
21
        "context"
22
        "fmt"
23
        "strings"
24
        "time"
25

26
        "github.com/elastic/beats/v7/libbeat/beat"
27
        libevents "github.com/elastic/beats/v7/libbeat/beat/events"
28
        "github.com/elastic/elastic-agent-libs/logp"
29
        "github.com/elastic/elastic-agent-libs/mapstr"
30
        "github.com/samber/lo"
31
)
32

33
const (
34
        indexTemplate = "logs-cloud_asset_inventory.asset_inventory-%s_%s-default"
35
        minimalPeriod = 30 * time.Second
36
)
37

38
type AssetInventory struct {
39
        fetchers            []AssetFetcher
40
        publisher           AssetPublisher
41
        bufferFlushInterval time.Duration
42
        bufferMaxSize       int
43
        period              time.Duration
44
        logger              *logp.Logger
45
        assetCh             chan AssetEvent
46
        now                 func() time.Time
47
}
48

49
type AssetFetcher interface {
50
        Fetch(ctx context.Context, assetChannel chan<- AssetEvent)
51
}
52

53
type AssetPublisher interface {
54
        PublishAll([]beat.Event)
55
}
56

57
func NewAssetInventory(logger *logp.Logger, fetchers []AssetFetcher, publisher AssetPublisher, now func() time.Time, period time.Duration) AssetInventory {
1✔
58
        if period < minimalPeriod {
2✔
59
                period = minimalPeriod
1✔
60
        }
1✔
61
        logger.Infof("Initializing Asset Inventory POC with period of %s", period)
1✔
62
        return AssetInventory{
1✔
63
                logger:    logger,
1✔
64
                fetchers:  fetchers,
1✔
65
                publisher: publisher,
1✔
66
                // move to a configuration parameter
1✔
67
                bufferFlushInterval: 10 * time.Second,
1✔
68
                bufferMaxSize:       1600,
1✔
69
                period:              period,
1✔
70
                assetCh:             make(chan AssetEvent),
1✔
71
                now:                 now,
1✔
72
        }
1✔
73
}
74

75
func (a *AssetInventory) Run(ctx context.Context) {
2✔
76
        a.runAllFetchersOnce(ctx)
2✔
77

2✔
78
        assetsBuffer := make([]AssetEvent, 0, a.bufferMaxSize)
2✔
79
        flushTicker := time.NewTicker(a.bufferFlushInterval)
2✔
80
        fetcherPeriod := time.NewTicker(a.period)
2✔
81
        for {
65✔
82
                select {
63✔
83
                case <-ctx.Done():
1✔
84
                        a.logger.Warnf("Asset Inventory context is done: %v", ctx.Err())
1✔
85
                        a.publish(assetsBuffer)
1✔
86
                        return
1✔
87

88
                case <-fetcherPeriod.C:
1✔
89
                        a.runAllFetchersOnce(ctx)
1✔
90

91
                case <-flushTicker.C:
59✔
92
                        if len(assetsBuffer) == 0 {
118✔
93
                                a.logger.Debugf("Interval reached without events")
59✔
94
                                continue
59✔
95
                        }
96

97
                        a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
×
98
                        a.publish(assetsBuffer)
×
99
                        assetsBuffer = assetsBuffer[:0] // clear keeping cap
×
100

101
                case assetToPublish := <-a.assetCh:
2✔
102
                        assetsBuffer = append(assetsBuffer, assetToPublish)
2✔
103

2✔
104
                        if len(assetsBuffer) == a.bufferMaxSize {
4✔
105
                                a.logger.Infof("Asset Inventory buffer is being flushed (assets %d)", len(assetsBuffer))
2✔
106
                                a.publish(assetsBuffer)
2✔
107
                                assetsBuffer = assetsBuffer[:0] // clear keeping cap
2✔
108
                        }
2✔
109
                }
110
        }
111
}
112

113
// runAllFetchersOnce runs every fetcher to collect assets to assetCh ONCE. It
114
// should be called every cycle, once every `a.period`.
115
func (a *AssetInventory) runAllFetchersOnce(ctx context.Context) {
4✔
116
        a.logger.Debug("Running all fetchers once")
4✔
117
        for _, fetcher := range a.fetchers {
12✔
118
                go func(fetcher AssetFetcher) {
16✔
119
                        fetcher.Fetch(ctx, a.assetCh)
8✔
120
                }(fetcher)
8✔
121
        }
122
}
123

124
func (a *AssetInventory) publish(assets []AssetEvent) {
3✔
125
        events := lo.Map(assets, func(e AssetEvent, _ int) beat.Event {
5✔
126
                var relatedEntity []string
2✔
127
                relatedEntity = append(relatedEntity, e.Entity.Id)
2✔
128
                if len(e.Entity.relatedEntityId) > 0 {
2✔
NEW
129
                        relatedEntity = append(relatedEntity, e.Entity.relatedEntityId...)
×
UNCOV
130
                }
×
131
                return beat.Event{
2✔
132
                        Meta:      mapstr.M{libevents.FieldMetaIndex: generateIndex(e.Entity)},
2✔
133
                        Timestamp: a.now(),
2✔
134
                        Fields: mapstr.M{
2✔
135
                                "entity":         e.Entity,
2✔
136
                                "event":          e.Event,
2✔
137
                                "cloud":          e.Cloud,
2✔
138
                                "host":           e.Host,
2✔
139
                                "network":        e.Network,
2✔
140
                                "user":           e.User,
2✔
141
                                "Attributes":     e.RawAttributes,
2✔
142
                                "labels":         e.Labels,
2✔
143
                                "related.entity": relatedEntity,
2✔
144
                        },
2✔
145
                }
2✔
146
        })
147

148
        a.publisher.PublishAll(events)
3✔
149
}
150

151
func generateIndex(a Entity) string {
2✔
152
        return fmt.Sprintf(indexTemplate, slugfy(string(a.Category)), slugfy(string(a.Type)))
2✔
153
}
2✔
154

155
func slugfy(s string) string {
4✔
156
        chunks := strings.Split(s, " ")
4✔
157
        clean := make([]string, len(chunks))
4✔
158
        for i, c := range chunks {
9✔
159
                clean[i] = strings.ToLower(c)
5✔
160
        }
5✔
161
        return strings.Join(clean, "_")
4✔
162
}
163

164
func (a *AssetInventory) Stop() {
1✔
165
        close(a.assetCh)
1✔
166
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc