• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacklok / minder / 11290223436

11 Oct 2024 09:51AM UTC coverage: 53.179% (+0.05%) from 53.125%
11290223436

Pull #4705

github

web-flow
Merge 14aa03bb6 into f191f5d1e
Pull Request #4705: Remove database use from handleRelevantRepositoryEvent

61 of 71 new or added lines in 4 files covered. (85.92%)

4 existing lines in 2 files now uncovered.

14613 of 27479 relevant lines covered (53.18%)

41.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.73
/internal/eea/eea.go
1
// Copyright 2023 Stacklok, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
// Package rule provides the CLI subcommand for managing rules
15

16
// Package eea provides objects and event handlers for the EEA. EEA stands for
17
// Event Execution Aggregator. The EEA is responsible for aggregating events
18
// from the webhook and making sure we don't send too many events to the
19
// executor engine.
20
package eea
21

22
import (
23
        "context"
24
        "database/sql"
25
        "errors"
26
        "fmt"
27

28
        "github.com/ThreeDotsLabs/watermill/message"
29
        "github.com/google/uuid"
30
        "github.com/rs/zerolog"
31

32
        serverconfig "github.com/stacklok/minder/internal/config/server"
33
        "github.com/stacklok/minder/internal/db"
34
        "github.com/stacklok/minder/internal/engine/entities"
35
        "github.com/stacklok/minder/internal/entities/properties/service"
36
        "github.com/stacklok/minder/internal/events"
37
        "github.com/stacklok/minder/internal/providers/manager"
38
        minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
39
)
40

41
// EEA is the Event Execution Aggregator
42
type EEA struct {
43
        querier db.Store
44
        evt     events.Publisher
45
        cfg     *serverconfig.AggregatorConfig
46

47
        entityFetcher service.PropertiesService
48
        provMan       manager.ProviderManager
49
}
50

51
// NewEEA creates a new EEA
52
func NewEEA(querier db.Store, evt events.Publisher, cfg *serverconfig.AggregatorConfig,
53
        ef service.PropertiesService, provMan manager.ProviderManager) *EEA {
8✔
54
        return &EEA{
8✔
55
                querier:       querier,
8✔
56
                evt:           evt,
8✔
57
                cfg:           cfg,
8✔
58
                entityFetcher: ef,
8✔
59
                provMan:       provMan,
8✔
60
        }
8✔
61
}
8✔
62

63
// Register implements the Consumer interface.
64
func (e *EEA) Register(r events.Registrar) {
1✔
65
        r.Register(events.TopicQueueEntityFlush, e.FlushMessageHandler)
1✔
66
}
1✔
67

68
// AggregateMiddleware will pass on the event to the executor engine
69
// if the event is ready to be executed. Else it'll cache
70
// the event until it's ready to be executed.
71
func (e *EEA) AggregateMiddleware(h message.HandlerFunc) message.HandlerFunc {
2✔
72
        return func(msg *message.Message) ([]*message.Message, error) {
103✔
73
                msg, err := e.aggregate(msg)
101✔
74
                if err != nil {
101✔
75
                        return nil, fmt.Errorf("error aggregating event: %w", err)
×
76
                }
×
77

78
                if msg == nil {
200✔
79
                        return nil, nil
99✔
80
                }
99✔
81

82
                return h(msg)
2✔
83
        }
84
}
85

86
// nolint:gocyclo // TODO: hacking in the TODO about foreign keys pushed this over the limit.
87
func (e *EEA) aggregate(msg *message.Message) (*message.Message, error) {
101✔
88
        ctx := msg.Context()
101✔
89
        inf, err := entities.ParseEntityEvent(msg)
101✔
90
        if err != nil {
101✔
91
                return nil, fmt.Errorf("error unmarshalling payload: %w", err)
×
92
        }
×
93

94
        projectID := inf.ProjectID
101✔
95

101✔
96
        logger := zerolog.Ctx(ctx).With().
101✔
97
                Str("component", "EEA").
101✔
98
                // This is added for consistency with how watermill
101✔
99
                // tracks message UUID when logging.
101✔
100
                Str("message_uuid", msg.UUID).
101✔
101
                Str("entity", inf.Type.ToString()).
101✔
102
                Logger()
101✔
103

101✔
104
        entityID, err := inf.GetID()
101✔
105
        if err != nil {
101✔
106
                logger.Debug().AnErr("error getting entity ID", err).Msgf("Entity ID was not set for event %s", inf.Type)
×
107
                // Nothing we can do after this.
×
108
                return nil, nil
×
109
        }
×
110

111
        logger = logger.With().Str("entity_id", entityID.String()).Logger()
101✔
112

101✔
113
        tx, err := e.querier.BeginTransaction()
101✔
114
        if err != nil {
101✔
115
                return nil, fmt.Errorf("error beginning transaction: %w", err)
×
116
        }
×
117
        qtx := e.querier.GetQuerierWithTransaction(tx)
101✔
118

101✔
119
        // We'll only attempt to lock if the entity exists.
101✔
120
        _, err = qtx.GetEntityByID(ctx, entityID)
101✔
121
        if err != nil {
101✔
122
                // explicit rollback if entity had an issue.
×
123
                _ = e.querier.Rollback(tx)
×
124
                if errors.Is(err, sql.ErrNoRows) {
×
125
                        logger.Debug().Msg("entity not found")
×
126
                        return nil, nil
×
127
                }
×
128
                return nil, fmt.Errorf("error getting entity: %w", err)
×
129
        }
130

131
        res, err := qtx.LockIfThresholdNotExceeded(ctx, db.LockIfThresholdNotExceededParams{
101✔
132
                Entity:           entities.EntityTypeToDB(inf.Type),
101✔
133
                EntityInstanceID: entityID,
101✔
134
                ProjectID:        projectID,
101✔
135
                Interval:         fmt.Sprintf("%d", e.cfg.LockInterval),
101✔
136
        })
101✔
137
        if err == nil {
103✔
138
                if err := tx.Commit(); err != nil {
2✔
139
                        return nil, fmt.Errorf("error committing transaction: %w", err)
×
140
                }
×
141
        } else {
99✔
142
                _ = e.querier.Rollback(tx)
99✔
143
        }
99✔
144

145
        // if nothing was retrieved from the database, then we can assume
146
        // that the event is not ready to be executed.
147
        if err != nil && errors.Is(err, sql.ErrNoRows) {
200✔
148
                logger.Info().Msg("executor not ready to process event. Queuing in flush cache.")
99✔
149

99✔
150
                _, err := e.querier.EnqueueFlush(ctx, db.EnqueueFlushParams{
99✔
151
                        Entity:           entities.EntityTypeToDB(inf.Type),
99✔
152
                        EntityInstanceID: entityID,
99✔
153
                        ProjectID:        projectID,
99✔
154
                })
99✔
155
                if err != nil {
197✔
156
                        // We already have this item in the queue.
98✔
157
                        if errors.Is(err, sql.ErrNoRows) {
196✔
158
                                return nil, nil
98✔
159
                        }
98✔
160
                        return nil, fmt.Errorf("error enqueuing flush: %w", err)
×
161
                }
162

163
                return nil, nil
1✔
164
        } else if err != nil {
2✔
165
                logger.Err(err).Msg("error locking event")
×
166
                return nil, fmt.Errorf("error locking: %w", err)
×
167
        }
×
168

169
        logger.Info().Str("execution_id", res.LockedBy.String()).Msg("event ready to be executed")
2✔
170
        msg.Metadata.Set(entities.ExecutionIDKey, res.LockedBy.String())
2✔
171

2✔
172
        return msg, nil
2✔
173
}
174

175
// FlushMessageHandler will flush the cache of events to the executor engine
176
// if the event is ready to be executed.
177
func (e *EEA) FlushMessageHandler(msg *message.Message) error {
60✔
178
        ctx := msg.Context()
60✔
179

60✔
180
        inf, err := entities.ParseEntityEvent(msg)
60✔
181
        if err != nil {
60✔
182
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
183
        }
×
184

185
        eID, err := inf.GetID()
60✔
186
        if err != nil {
60✔
187
                return fmt.Errorf("error getting entity ID: %w", err)
×
188
        }
×
189

190
        logger := zerolog.Ctx(ctx).With().
60✔
191
                Str("component", "EEA").
60✔
192
                Str("function", "FlushMessageHandler").
60✔
193
                // This is added for consistency with how watermill
60✔
194
                // tracks message UUID when logging.
60✔
195
                Str("message_uuid", msg.UUID).
60✔
196
                Str("entity", inf.Type.ToString()).Logger()
60✔
197

60✔
198
        logger.Debug().Msg("flushing event")
60✔
199

60✔
200
        _, err = e.querier.FlushCache(ctx, eID)
60✔
201
        // Nothing to do here. If we can't flush the cache, it means
60✔
202
        // that the event has already been executed.
60✔
203
        if err != nil && errors.Is(err, sql.ErrNoRows) {
60✔
UNCOV
204
                zerolog.Ctx(ctx).Debug().Msg("no flushing needed")
×
UNCOV
205
                return nil
×
206
        } else if err != nil {
115✔
207
                return fmt.Errorf("error flushing cache: %w", err)
55✔
208
        }
55✔
209

210
        logger.Debug().Msg("re-publishing event because of flush")
5✔
211

5✔
212
        // Now that we've flushed the event, let's try to publish it again
5✔
213
        // which means, go through the locking process again.
5✔
214
        if err := inf.Publish(e.evt); err != nil {
5✔
215
                return fmt.Errorf("error publishing execute event: %w", err)
×
216
        }
×
217

218
        return nil
5✔
219
}
220

221
// FlushAll will flush all events in the cache to the executor engine
222
func (e *EEA) FlushAll(ctx context.Context) error {
7✔
223
        caches, err := e.querier.ListFlushCache(ctx)
7✔
224
        if err != nil {
8✔
225
                return fmt.Errorf("error listing flush cache: %w", err)
1✔
226
        }
1✔
227

228
        for _, cache := range caches {
11✔
229
                cache := cache
5✔
230

5✔
231
                eiw, err := e.buildEntityWrapper(ctx, cache.Entity,
5✔
232
                        cache.ProjectID, cache.EntityInstanceID)
5✔
233
                if err != nil {
6✔
234
                        if errors.Is(err, sql.ErrNoRows) || errors.Is(err, service.ErrEntityNotFound) {
2✔
235
                                continue
1✔
236
                        }
237
                        return fmt.Errorf("error building entity wrapper: %w", err)
×
238
                }
239

240
                msg, err := eiw.BuildMessage()
4✔
241
                if err != nil {
4✔
242
                        return fmt.Errorf("error building message: %w", err)
×
243
                }
×
244

245
                msg.SetContext(ctx)
4✔
246

4✔
247
                if err := e.FlushMessageHandler(msg); err != nil {
4✔
248
                        return fmt.Errorf("error flushing messages: %w", err)
×
249
                }
×
250
        }
251

252
        return nil
6✔
253
}
254

255
func (e *EEA) buildEntityWrapper(
256
        ctx context.Context,
257
        entity db.Entities,
258
        projID uuid.UUID,
259
        entityID uuid.UUID,
260
) (*entities.EntityInfoWrapper, error) {
5✔
261
        switch entity {
5✔
262
        case db.EntitiesRepository:
2✔
263
                return e.buildRepositoryInfoWrapper(ctx, entityID, projID)
2✔
264
        case db.EntitiesArtifact:
2✔
265
                return e.buildArtifactInfoWrapper(ctx, entityID, projID)
2✔
266
        case db.EntitiesPullRequest:
1✔
267
                return e.buildPullRequestInfoWrapper(ctx, entityID, projID)
1✔
268
        case db.EntitiesBuildEnvironment, db.EntitiesRelease,
269
                db.EntitiesPipelineRun, db.EntitiesTaskRun, db.EntitiesBuild:
×
270
                return nil, fmt.Errorf("entity type %q not yet supported", entity)
×
271
        default:
×
272
                return nil, fmt.Errorf("unknown entity type: %q", entity)
×
273
        }
274
}
275

276
func (e *EEA) buildRepositoryInfoWrapper(
277
        ctx context.Context,
278
        repoID uuid.UUID,
279
        projID uuid.UUID,
280
) (*entities.EntityInfoWrapper, error) {
2✔
281
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, repoID, nil)
2✔
282
        if err != nil {
3✔
283
                return nil, fmt.Errorf("error fetching entity: %w", err)
1✔
284
        }
1✔
285

286
        if ent.Entity.ProjectID != projID {
1✔
287
                return nil, fmt.Errorf("entity %s does not belong to project %s", repoID, projID)
×
288
        }
×
289

290
        rawRepo, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
1✔
291
        if err != nil {
1✔
292
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
293
        }
×
294

295
        r, ok := rawRepo.(*minderv1.Repository)
1✔
296
        if !ok {
1✔
297
                return nil, fmt.Errorf("error converting entity to repository")
×
298
        }
×
299

300
        return entities.NewEntityInfoWrapper().
1✔
301
                WithRepository(r).
1✔
302
                WithID(repoID).
1✔
303
                WithProjectID(projID).
1✔
304
                WithProviderID(ent.Entity.ProviderID), nil
1✔
305
}
306

307
func (e *EEA) buildArtifactInfoWrapper(
308
        ctx context.Context,
309
        artID uuid.UUID,
310
        projID uuid.UUID,
311
) (*entities.EntityInfoWrapper, error) {
2✔
312
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, artID, nil)
2✔
313
        if err != nil {
2✔
314
                return nil, fmt.Errorf("error fetching entity: %w", err)
×
315
        }
×
316

317
        if ent.Entity.ProjectID != projID {
2✔
318
                return nil, fmt.Errorf("entity %s does not belong to project %s", artID, projID)
×
319
        }
×
320

321
        rawPR, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
2✔
322
        if err != nil {
2✔
323
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
324
        }
×
325

326
        a, ok := rawPR.(*minderv1.Artifact)
2✔
327
        if !ok {
2✔
328
                return nil, fmt.Errorf("error converting entity to artifact")
×
329
        }
×
330

331
        eiw := entities.NewEntityInfoWrapper().
2✔
332
                WithProjectID(projID).
2✔
333
                WithArtifact(a).
2✔
334
                WithID(artID).
2✔
335
                WithProviderID(ent.Entity.ProviderID)
2✔
336
        return eiw, nil
2✔
337
}
338

339
func (e *EEA) buildPullRequestInfoWrapper(
340
        ctx context.Context,
341
        prID uuid.UUID,
342
        projID uuid.UUID,
343
) (*entities.EntityInfoWrapper, error) {
1✔
344
        ent, err := e.entityFetcher.EntityWithPropertiesByID(ctx, prID, nil)
1✔
345
        if err != nil {
1✔
346
                return nil, fmt.Errorf("error fetching entity: %w", err)
×
347
        }
×
348

349
        if ent.Entity.ProjectID != projID {
1✔
350
                return nil, fmt.Errorf("entity %s does not belong to project %s", prID, projID)
×
351
        }
×
352

353
        rawPR, err := e.entityFetcher.EntityWithPropertiesAsProto(ctx, ent, e.provMan)
1✔
354
        if err != nil {
1✔
355
                return nil, fmt.Errorf("error converting entity to protobuf: %w", err)
×
356
        }
×
357

358
        pr, ok := rawPR.(*minderv1.PullRequest)
1✔
359
        if !ok {
1✔
360
                return nil, fmt.Errorf("error converting entity to pull request")
×
361
        }
×
362

363
        return entities.NewEntityInfoWrapper().
1✔
364
                WithProjectID(projID).
1✔
365
                WithPullRequest(pr).
1✔
366
                WithID(prID).
1✔
367
                WithProviderID(ent.Entity.ProviderID), nil
1✔
368
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc