• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 12667140100

08 Jan 2025 08:51AM UTC coverage: 55.876% (-0.003%) from 55.879%
12667140100

Pull #5267

github

web-flow
Merge 40d2ebe2b into 1f462fdb3
Pull Request #5267: build(deps): bump google.golang.org/protobuf from 1.36.1 to 1.36.2 in /tools

17208 of 30797 relevant lines covered (55.88%)

38.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.73
/internal/eea/eea.go
1
// SPDX-FileCopyrightText: Copyright 2023 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
// Package eea provides objects and event handlers for the EEA. EEA stands for
5
// Event Execution Aggregator. The EEA is responsible for aggregating events
6
// from the webhook and making sure we don't send too many events to the
7
// executor engine.
8
package eea
9

10
import (
11
        "context"
12
        "database/sql"
13
        "errors"
14
        "fmt"
15

16
        "github.com/ThreeDotsLabs/watermill/message"
17
        "github.com/google/uuid"
18
        "github.com/rs/zerolog"
19

20
        "github.com/mindersec/minder/internal/db"
21
        "github.com/mindersec/minder/internal/engine/entities"
22
        "github.com/mindersec/minder/internal/entities/properties/service"
23
        pbinternal "github.com/mindersec/minder/internal/proto"
24
        "github.com/mindersec/minder/internal/providers/manager"
25
        minderv1 "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
26
        serverconfig "github.com/mindersec/minder/pkg/config/server"
27
        "github.com/mindersec/minder/pkg/eventer/constants"
28
        "github.com/mindersec/minder/pkg/eventer/interfaces"
29
)
30

31
// EEA is the Event Execution Aggregator
32
type EEA struct {
33
        querier db.Store
34
        evt     interfaces.Publisher
35
        cfg     *serverconfig.AggregatorConfig
36

37
        entityFetcher service.PropertiesService
38
        provMan       manager.ProviderManager
39
}
40

41
// NewEEA creates a new EEA
42
func NewEEA(querier db.Store, evt interfaces.Publisher, cfg *serverconfig.AggregatorConfig,
43
        ef service.PropertiesService, provMan manager.ProviderManager) *EEA {
8✔
44
        return &EEA{
8✔
45
                querier:       querier,
8✔
46
                evt:           evt,
8✔
47
                cfg:           cfg,
8✔
48
                entityFetcher: ef,
8✔
49
                provMan:       provMan,
8✔
50
        }
8✔
51
}
8✔
52

53
// Register implements the Consumer interface.
54
func (e *EEA) Register(r interfaces.Registrar) {
1✔
55
        r.Register(constants.TopicQueueEntityFlush, e.FlushMessageHandler)
1✔
56
}
1✔
57

58
// AggregateMiddleware will pass on the event to the executor engine
59
// if the event is ready to be executed. Else it'll cache
60
// the event until it's ready to be executed.
61
func (e *EEA) AggregateMiddleware(h message.HandlerFunc) message.HandlerFunc {
2✔
62
        return func(msg *message.Message) ([]*message.Message, error) {
103✔
63
                msg, err := e.aggregate(msg)
101✔
64
                if err != nil {
101✔
65
                        return nil, fmt.Errorf("error aggregating event: %w", err)
×
66
                }
×
67

68
                if msg == nil {
200✔
69
                        return nil, nil
99✔
70
                }
99✔
71

72
                return h(msg)
2✔
73
        }
74
}
75

76
// nolint:gocyclo // TODO: hacking in the TODO about foreign keys pushed this over the limit.
77
func (e *EEA) aggregate(msg *message.Message) (*message.Message, error) {
101✔
78
        ctx := msg.Context()
101✔
79
        inf, err := entities.ParseEntityEvent(msg)
101✔
80
        if err != nil {
101✔
81
                return nil, fmt.Errorf("error unmarshalling payload: %w", err)
×
82
        }
×
83

84
        projectID := inf.ProjectID
101✔
85

101✔
86
        logger := zerolog.Ctx(ctx).With().
101✔
87
                Str("component", "EEA").
101✔
88
                // This is added for consistency with how watermill
101✔
89
                // tracks message UUID when logging.
101✔
90
                Str("message_uuid", msg.UUID).
101✔
91
                Str("entity", inf.Type.ToString()).
101✔
92
                Logger()
101✔
93

101✔
94
        entityID, err := inf.GetID()
101✔
95
        if err != nil {
101✔
96
                logger.Debug().AnErr("error getting entity ID", err).Msgf("Entity ID was not set for event %s", inf.Type)
×
97
                // Nothing we can do after this.
×
98
                return nil, nil
×
99
        }
×
100

101
        logger = logger.With().Str("entity_id", entityID.String()).Logger()
101✔
102

101✔
103
        tx, err := e.querier.BeginTransaction()
101✔
104
        if err != nil {
101✔
105
                return nil, fmt.Errorf("error beginning transaction: %w", err)
×
106
        }
×
107
        qtx := e.querier.GetQuerierWithTransaction(tx)
101✔
108

101✔
109
        // We'll only attempt to lock if the entity exists.
101✔
110
        _, err = qtx.GetEntityByID(ctx, entityID)
101✔
111
        if err != nil {
101✔
112
                // explicit rollback if entity had an issue.
×
113
                _ = e.querier.Rollback(tx)
×
114
                if errors.Is(err, sql.ErrNoRows) {
×
115
                        logger.Debug().Msg("entity not found")
×
116
                        return nil, nil
×
117
                }
×
118
                return nil, fmt.Errorf("error getting entity: %w", err)
×
119
        }
120

121
        res, err := qtx.LockIfThresholdNotExceeded(ctx, db.LockIfThresholdNotExceededParams{
101✔
122
                Entity:           entities.EntityTypeToDB(inf.Type),
101✔
123
                EntityInstanceID: entityID,
101✔
124
                ProjectID:        projectID,
101✔
125
                Interval:         fmt.Sprintf("%d", e.cfg.LockInterval),
101✔
126
        })
101✔
127
        if err == nil {
103✔
128
                if err := tx.Commit(); err != nil {
2✔
129
                        return nil, fmt.Errorf("error committing transaction: %w", err)
×
130
                }
×
131
        } else {
99✔
132
                _ = e.querier.Rollback(tx)
99✔
133
        }
99✔
134

135
        // if nothing was retrieved from the database, then we can assume
136
        // that the event is not ready to be executed.
137
        if err != nil && errors.Is(err, sql.ErrNoRows) {
200✔
138
                logger.Info().Msg("executor not ready to process event. Queuing in flush cache.")
99✔
139

99✔
140
                _, err := e.querier.EnqueueFlush(ctx, db.EnqueueFlushParams{
99✔
141
                        Entity:           entities.EntityTypeToDB(inf.Type),
99✔
142
                        EntityInstanceID: entityID,
99✔
143
                        ProjectID:        projectID,
99✔
144
                })
99✔
145
                if err != nil {
197✔
146
                        // We already have this item in the queue.
98✔
147
                        if errors.Is(err, sql.ErrNoRows) {
196✔
148
                                return nil, nil
98✔
149
                        }
98✔
150
                        return nil, fmt.Errorf("error enqueuing flush: %w", err)
×
151
                }
152

153
                return nil, nil
1✔
154
        } else if err != nil {
2✔
155
                logger.Err(err).Msg("error locking event")
×
156
                return nil, fmt.Errorf("error locking: %w", err)
×
157
        }
×
158

159
        logger.Info().Str("execution_id", res.LockedBy.String()).Msg("event ready to be executed")
2✔
160
        msg.Metadata.Set(entities.ExecutionIDKey, res.LockedBy.String())
2✔
161

2✔
162
        return msg, nil
2✔
163
}
164

165
// FlushMessageHandler will flush the cache of events to the executor engine
166
// if the event is ready to be executed.
167
func (e *EEA) FlushMessageHandler(msg *message.Message) error {
53✔
168
        ctx := msg.Context()
53✔
169

53✔
170
        inf, err := entities.ParseEntityEvent(msg)
53✔
171
        if err != nil {
53✔
172
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
173
        }
×
174

175
        eID, err := inf.GetID()
53✔
176
        if err != nil {
53✔
177
                return fmt.Errorf("error getting entity ID: %w", err)
×
178
        }
×
179

180
        logger := zerolog.Ctx(ctx).With().
53✔
181
                Str("component", "EEA").
53✔
182
                Str("function", "FlushMessageHandler").
53✔
183
                // This is added for consistency with how watermill
53✔
184
                // tracks message UUID when logging.
53✔
185
                Str("message_uuid", msg.UUID).
53✔
186
                Str("entity", inf.Type.ToString()).Logger()
53✔
187

53✔
188
        logger.Debug().Msg("flushing event")
53✔
189

53✔
190
        _, err = e.querier.FlushCache(ctx, eID)
53✔
191
        // Nothing to do here. If we can't flush the cache, it means
53✔
192
        // that the event has already been executed.
53✔
193
        if err != nil && errors.Is(err, sql.ErrNoRows) {
53✔
194
                zerolog.Ctx(ctx).Debug().Msg("no flushing needed")
×
195
                return nil
×
196
        } else if err != nil {
101✔
197
                return fmt.Errorf("error flushing cache: %w", err)
48✔
198
        }
48✔
199

200
        logger.Debug().Msg("re-publishing event because of flush")
5✔
201

5✔
202
        // Now that we've flushed the event, let's try to publish it again
5✔
203
        // which means, go through the locking process again.
5✔
204
        if err := inf.Publish(e.evt); err != nil {
5✔
205
                return fmt.Errorf("error publishing execute event: %w", err)
×
206
        }
×
207

208
        return nil
5✔
209
}
210

211
// FlushAll will flush all events in the cache to the executor engine
212
func (e *EEA) FlushAll(ctx context.Context) error {
7✔
213
        caches, err := e.querier.ListFlushCache(ctx)
7✔
214
        if err != nil {
8✔
215
                return fmt.Errorf("error listing flush cache: %w", err)
1✔
216
        }
1✔
217

218
        for _, cache := range caches {
11✔
219
                cache := cache
5✔
220

5✔
221
                eiw, err := e.buildEntityWrapper(ctx, cache.Entity,
5✔
222
                        cache.ProjectID, cache.EntityInstanceID)
5✔
223
                if err != nil {
6✔
224
                        if errors.Is(err, sql.ErrNoRows) || errors.Is(err, service.ErrEntityNotFound) {
2✔
225
                                continue
1✔
226
                        }
227
                        return fmt.Errorf("error building entity wrapper: %w", err)
×
228
                }
229

230
                msg, err := eiw.BuildMessage()
4✔
231
                if err != nil {
4✔
232
                        return fmt.Errorf("error building message: %w", err)
×
233
                }
×
234

235
                msg.SetContext(ctx)
4✔
236

4✔
237
                if err := e.FlushMessageHandler(msg); err != nil {
4✔
238
                        return fmt.Errorf("error flushing messages: %w", err)
×
239
                }
×
240
        }
241

242
        return nil
6✔
243
}
244

245
func (e *EEA) buildEntityWrapper(
246
        ctx context.Context,
247
        entity db.Entities,
248
        projID uuid.UUID,
249
        entityID uuid.UUID,
250
) (*entities.EntityInfoWrapper, error) {
5✔
251
        switch entity {
5✔
252
        case db.EntitiesRepository:
2✔
253
                return e.buildRepositoryInfoWrapper(ctx, entityID, projID)
2✔
254
        case db.EntitiesArtifact:
2✔
255
                return e.buildArtifactInfoWrapper(ctx, entityID, projID)
2✔
256
        case db.EntitiesPullRequest:
1✔
257
                return e.buildPullRequestInfoWrapper(ctx, entityID, projID)
1✔
258
        case db.EntitiesBuildEnvironment, db.EntitiesRelease,
259
                db.EntitiesPipelineRun, db.EntitiesTaskRun, db.EntitiesBuild:
×
260
                return nil, fmt.Errorf("entity type %q not yet supported", entity)
×
261
        default:
×
262
                return nil, fmt.Errorf("unknown entity type: %q", entity)
×
263
        }
264
}
265

266
func (e *EEA) buildRepositoryInfoWrapper(
267
        ctx context.Context,
268
        repoID uuid.UUID,
269
        projID uuid.UUID,
270
) (*entities.EntityInfoWrapper, error) {
2✔
271
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, repoID, nil)
2✔
272
        if err != nil {
3✔
273
                return nil, fmt.Errorf("error fetching entity: %w", err)
1✔
274
        }
1✔
275

276
        if ent.Entity.ProjectID != projID {
1✔
277
                return nil, fmt.Errorf("entity %s does not belong to project %s", repoID, projID)
×
278
        }
×
279

280
        rawRepo, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
1✔
281
        if err != nil {
1✔
282
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
283
        }
×
284

285
        r, ok := rawRepo.(*minderv1.Repository)
1✔
286
        if !ok {
1✔
287
                return nil, fmt.Errorf("error converting entity to repository")
×
288
        }
×
289

290
        return entities.NewEntityInfoWrapper().
1✔
291
                WithRepository(r).
1✔
292
                WithID(repoID).
1✔
293
                WithProjectID(projID).
1✔
294
                WithProviderID(ent.Entity.ProviderID), nil
1✔
295
}
296

297
func (e *EEA) buildArtifactInfoWrapper(
298
        ctx context.Context,
299
        artID uuid.UUID,
300
        projID uuid.UUID,
301
) (*entities.EntityInfoWrapper, error) {
2✔
302
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, artID, nil)
2✔
303
        if err != nil {
2✔
304
                return nil, fmt.Errorf("error fetching entity: %w", err)
×
305
        }
×
306

307
        if ent.Entity.ProjectID != projID {
2✔
308
                return nil, fmt.Errorf("entity %s does not belong to project %s", artID, projID)
×
309
        }
×
310

311
        rawPR, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
2✔
312
        if err != nil {
2✔
313
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
314
        }
×
315

316
        a, ok := rawPR.(*minderv1.Artifact)
2✔
317
        if !ok {
2✔
318
                return nil, fmt.Errorf("error converting entity to artifact")
×
319
        }
×
320

321
        eiw := entities.NewEntityInfoWrapper().
2✔
322
                WithProjectID(projID).
2✔
323
                WithArtifact(a).
2✔
324
                WithID(artID).
2✔
325
                WithProviderID(ent.Entity.ProviderID)
2✔
326
        return eiw, nil
2✔
327
}
328

329
func (e *EEA) buildPullRequestInfoWrapper(
330
        ctx context.Context,
331
        prID uuid.UUID,
332
        projID uuid.UUID,
333
) (*entities.EntityInfoWrapper, error) {
1✔
334
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, prID, nil)
1✔
335
        if err != nil {
1✔
336
                return nil, fmt.Errorf("error fetching entity: %w", err)
×
337
        }
×
338

339
        if ent.Entity.ProjectID != projID {
1✔
340
                return nil, fmt.Errorf("entity %s does not belong to project %s", prID, projID)
×
341
        }
×
342

343
        rawPR, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
1✔
344
        if err != nil {
1✔
345
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
346
        }
×
347

348
        pr, ok := rawPR.(*pbinternal.PullRequest)
1✔
349
        if !ok {
1✔
350
                return nil, fmt.Errorf("error converting entity to pull request")
×
351
        }
×
352

353
        return entities.NewEntityInfoWrapper().
1✔
354
                WithProjectID(projID).
1✔
355
                WithPullRequest(pr).
1✔
356
                WithID(prID).
1✔
357
                WithProviderID(ent.Entity.ProviderID), nil
1✔
358
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc