• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 24052250697

06 Apr 2026 09:31PM UTC coverage: 58.551%. First build
24052250697

Pull #6278

github

web-flow
Merge 3d125320c into d7c1044e8
Pull Request #6278: feat: make executor event handling timeout configurable

28 of 32 new or added lines in 2 files covered. (87.5%)

19330 of 33014 relevant lines covered (58.55%)

36.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.06
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/engine/engcontext"
17
        "github.com/mindersec/minder/internal/engine/entities"
18
        minderlogger "github.com/mindersec/minder/internal/logger"
19
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
20
        "github.com/mindersec/minder/pkg/eventer/constants"
21
        "github.com/mindersec/minder/pkg/eventer/interfaces"
22
)
23

24
const (
	// DefaultExecutionTimeout is the timeout for execution of a set
	// of profiles on an entity. NewExecutorEventHandler falls back to
	// this value when it is given a non-positive timeout.
	DefaultExecutionTimeout = 5 * time.Minute

	// ArtifactSignatureWaitPeriod is the waiting period for a potential
	// artifact signature to become available before proceeding with the
	// evaluation of an artifact entity.
	ArtifactSignatureWaitPeriod = 10 * time.Second
)
33

34
// ExecutorEventHandler handles entity events, executes evaluations,
35
// and publishes the results.
36
type ExecutorEventHandler struct {
37
        evt                    interfaces.Publisher
38
        handlerMiddleware      []message.HandlerMiddleware
39
        wgEntityEventExecution *sync.WaitGroup
40
        executor               Executor
41

42
        executionTimeout time.Duration
43

44
        cancels []*context.CancelFunc
45
        lock    sync.Mutex
46
}
47

48
// NewExecutorEventHandler creates a new ExecutorEventHandler with
49
// configurable execution timeout.
50
func NewExecutorEventHandler(
51
        ctx context.Context,
52
        evt interfaces.Publisher,
53
        handlerMiddleware []message.HandlerMiddleware,
54
        executor Executor,
55
        executionTimeout time.Duration,
56
) *ExecutorEventHandler {
2✔
57

2✔
58
        if executionTimeout <= 0 {
2✔
NEW
59
                executionTimeout = DefaultExecutionTimeout
×
NEW
60
        }
×
61

62
        eh := &ExecutorEventHandler{
2✔
63
                evt:                    evt,
2✔
64
                wgEntityEventExecution: &sync.WaitGroup{},
2✔
65
                handlerMiddleware:      handlerMiddleware,
2✔
66
                executor:               executor,
2✔
67
                executionTimeout:       executionTimeout,
2✔
68
        }
2✔
69

2✔
70
        zerolog.Ctx(ctx).Debug().
2✔
71
                Dur("execution_timeout", executionTimeout).
2✔
72
                Msg("executor event handler initialized")
2✔
73

2✔
74
        go func() {
4✔
75
                <-ctx.Done()
2✔
76
                eh.lock.Lock()
2✔
77
                defer eh.lock.Unlock()
2✔
78

2✔
79
                for _, cancel := range eh.cancels {
2✔
80
                        (*cancel)()
×
81
                }
×
82
        }()
83

84
        return eh
2✔
85
}
86

87
// Register registers the handler for entity evaluation events.
88
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
89
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
90
}
×
91

92
// Wait blocks until all entity event executions are complete.
93
func (e *ExecutorEventHandler) Wait() {
2✔
94
        e.wgEntityEventExecution.Wait()
2✔
95
}
2✔
96

97
// HandleEntityEvent processes incoming entity events and triggers evaluation.
98
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
3✔
99

3✔
100
        // NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
3✔
101
        // completion, because the default watermill behavior for both Go channels and
3✔
102
        // SQL is to process messages sequentially, but we need additional parallelism
3✔
103
        // beyond that. When we switch to a different message processing system, we
3✔
104
        // should aim to remove this goroutine altogether and have the messaging system
3✔
105
        // provide the parallelism.
3✔
106
        // We _do_ still want to cancel on shutdown, however.
3✔
107
        msgCtx := context.WithoutCancel(msg.Context())
3✔
108

3✔
109
        // This allows us to cancel rule evaluation directly when terminationContext
3✔
110
        // is cancelled.
3✔
111
        //nolint:gosec
3✔
112
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
3✔
113

3✔
114
        e.lock.Lock()
3✔
115
        e.cancels = append(e.cancels, &shutdownCancel)
3✔
116
        e.lock.Unlock()
3✔
117

3✔
118
        msg = msg.Copy()
3✔
119

3✔
120
        inf, err := entities.ParseEntityEvent(msg)
3✔
121
        if err != nil {
3✔
122
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
123
        }
×
124

125
        e.wgEntityEventExecution.Add(1)
3✔
126

3✔
127
        go func() {
6✔
128
                defer e.wgEntityEventExecution.Done()
3✔
129

3✔
130
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
3✔
131
                        time.Sleep(ArtifactSignatureWaitPeriod)
×
132
                }
×
133

134
                // use configurable timeout
135
                ctx, cancel := context.WithTimeout(msgCtx, e.executionTimeout)
3✔
136
                defer cancel()
3✔
137

3✔
138
                defer func() {
6✔
139
                        e.lock.Lock()
3✔
140
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
6✔
141
                                return cf == &shutdownCancel
3✔
142
                        })
3✔
143
                        e.lock.Unlock()
3✔
144
                }()
145

146
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
3✔
147
                        Project: engcontext.Project{ID: inf.ProjectID},
3✔
148
                        Provider: engcontext.Provider{
3✔
149
                                Name: inf.ProviderID.String(), // unchanged
3✔
150
                        },
3✔
151
                })
3✔
152

3✔
153
                ts := minderlogger.BusinessRecord(ctx)
3✔
154
                ctx = ts.WithTelemetry(ctx)
3✔
155

3✔
156
                logger := zerolog.Ctx(ctx)
3✔
157

3✔
158
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
3✔
NEW
159
                        logger.Debug().
×
160
                                Str("message_id", msg.UUID).
×
161
                                Msg("message does not contain execution ID, skipping")
×
162
                        return
×
163
                }
×
164

165
                err := e.executor.EvalEntityEvent(ctx, inf)
3✔
166

3✔
167
                logMsg := logger.Info()
3✔
168
                if err != nil {
4✔
169
                        logMsg = logger.Error()
1✔
170
                }
1✔
171
                ts.Record(logMsg).Send()
3✔
172

3✔
173
                // record telemetry regardless of error. We explicitly record telemetry
3✔
174
                // here even though we also record it in the middleware because the evaluation
3✔
175
                // is done in a separate goroutine which usually still runs after the middleware
3✔
176
                // had already recorded the telemetry.
3✔
177

3✔
178
                if err != nil {
4✔
179
                        logger.Info().
1✔
180
                                Str("project", inf.ProjectID.String()).
1✔
181
                                Str("provider_id", inf.ProviderID.String()).
1✔
182
                                Str("entity", inf.Type.String()).
1✔
183
                                Str("entity_id", inf.EntityID.String()).
1✔
184
                                Err(err).
1✔
185
                                Msg("got error while evaluating entity event")
1✔
186
                }
1✔
187

188
                msg, err := inf.BuildMessage()
3✔
189
                if err != nil {
3✔
190
                        logger.Err(err).Msg("error building message")
×
191
                        return
×
192
                }
×
193

194
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
3✔
195
                        logger.Err(err).Msg("error publishing flush event")
×
196
                }
×
197
        }()
198

199
        return nil
3✔
200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc