• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 24001775928

05 Apr 2026 12:44PM UTC coverage: 58.311% (+0.01%) from 58.298%
24001775928

Pull #6278

github

web-flow
Merge 40e538acf into 63a6a8e5f
Pull Request #6278: feat: make executor event handling timeout configurable

12 of 13 new or added lines in 2 files covered. (92.31%)

16 existing lines in 1 file now uncovered.

19225 of 32970 relevant lines covered (58.31%)

36.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.86
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/engine/engcontext"
17
        "github.com/mindersec/minder/internal/engine/entities"
18
        minderlogger "github.com/mindersec/minder/internal/logger"
19
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
20
        "github.com/mindersec/minder/pkg/eventer/constants"
21
        "github.com/mindersec/minder/pkg/eventer/interfaces"
22
)
23

24
const (
25
        // DefaultExecutionTimeout is the timeout for execution of a set
26
        // of profiles on an entity.
        // NewExecutorEventHandler falls back to this value when it is given a
        // non-positive execution timeout.
27
        DefaultExecutionTimeout = 5 * time.Minute
28
        // ArtifactSignatureWaitPeriod is the waiting period for potential artifact signature to be available
29
        // before proceeding with evaluation.
        // HandleEntityEvent waits this long before evaluating events whose
        // entity type is pb.Entity_ENTITY_ARTIFACTS.
30
        ArtifactSignatureWaitPeriod = 10 * time.Second
31
)
32

33
// ExecutorEventHandler is responsible for consuming entity events, passing
34
// entities to the executor, and then publishing the results.
35
type ExecutorEventHandler struct {
36
        // evt publishes the flush message once an entity event has been evaluated.
        evt                    interfaces.Publisher
37
        // handlerMiddleware is handed to the registrar together with
        // HandleEntityEvent when Register is called.
        handlerMiddleware      []message.HandlerMiddleware
38
        // wgEntityEventExecution tracks the evaluation goroutines started by
        // HandleEntityEvent; Wait blocks on it.
        wgEntityEventExecution *sync.WaitGroup
39
        // executor evaluates each parsed entity event.
        executor               Executor
40
        // executionTimeout is the timeout configured at construction time and
        // reported by ExecutionTimeout.
        executionTimeout       time.Duration
41
        // cancels are a set of cancel functions for current entity events in flight.
42
        // This allows us to cancel rule evaluation directly when the context
43
        // passed to NewExecutorEventHandler is cancelled.
44
        cancels []*context.CancelFunc
45
        // lock guards cancels.
        lock    sync.Mutex
46
}
47

48
// NewExecutorEventHandler creates the event handler for the executor
49
func NewExecutorEventHandler(
50
        ctx context.Context,
51
        evt interfaces.Publisher,
52
        handlerMiddleware []message.HandlerMiddleware,
53
        executor Executor,
54
        executionTimeout time.Duration,
55
) *ExecutorEventHandler {
3✔
56
        if executionTimeout <= 0 {
4✔
57
                executionTimeout = DefaultExecutionTimeout
1✔
58
        }
1✔
59
        eh := &ExecutorEventHandler{
3✔
60
                evt:                    evt,
3✔
61
                wgEntityEventExecution: &sync.WaitGroup{},
3✔
62
                handlerMiddleware:      handlerMiddleware,
3✔
63
                executor:               executor,
3✔
64
                executionTimeout:       executionTimeout,
3✔
65
        }
3✔
66
        zerolog.Ctx(ctx).Info().
3✔
67
                Dur("execution_timeout", executionTimeout).
3✔
68
                Msg("executor event handler initialized")
3✔
69
        go func() {
6✔
70
                <-ctx.Done()
3✔
71
                eh.lock.Lock()
3✔
72
                defer eh.lock.Unlock()
3✔
73

3✔
74
                for _, cancel := range eh.cancels {
3✔
UNCOV
75
                        (*cancel)()
×
UNCOV
76
                }
×
77
        }()
78

79
        return eh
3✔
80
}
81

82
// Register implements the Consumer interface.
UNCOV
83
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
UNCOV
84
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
UNCOV
85
}
×
86

87
// Wait waits for all the entity executions to finish.
88
func (e *ExecutorEventHandler) Wait() {
1✔
89
        e.wgEntityEventExecution.Wait()
1✔
90
}
1✔
91

92
// ExecutionTimeout returns the configured execution timeout for the handler.
93
func (e *ExecutorEventHandler) ExecutionTimeout() time.Duration {
2✔
94
        return e.executionTimeout
2✔
95
}
2✔
96

97
// HandleEntityEvent handles events coming from webhooks/signals
98
// as well as the init event.
99
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
2✔
100

2✔
101
        // NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
2✔
102
        // completion, because the default watermill behavior for both Go channels and
2✔
103
        // SQL is to process messages sequentially, but we need additional parallelism
2✔
104
        // beyond that.  When we switch to a different message processing system, we
2✔
105
        // should aim to remove this goroutine altogether and have the messaging system
2✔
106
        // provide the parallelism.
2✔
107
        // We _do_ still want to cancel on shutdown, however.
2✔
108
        // TODO: Make this timeout configurable
2✔
109
        msgCtx := context.WithoutCancel(msg.Context())
2✔
110
        //nolint:gosec // this is called when we iterate over e.cancels
2✔
111
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
2✔
112

2✔
113
        e.lock.Lock()
2✔
114
        e.cancels = append(e.cancels, &shutdownCancel)
2✔
115
        e.lock.Unlock()
2✔
116

2✔
117
        // Let's not share memory with the caller.  Note that this does not copy Context
2✔
118
        msg = msg.Copy()
2✔
119

2✔
120
        inf, err := entities.ParseEntityEvent(msg)
2✔
121
        if err != nil {
2✔
UNCOV
122
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
123
        }
×
124

125
        e.wgEntityEventExecution.Add(1)
2✔
126
        go func() {
4✔
127
                defer e.wgEntityEventExecution.Done()
2✔
128
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
2✔
129
                        time.Sleep(ArtifactSignatureWaitPeriod)
×
130
                }
×
131

132
                ctx := msgCtx
2✔
133
                cancel := func() {}
4✔
134
                defer cancel()
2✔
135
                defer func() {
4✔
136
                        e.lock.Lock()
2✔
137
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
4✔
138
                                return cf == &shutdownCancel
2✔
139
                        })
2✔
140
                        e.lock.Unlock()
2✔
141
                }()
142

143
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
2✔
144
                        Project: engcontext.Project{ID: inf.ProjectID},
2✔
145
                        // TODO: extract Provider name from ProviderID?
2✔
146
                })
2✔
147

2✔
148
                ts := minderlogger.BusinessRecord(ctx)
2✔
149
                ctx = ts.WithTelemetry(ctx)
2✔
150

2✔
151
                logger := zerolog.Ctx(ctx)
2✔
152
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
2✔
UNCOV
153
                        logger.Info().
×
UNCOV
154
                                Str("message_id", msg.UUID).
×
UNCOV
155
                                Msg("message does not contain execution ID, skipping")
×
156
                        return
×
157
                }
×
158

159
                err := e.executor.EvalEntityEvent(ctx, inf)
2✔
160

2✔
161
                // record telemetry regardless of error. We explicitly record telemetry
2✔
162
                // here even though we also record it in the middleware because the evaluation
2✔
163
                // is done in a separate goroutine which usually still runs after the middleware
2✔
164
                // had already recorded the telemetry.
2✔
165
                logMsg := logger.Info()
2✔
166
                if err != nil {
2✔
UNCOV
167
                        logMsg = logger.Error()
×
UNCOV
168
                }
×
169
                ts.Record(logMsg).Send()
2✔
170

2✔
171
                if err != nil {
2✔
172
                        logger.Info().
×
173
                                Str("project", inf.ProjectID.String()).
×
174
                                Str("provider_id", inf.ProviderID.String()).
×
UNCOV
175
                                Str("entity", inf.Type.String()).
×
UNCOV
176
                                Str("entity_id", inf.EntityID.String()).
×
UNCOV
177
                                Err(err).Msg("got error while evaluating entity event")
×
178
                }
×
179

180
                // We don't need to unset the execution ID because the event is going to be
181
                // deleted from the database anyway. The aggregator will take care of that.
182
                msg, err := inf.BuildMessage()
2✔
183
                if err != nil {
2✔
UNCOV
184
                        logger.Err(err).Msg("error building message")
×
UNCOV
185
                        return
×
186
                }
×
187

188
                // Publish the result of the entity evaluation
189
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
2✔
190
                        logger.Err(err).Msg("error publishing flush event")
×
191
                }
×
192
        }()
193

194
        return nil
2✔
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc