• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 25076733413

28 Apr 2026 08:44PM UTC coverage: 60.47% (+0.5%) from 60.0%
25076733413

Pull #6278

github

web-flow
Merge 7277590bc into 45b9c0d1a
Pull Request #6278: feat: make executor event handling timeout configurable

27 of 29 new or added lines in 2 files covered. (93.1%)

427 existing lines in 13 files now uncovered.

20410 of 33752 relevant lines covered (60.47%)

37.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.85
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/engine/engcontext"
17
        "github.com/mindersec/minder/internal/engine/entities"
18
        minderlogger "github.com/mindersec/minder/internal/logger"
19
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
20
        "github.com/mindersec/minder/pkg/eventer/constants"
21
        "github.com/mindersec/minder/pkg/eventer/interfaces"
22
)
23

24
const (
	// DefaultExecutionTimeout bounds how long the evaluation of a set of
	// profiles on a single entity may run. NewExecutorEventHandler falls
	// back to it when it is given a non-positive timeout.
	DefaultExecutionTimeout time.Duration = 5 * time.Minute

	// ArtifactSignatureWaitPeriod is how long evaluation of an artifact
	// entity is delayed so that a potential artifact signature has a chance
	// to become available first.
	ArtifactSignatureWaitPeriod time.Duration = 10 * time.Second
)
33

34
// ExecutorEventHandler handles entity events, executes evaluations,
35
// and publishes the results.
36
type ExecutorEventHandler struct {
37
        evt                    interfaces.Publisher
38
        handlerMiddleware      []message.HandlerMiddleware
39
        wgEntityEventExecution *sync.WaitGroup
40
        executor               Executor
41

42
        executionTimeout time.Duration
43

44
        cancels []*context.CancelFunc
45
        lock    sync.Mutex
46
        closed  bool
47
}
48

49
// NewExecutorEventHandler creates a new ExecutorEventHandler with
50
// configurable execution timeout.
51
func NewExecutorEventHandler(
52
        ctx context.Context,
53
        evt interfaces.Publisher,
54
        handlerMiddleware []message.HandlerMiddleware,
55
        executor Executor,
56
        executionTimeout time.Duration,
57
) *ExecutorEventHandler {
5✔
58

5✔
59
        if executionTimeout <= 0 {
7✔
60
                executionTimeout = DefaultExecutionTimeout
2✔
61
        }
2✔
62

63
        eh := &ExecutorEventHandler{
5✔
64
                evt:                    evt,
5✔
65
                wgEntityEventExecution: &sync.WaitGroup{},
5✔
66
                handlerMiddleware:      handlerMiddleware,
5✔
67
                executor:               executor,
5✔
68
                executionTimeout:       executionTimeout,
5✔
69
        }
5✔
70

5✔
71
        zerolog.Ctx(ctx).Debug().
5✔
72
                Dur("execution_timeout", executionTimeout).
5✔
73
                Msg("executor event handler initialized")
5✔
74

5✔
75
        go func() {
10✔
76
                <-ctx.Done()
5✔
77
                eh.lock.Lock()
5✔
78
                defer eh.lock.Unlock()
5✔
79

5✔
80
                eh.closed = true
5✔
81

5✔
82
                for _, cancel := range eh.cancels {
5✔
83
                        (*cancel)()
×
84
                }
×
85
        }()
86

87
        return eh
5✔
88
}
89

90
// Register registers the handler for entity evaluation events.
91
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
92
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
93
}
×
94

95
// Wait blocks until all entity event executions are complete.
96
func (e *ExecutorEventHandler) Wait() {
5✔
97
        e.wgEntityEventExecution.Wait()
5✔
98
}
5✔
99

100
// HandleEntityEvent processes incoming entity events and triggers evaluation.
101
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
6✔
102

6✔
103
        // NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
6✔
104
        // completion, because the default watermill behavior for both Go channels and
6✔
105
        // SQL is to process messages sequentially, but we need additional parallelism
6✔
106
        // beyond that. When we switch to a different message processing system, we
6✔
107
        // should aim to remove this goroutine altogether and have the messaging system
6✔
108
        // provide the parallelism.
6✔
109
        // We _do_ still want to cancel on shutdown, however.
6✔
110
        msgCtx := context.WithoutCancel(msg.Context())
6✔
111

6✔
112
        // This allows us to cancel rule evaluation directly when terminationContext
6✔
113
        // is cancelled.
6✔
114
        //nolint:gosec
6✔
115
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
6✔
116

6✔
117
        e.lock.Lock()
6✔
118
        if e.closed {
7✔
119
                e.lock.Unlock()
1✔
120
                shutdownCancel()
1✔
121
                return nil
1✔
122
        }
1✔
123
        e.cancels = append(e.cancels, &shutdownCancel)
5✔
124
        e.lock.Unlock()
5✔
125

5✔
126
        // Copy the message so the async goroutine does not share mutable state with
5✔
127
        // the original Watermill message.
5✔
128
        msg = msg.Copy()
5✔
129

5✔
130
        inf, err := entities.ParseEntityEvent(msg)
5✔
131
        if err != nil {
5✔
132
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
133
        }
×
134

135
        e.wgEntityEventExecution.Add(1)
5✔
136

5✔
137
        go func() {
10✔
138
                defer e.wgEntityEventExecution.Done()
5✔
139

5✔
140
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
5✔
141
                        // Wait for artifact signatures, but allow early exit on shutdown
×
142
                        select {
×
143
                        case <-time.After(ArtifactSignatureWaitPeriod):
×
144
                        case <-msgCtx.Done():
×
145
                                // stop waiting early, but continue execution
146
                        }
147
                }
148

149
                ctx, cancel := context.WithTimeout(msgCtx, e.executionTimeout)
5✔
150
                defer cancel()
5✔
151

5✔
152
                defer func() {
10✔
153
                        e.lock.Lock()
5✔
154
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
10✔
155
                                return cf == &shutdownCancel
5✔
156
                        })
5✔
157
                        e.lock.Unlock()
5✔
158
                }()
159

160
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
5✔
161
                        Project: engcontext.Project{ID: inf.ProjectID},
5✔
162
                        Provider: engcontext.Provider{
5✔
163
                                Name: inf.ProviderID.String(),
5✔
164
                        },
5✔
165
                })
5✔
166

5✔
167
                ts := minderlogger.BusinessRecord(ctx)
5✔
168
                ctx = ts.WithTelemetry(ctx)
5✔
169

5✔
170
                logger := zerolog.Ctx(ctx)
5✔
171

5✔
172
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
5✔
NEW
173
                        logger.Debug().
×
174
                                Str("message_id", msg.UUID).
×
175
                                Msg("message does not contain execution ID, skipping")
×
176
                        return
×
177
                }
×
178

179
                err := e.executor.EvalEntityEvent(ctx, inf)
5✔
180

5✔
181
                logMsg := logger.Info()
5✔
182
                if err != nil {
6✔
183
                        logMsg = logger.Error()
1✔
184
                }
1✔
185

186
                // record telemetry regardless of error. We explicitly record telemetry
187
                // here even though we also record it in the middleware because the evaluation
188
                // is done in a separate goroutine which usually still runs after the middleware
189
                // had already recorded the telemetry.
190
                ts.Record(logMsg).Send()
5✔
191

5✔
192
                if err != nil {
6✔
193
                        logger.Info().
1✔
194
                                Str("project", inf.ProjectID.String()).
1✔
195
                                Str("provider_id", inf.ProviderID.String()).
1✔
196
                                Str("entity", inf.Type.String()).
1✔
197
                                Str("entity_id", inf.EntityID.String()).
1✔
198
                                Err(err).
1✔
199
                                Msg("got error while evaluating entity event")
1✔
200
                }
1✔
201

202
                msg, err := inf.BuildMessage()
5✔
203
                if err != nil {
5✔
204
                        logger.Err(err).Msg("error building message")
×
205
                        return
×
206
                }
×
207

208
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
5✔
UNCOV
209
                        logger.Err(err).Msg("error publishing flush event")
×
210
                }
×
211
        }()
212

213
        return nil
5✔
214
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc