• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 12594378252

03 Jan 2025 07:33AM UTC coverage: 54.838%. First build
12594378252

Pull #4831

github

web-flow
Merge 20f38c1c4 into a5d977465
Pull Request #4831: Record Metrics for Reminder

0 of 149 new or added lines in 3 files covered. (0.0%)

16974 of 30953 relevant lines covered (54.84%)

37.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.66
/internal/reminder/reminder.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
// Package reminder sends reminders to the minder server to process entities in background.
5
package reminder
6

7
import (
8
        "context"
9
        "errors"
10
        "fmt"
11
        "github.com/mindersec/minder/internal/reminder/metrics"
12
        "sync"
13
        "time"
14

15
        "github.com/ThreeDotsLabs/watermill/message"
16
        "github.com/google/uuid"
17
        "github.com/rs/zerolog"
18

19
        "github.com/mindersec/minder/internal/db"
20
        "github.com/mindersec/minder/internal/events/common"
21
        remindermessages "github.com/mindersec/minder/internal/reminder/messages"
22
        reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
23
        "github.com/mindersec/minder/pkg/eventer/constants"
24
)
25

26
// Interface is an interface over the reminder service
27
type Interface interface {
28
        // Start starts the reminder by sending reminders at regular intervals
29
        Start(ctx context.Context) error
30

31
        // Stop stops the reminder service
32
        Stop()
33
}
34

35
// reminder sends reminders to the minder server to process entities in background.
36
type reminder struct {
37
        store    db.Store
38
        cfg      *reminderconfig.Config
39
        stop     chan struct{}
40
        stopOnce sync.Once
41

42
        repositoryCursor uuid.UUID
43

44
        ticker *time.Ticker
45

46
        eventPublisher message.Publisher
47
        eventDBCloser  common.DriverCloser
48

49
        metricsProvider *metrics.Provider
50
}
51

52
// NewReminder creates a new reminder instance
53
func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Config) (Interface, error) {
×
54
        r := &reminder{
×
55
                store: store,
×
56
                cfg:   config,
×
57
                stop:  make(chan struct{}),
×
58
        }
×
59

×
60
        // Set to a random UUID to start
×
61
        r.repositoryCursor = uuid.New()
×
62
        logger := zerolog.Ctx(ctx)
×
63
        logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)
×
64

×
65
        pub, cl, err := r.setupSQLPublisher(ctx)
×
66
        if err != nil {
×
67
                return nil, err
×
68
        }
×
69

70
        r.eventPublisher = pub
×
71
        r.eventDBCloser = cl
×
NEW
72

×
NEW
73
        metricsProvider, err := metrics.NewProvider(&config.MetricsConfig)
×
NEW
74
        if err != nil {
×
NEW
75
                return nil, fmt.Errorf("error creating metrics provider: %w", err)
×
NEW
76
        }
×
77

NEW
78
        r.metricsProvider = metricsProvider
×
79
        return r, nil
×
80
}
81

82
// Start starts the reminder by sending reminders at regular intervals
83
func (r *reminder) Start(ctx context.Context) error {
×
84
        logger := zerolog.Ctx(ctx)
×
85
        select {
×
86
        case <-r.stop:
×
87
                return errors.New("reminder stopped, cannot start again")
×
88
        default:
×
89
        }
90

NEW
91
        err := r.metricsProvider.Start(ctx)
×
NEW
92
        if err != nil {
×
NEW
93
                return fmt.Errorf("error starting metrics provider: %w", err)
×
NEW
94
        }
×
95

96
        interval := r.cfg.RecurrenceConfig.Interval
×
97
        if interval <= 0 {
×
98
                return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
×
99
        }
×
100

101
        r.ticker = time.NewTicker(interval)
×
102
        defer r.Stop()
×
103

×
104
        for {
×
105
                select {
×
106
                case <-ctx.Done():
×
107
                        logger.Info().Msg("reminder stopped")
×
108
                        return nil
×
109
                case <-r.stop:
×
110
                        logger.Info().Msg("reminder stopped")
×
111
                        return nil
×
112
                case <-r.ticker.C:
×
113
                        // In-case sending reminders i.e. iterating over entities consumes more time than the
×
114
                        // interval, the ticker will adjust the time interval or drop ticks to make up for
×
115
                        // slow receivers.
×
116
                        if err := r.sendReminders(ctx); err != nil {
×
117
                                logger.Error().Err(err).Msg("reconciliation request unsuccessful")
×
118
                        }
×
119
                }
120
        }
121
}
122

123
// Stop stops the reminder service
124
// Stopping the reminder service closes the stop channel and stops the ticker (if not nil).
125
// It also closes the event publisher database connection which means that only reminders
126
// that were sent successfully will be processed. Any reminders that were not sent will be lost.
127
// Stopping the reminder service while the service is starting up may cause the ticker to not be
128
// stopped as ticker might not have been created yet. Ticker will be stopped after Start returns
129
// as defer statement in Start will stop the ticker.
130
func (r *reminder) Stop() {
×
131
        if r.ticker != nil {
×
132
                defer r.ticker.Stop()
×
133
        }
×
134
        r.stopOnce.Do(func() {
×
135
                close(r.stop)
×
136
                r.eventDBCloser()
×
NEW
137

×
NEW
138
                err := r.metricsProvider.Shutdown(context.Background())
×
NEW
139
                if err != nil {
×
NEW
140
                        zerolog.Ctx(context.Background()).Error().Err(err).Msg("error shutting down metrics provider")
×
NEW
141
                }
×
142
        })
143
}
144

145
func (r *reminder) sendReminders(ctx context.Context) error {
×
146
        logger := zerolog.Ctx(ctx)
×
147

×
148
        // Fetch a batch of repositories
×
149
        repos, err := r.getRepositoryBatch(ctx)
×
150
        if err != nil {
×
151
                return fmt.Errorf("error fetching repository batch: %w", err)
×
152
        }
×
153

154
        if len(repos) == 0 {
×
155
                logger.Debug().Msg("no repositories to send reminders for")
×
156
                return nil
×
157
        }
×
158

159
        logger.Info().Msgf("created repository batch of size: %d", len(repos))
×
160

×
161
        messages, err := createReminderMessages(ctx, repos)
×
162
        if err != nil {
×
163
                return fmt.Errorf("error creating reminder messages: %w", err)
×
164
        }
×
165

NEW
166
        remMetrics := r.metricsProvider.Metrics()
×
NEW
167
        if remMetrics != nil {
×
NEW
168
                remMetrics.RecordBatch(ctx, int64(len(repos)))
×
NEW
169
        }
×
170

171
        err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
×
172
        if err != nil {
×
173
                return fmt.Errorf("error publishing messages: %w", err)
×
174
        }
×
175

176
        repoIds := make([]uuid.UUID, len(repos))
×
177
        for _, repo := range repos {
×
178
                repoIds = append(repoIds, repo.ID)
×
NEW
179
                if remMetrics != nil {
×
NEW
180
                        // sendDelay = Now() - ReminderLastSent - MinElapsed
×
NEW
181
                        reminderLastSent := repo.ReminderLastSent
×
NEW
182
                        if reminderLastSent.Valid {
×
NEW
183
                                remMetrics.SendDelay.Record(ctx, (time.Now().Sub(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds())
×
NEW
184
                        } else {
×
NEW
185
                                // TODO: Should the send delay be zero if the reminder has never been sent?
×
NEW
186
                                remMetrics.SendDelay.Record(ctx, 0)
×
NEW
187
                                //remMetrics.SendDelay.Record(ctx, r.cfg.RecurrenceConfig.MinElapsed.Seconds())
×
NEW
188
                        }
×
189
                }
190
        }
191

192
        err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
×
193
        if err != nil {
×
194
                return fmt.Errorf("reminders published but error updating last sent time: %w", err)
×
195
        }
×
196

197
        return nil
×
198
}
199

200
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
6✔
201
        logger := zerolog.Ctx(ctx)
6✔
202

6✔
203
        logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
6✔
204
        repos, err := r.store.ListRepositoriesAfterID(ctx, db.ListRepositoriesAfterIDParams{
6✔
205
                ID:    r.repositoryCursor,
6✔
206
                Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
6✔
207
        })
6✔
208
        if err != nil {
7✔
209
                return nil, err
1✔
210
        }
1✔
211

212
        eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
5✔
213
        if err != nil {
5✔
214
                return nil, err
×
215
        }
×
216
        logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))
5✔
217

5✔
218
        r.updateRepositoryCursor(ctx, repos)
5✔
219

5✔
220
        return eligibleRepos, nil
5✔
221
}
222

223
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
5✔
224
        eligibleRepos := make([]db.Repository, 0, len(repos))
5✔
225

5✔
226
        // We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
5✔
227
        // and similarly returns slices of ID -> date (in possibly different order), so we need
5✔
228
        // to do a bunch of mapping here.
5✔
229
        repoIds := make([]uuid.UUID, 0, len(repos))
5✔
230
        for _, repo := range repos {
15✔
231
                repoIds = append(repoIds, repo.ID)
10✔
232
        }
10✔
233
        oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
5✔
234
        if err != nil {
5✔
235
                return nil, err
×
236
        }
×
237
        idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
5✔
238
        for _, times := range oldestRuleEvals {
15✔
239
                idToLastUpdate[times.RepositoryID] = times.OldestLastUpdated
10✔
240
        }
10✔
241

242
        cutoff := time.Now().Add(-1 * r.cfg.RecurrenceConfig.MinElapsed)
5✔
243
        for _, repo := range repos {
15✔
244
                if t, ok := idToLastUpdate[repo.ID]; ok && t.Before(cutoff) {
19✔
245
                        eligibleRepos = append(eligibleRepos, repo)
9✔
246
                }
9✔
247
        }
248

249
        return eligibleRepos, nil
5✔
250
}
251

252
func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
5✔
253
        logger := zerolog.Ctx(ctx)
5✔
254

5✔
255
        if len(repos) == 0 {
6✔
256
                r.repositoryCursor = uuid.Nil
1✔
257
        } else {
5✔
258
                r.repositoryCursor = repos[len(repos)-1].ID
4✔
259
                r.adjustCursorForEndOfList(ctx)
4✔
260
        }
4✔
261

262
        logger.Debug().Msgf("updated repository cursor to: %s", r.repositoryCursor)
5✔
263
}
264

265
func (r *reminder) adjustCursorForEndOfList(ctx context.Context) {
4✔
266
        logger := zerolog.Ctx(ctx)
4✔
267
        // Check if the cursor is the last element in the db
4✔
268
        exists, err := r.store.RepositoryExistsAfterID(ctx, r.repositoryCursor)
4✔
269
        if err != nil {
5✔
270
                logger.Error().Err(err).Msgf("unable to check if repository exists after cursor: %s"+
1✔
271
                        ", resetting cursor to zero uuid", r.repositoryCursor)
1✔
272
                r.repositoryCursor = uuid.Nil
1✔
273
                return
1✔
274
        }
1✔
275

276
        if !exists {
4✔
277
                logger.Info().Msgf("cursor %s is at the end of the list, resetting cursor to zero uuid",
1✔
278
                        r.repositoryCursor)
1✔
279
                r.repositoryCursor = uuid.Nil
1✔
280
        }
1✔
281
}
282

283
func createReminderMessages(ctx context.Context, repos []db.Repository) ([]*message.Message, error) {
×
284
        logger := zerolog.Ctx(ctx)
×
285

×
286
        messages := make([]*message.Message, 0, len(repos))
×
287
        for _, repo := range repos {
×
288
                repoReconcileMessage, err := remindermessages.NewEntityReminderMessage(
×
289
                        repo.ProviderID, repo.ID, repo.ProjectID,
×
290
                )
×
291
                if err != nil {
×
292
                        return nil, fmt.Errorf("error creating reminder message: %w", err)
×
293
                }
×
294

295
                logger.Debug().
×
296
                        Str("repo", repo.ID.String()).
×
297
                        Msg("created reminder message")
×
298

×
299
                messages = append(messages, repoReconcileMessage)
×
300
        }
301

302
        return messages, nil
×
303
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc