• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 16114132341

07 Jul 2025 10:09AM UTC coverage: 76.124% (-0.07%) from 76.193%
16114132341

Pull #3430

github

orestisfl
Add mocks
Pull Request #3430: Add OpenTelemetry

147 of 205 new or added lines in 7 files covered. (71.71%)

2 existing lines in 1 file now uncovered.

9463 of 12431 relevant lines covered (76.12%)

16.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.34
/internal/resources/fetching/manager/manager.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package manager
19

20
import (
21
        "context"
22
        "fmt"
23
        "sync"
24
        "time"
25

26
        "go.opentelemetry.io/otel/attribute"
27
        "go.opentelemetry.io/otel/codes"
28
        "go.opentelemetry.io/otel/trace"
29

30
        "github.com/elastic/cloudbeat/internal/infra/clog"
31
        "github.com/elastic/cloudbeat/internal/infra/observability"
32
        "github.com/elastic/cloudbeat/internal/resources/fetching/cycle"
33
        "github.com/elastic/cloudbeat/internal/resources/fetching/registry"
34
)
35

36
const scopeName = "github.com/elastic/cloudbeat/internal/resources/fetching/manager"
37

38
type Manager struct {
39
        log *clog.Logger
40

41
        // Duration of a single fetcher
42
        timeout time.Duration
43

44
        // Duration between two consecutive cycles
45
        interval time.Duration
46

47
        fetcherRegistry registry.Registry
48

49
        ctx    context.Context //nolint:containedctx
50
        cancel context.CancelFunc
51
}
52

53
func NewManager(ctx context.Context, log *clog.Logger, interval time.Duration, timeout time.Duration, fetchers registry.Registry) (*Manager, error) {
11✔
54
        ctx, cancel := context.WithCancel(ctx)
11✔
55

11✔
56
        return &Manager{
11✔
57
                log:             log,
11✔
58
                timeout:         timeout,
11✔
59
                interval:        interval,
11✔
60
                fetcherRegistry: fetchers,
11✔
61
                ctx:             ctx,
11✔
62
                cancel:          cancel,
11✔
63
        }, nil
11✔
64
}
11✔
65

66
// Run starts all configured fetchers to collect resources.
67
func (m *Manager) Run() {
10✔
68
        go m.fetchAndSleep(m.ctx)
10✔
69
}
10✔
70

71
func (m *Manager) Stop() {
5✔
72
        m.cancel()
5✔
73
        m.fetcherRegistry.Stop()
5✔
74
}
5✔
75

76
func (m *Manager) fetchAndSleep(ctx context.Context) {
10✔
77
        counter, err := observability.MeterFromContext(ctx, scopeName).Int64Counter("cloudbeat.fetcher.manager.cycles")
10✔
78
        if err != nil {
10✔
NEW
79
                m.log.Errorf("Failed to create fetcher manager cycles counter: %v", err)
×
NEW
80
        }
×
81

82
        // set immediate exec for first time run
83
        timer := time.NewTimer(0)
10✔
84
        defer timer.Stop()
10✔
85

10✔
86
        for {
30✔
87
                select {
20✔
88
                case <-ctx.Done():
10✔
89
                        m.log.Info("Fetchers manager canceled")
10✔
90
                        return
10✔
91
                case <-timer.C:
10✔
92
                        // update the interval
10✔
93
                        timer.Reset(m.interval)
10✔
94
                        counter.Add(ctx, 1)
10✔
95
                        // this is blocking so the stop will not be called until all the fetchers are finished
10✔
96
                        // in case there is a blocking fetcher it will halt (til the m.timeout)
10✔
97
                        go m.fetchIteration(ctx)
10✔
98
                }
99
        }
100
}
101

102
// fetchIteration waits for all the registered fetchers and trigger them to fetch relevant resources.
103
// The function must not get called in parallel.
104
func (m *Manager) fetchIteration(ctx context.Context) {
10✔
105
        ctx, span := observability.StartSpan(
10✔
106
                ctx,
10✔
107
                scopeName,
10✔
108
                "Fetch Iteration",
10✔
109
                trace.WithAttributes(attribute.String("transaction.type", "request")),
10✔
110
        )
10✔
111
        defer span.End()
10✔
112
        logger := m.log.WithSpanContext(span.SpanContext())
10✔
113

10✔
114
        m.fetcherRegistry.Update()
10✔
115
        logger.Infof("Manager triggered fetching for %d fetchers", len(m.fetcherRegistry.Keys()))
10✔
116

10✔
117
        start := time.Now()
10✔
118

10✔
119
        seq := time.Now().Unix()
10✔
120
        logger.Infof("Cycle %d has started", seq)
10✔
121
        wg := &sync.WaitGroup{}
10✔
122
        for _, key := range m.fetcherRegistry.Keys() {
20✔
123
                wg.Add(1)
10✔
124
                go func(k string) {
20✔
125
                        defer wg.Done()
10✔
126
                        err := m.fetchSingle(ctx, k, cycle.Metadata{Sequence: seq})
10✔
127
                        if err != nil {
12✔
128
                                logger.Errorf("Error running fetcher for key %s: %v", k, err)
2✔
129
                        }
2✔
130
                }(key)
131
        }
132

133
        wg.Wait()
10✔
134
        logger.Infof("Manager finished waiting and sending data after %d milliseconds", time.Since(start).Milliseconds())
10✔
135
        logger.Infof("Cycle %d resource fetching has ended", seq)
10✔
136
}
137

138
func (m *Manager) fetchSingle(ctx context.Context, k string, cycleMetadata cycle.Metadata) error {
11✔
139
        if !m.fetcherRegistry.ShouldRun(k) {
12✔
140
                return nil
1✔
141
        }
1✔
142

143
        ctx, span := observability.StartSpan(ctx, scopeName, "Fetch "+k)
10✔
144
        defer span.End()
10✔
145

10✔
146
        ctx, cancel := context.WithTimeout(ctx, m.timeout)
10✔
147
        defer cancel()
10✔
148

10✔
149
        // The buffer is required to avoid go-routine leaks in a case a fetcher timed out
10✔
150
        errCh := make(chan error, 1)
10✔
151

10✔
152
        go func() {
20✔
153
                defer close(errCh)
10✔
154
                errCh <- m.fetchProtected(ctx, k, cycleMetadata)
10✔
155
        }()
10✔
156

157
        select {
10✔
158
        case <-ctx.Done():
1✔
159
                switch ctx.Err() {
1✔
160
                case context.DeadlineExceeded:
1✔
161
                        return fmt.Errorf("fetcher %s reached a timeout after %v seconds", k, m.timeout.Seconds())
1✔
162
                case context.Canceled:
×
163
                        return fmt.Errorf("fetcher %s %s", k, ctx.Err().Error())
×
164
                default:
×
165
                        return fmt.Errorf("fetcher %s failed with an unknown error: %v", k, ctx.Err())
×
166
                }
167

168
        case err := <-errCh:
9✔
169
                if err != nil {
11✔
170
                        span.RecordError(err)
2✔
171
                        span.SetStatus(codes.Error, err.Error())
2✔
172
                }
2✔
173
                return err
9✔
174
        }
175
}
176

177
// fetchProtected protect the fetching goroutine from getting panic
178
func (m *Manager) fetchProtected(ctx context.Context, k string, metadata cycle.Metadata) (err error) {
10✔
179
        defer func() {
20✔
180
                if r := recover(); r != nil {
13✔
181
                        err = fmt.Errorf("fetcher %s recovered from panic: %v", k, r)
3✔
182
                }
3✔
183
        }()
184

185
        return m.fetcherRegistry.Run(ctx, k, metadata)
10✔
186
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc