• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 25079233025

28 Apr 2026 09:42PM UTC coverage: 60.446%. First build
25079233025

Pull #6281

github

web-flow
Merge cb6dd0a87 into 45b9c0d1a
Pull Request #6281: refactor: populate provider field in entity context using provider ID fallback

49 of 64 new or added lines in 2 files covered. (76.56%)

20420 of 33782 relevant lines covered (60.45%)

37.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.47
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/db"
17
        "github.com/mindersec/minder/internal/engine/engcontext"
18
        "github.com/mindersec/minder/internal/engine/entities"
19
        minderlogger "github.com/mindersec/minder/internal/logger"
20
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
21
        "github.com/mindersec/minder/pkg/eventer/constants"
22
        "github.com/mindersec/minder/pkg/eventer/interfaces"
23
)
24

25
const (
26
        // DefaultExecutionTimeout defines the default timeout for entity execution.
27
        DefaultExecutionTimeout = 5 * time.Minute
28

29
        // ArtifactSignatureWaitPeriod defines delay before processing artifact events.
30
        ArtifactSignatureWaitPeriod = 10 * time.Second
31
)
32

33
// ExecutorEventHandler handles entity evaluation events.
34
type ExecutorEventHandler struct {
35
        evt                    interfaces.Publisher
36
        handlerMiddleware      []message.HandlerMiddleware
37
        wgEntityEventExecution *sync.WaitGroup
38
        executor               Executor
39

40
        executionTimeout time.Duration
41
        store            db.Store
42

43
        // cancels are the cancel functions for entity events currently in flight.
44
        // This allows us to cancel rule evaluation directly when the parent
45
        // context is cancelled.
46
        cancels []*context.CancelFunc
47
        lock    sync.Mutex
48
        closed  bool
49
}
50

51
// NewExecutorEventHandler creates a new ExecutorEventHandler.
52
//
53
// This constructor is kept for compatibility with legacy call sites.
54
// It uses the default timeout and does not resolve provider names from store.
55
func NewExecutorEventHandler(
56
        ctx context.Context,
57
        evt interfaces.Publisher,
58
        handlerMiddleware []message.HandlerMiddleware,
59
        executor Executor,
60
) *ExecutorEventHandler {
1✔
61
        return NewExecutorEventHandlerWithStore(
1✔
62
                ctx,
1✔
63
                evt,
1✔
64
                handlerMiddleware,
1✔
65
                executor,
1✔
66
                DefaultExecutionTimeout,
1✔
67
                nil,
1✔
68
        )
1✔
69
}
1✔
70

71
// NewExecutorEventHandlerWithStore creates a new ExecutorEventHandler and
72
// configures timeout and provider lookup store.
73
func NewExecutorEventHandlerWithStore(
74
        ctx context.Context,
75
        evt interfaces.Publisher,
76
        handlerMiddleware []message.HandlerMiddleware,
77
        executor Executor,
78
        executionTimeout time.Duration,
79
        store db.Store,
80
) *ExecutorEventHandler {
2✔
81

2✔
82
        if executionTimeout <= 0 {
2✔
NEW
83
                executionTimeout = DefaultExecutionTimeout
×
NEW
84
        }
×
85

86
        eh := &ExecutorEventHandler{
2✔
87
                evt:                    evt,
2✔
88
                wgEntityEventExecution: &sync.WaitGroup{},
2✔
89
                handlerMiddleware:      handlerMiddleware,
2✔
90
                executor:               executor,
2✔
91
                executionTimeout:       executionTimeout,
2✔
92
                store:                  store,
2✔
93
        }
2✔
94

2✔
95
        zerolog.Ctx(ctx).Debug().
2✔
96
                Dur("execution_timeout", executionTimeout).
2✔
97
                Msg("executor event handler initialized")
2✔
98

2✔
99
        go func() {
4✔
100
                <-ctx.Done()
2✔
101
                eh.lock.Lock()
2✔
102
                defer eh.lock.Unlock()
2✔
103

2✔
104
                eh.closed = true
2✔
105

2✔
106
                for _, cancel := range eh.cancels {
2✔
107
                        (*cancel)()
×
108
                }
×
109
        }()
110

111
        return eh
2✔
112
}
113

114
// Register registers the handler for entity evaluation events.
115
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
NEW
116
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
117
}
×
118

119
// Wait blocks until all entity executions are complete.
120
func (e *ExecutorEventHandler) Wait() {
2✔
121
        e.wgEntityEventExecution.Wait()
2✔
122
}
2✔
123

124
// HandleEntityEvent processes incoming entity events.
125
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
3✔
126

3✔
127
        // NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
3✔
128
        // completion, because the default watermill behavior for both Go channels and
3✔
129
        // SQL is to process messages sequentially, but we need additional parallelism
3✔
130
        // beyond that. When we switch to a different message processing system, we
3✔
131
        // should aim to remove this goroutine altogether and have the messaging system
3✔
132
        // provide the parallelism.
3✔
133
        // We _do_ still want to cancel on shutdown, however.
3✔
134
        msgCtx := context.WithoutCancel(msg.Context())
3✔
135

3✔
136
        // This allows us to cancel rule evaluation directly when terminationContext
3✔
137
        // is cancelled.
3✔
138
        //nolint:gosec
3✔
139
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
3✔
140

3✔
141
        e.lock.Lock()
3✔
142
        if e.closed {
4✔
143
                e.lock.Unlock()
1✔
144
                shutdownCancel()
1✔
145
                return nil
1✔
146
        }
1✔
147
        e.cancels = append(e.cancels, &shutdownCancel)
2✔
148
        e.lock.Unlock()
2✔
149

2✔
150
        msg = msg.Copy()
2✔
151

2✔
152
        inf, err := entities.ParseEntityEvent(msg)
2✔
153
        if err != nil {
2✔
154
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
155
        }
×
156

157
        e.wgEntityEventExecution.Add(1)
2✔
158

2✔
159
        go func() {
4✔
160
                defer e.wgEntityEventExecution.Done()
2✔
161

2✔
162
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
2✔
163
                        // Wait for artifact signatures, but allow early exit on shutdown
×
NEW
164
                        select {
×
165
                        case <-time.After(ArtifactSignatureWaitPeriod):
×
166
                        case <-msgCtx.Done():
×
167
                                // stop waiting early, but continue execution
168
                        }
169
                }
170

171
                timeout := e.executionTimeout
2✔
172
                if timeout <= 0 {
2✔
NEW
173
                        timeout = DefaultExecutionTimeout
×
NEW
174
                }
×
175
                ctx, cancel := context.WithTimeout(msgCtx, timeout)
2✔
176
                defer cancel()
2✔
177

2✔
178
                defer func() {
4✔
179
                        e.lock.Lock()
2✔
180
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
5✔
181
                                return cf == &shutdownCancel
3✔
182
                        })
3✔
183
                        e.lock.Unlock()
2✔
184
                }()
185

186
                providerName := ""
2✔
187
                if e.store != nil {
4✔
188
                        provider, err := e.store.GetProviderByID(ctx, inf.ProviderID)
2✔
189
                        if err != nil {
2✔
NEW
190
                                zerolog.Ctx(ctx).Debug().
×
191
                                        Err(err).
×
192
                                        Str("provider_id", inf.ProviderID.String()).
×
193
                                        Msg("failed to resolve provider name")
×
194
                        } else if provider.Name != "" {
4✔
195
                                providerName = provider.Name
2✔
196
                        }
2✔
197
                }
198

199
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
2✔
200
                        Project: engcontext.Project{ID: inf.ProjectID},
2✔
201
                        Provider: engcontext.Provider{
2✔
202
                                Name: providerName,
2✔
203
                        },
2✔
204
                })
2✔
205

2✔
206
                ts := minderlogger.BusinessRecord(ctx)
2✔
207
                ctx = ts.WithTelemetry(ctx)
2✔
208

2✔
209
                logger := zerolog.Ctx(ctx)
2✔
210

2✔
211
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
2✔
212
                        logger.Debug().
×
NEW
213
                                Str("message_id", msg.UUID).
×
NEW
214
                                Msg("message does not contain execution ID, skipping")
×
NEW
215
                        return
×
NEW
216
                }
×
217

218
                err = e.executor.EvalEntityEvent(ctx, inf)
2✔
219

2✔
220
                logMsg := logger.Info()
2✔
221
                if err != nil {
2✔
222
                        logMsg = logger.Error()
×
223
                }
×
224
                ts.Record(logMsg).Send()
2✔
225

2✔
226
                // record telemetry regardless of error. We explicitly record telemetry
2✔
227
                // here even though we also record it in the middleware because the evaluation
2✔
228
                // is done in a separate goroutine which usually still runs after the middleware
2✔
229
                // had already recorded the telemetry.
2✔
230

2✔
231
                if err != nil {
2✔
232
                        logger.Info().
×
233
                                Str("project", inf.ProjectID.String()).
×
234
                                Str("provider_id", inf.ProviderID.String()).
×
235
                                Str("entity", inf.Type.String()).
×
236
                                Str("entity_id", inf.EntityID.String()).
×
237
                                Err(err).
×
238
                                Msg("got error while evaluating entity event")
×
239
                }
×
240

241
                msg, err := inf.BuildMessage()
2✔
242
                if err != nil {
2✔
243
                        logger.Err(err).Msg("error building message")
×
244
                        return
×
245
                }
×
246

247
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
2✔
248
                        logger.Err(err).Msg("error publishing flush event")
×
249
                }
×
250
        }()
251

252
        return nil
2✔
253
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc