• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 24025144653

06 Apr 2026 08:38AM UTC coverage: 58.513%. First build
24025144653

Pull #6278

github

web-flow
Merge ae7c77356 into 94ee52363
Pull Request #6278: feat: make executor event handling timeout configurable

23 of 27 new or added lines in 2 files covered. (85.19%)

19290 of 32967 relevant lines covered (58.51%)

36.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.42
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/engine/engcontext"
17
        "github.com/mindersec/minder/internal/engine/entities"
18
        minderlogger "github.com/mindersec/minder/internal/logger"
19
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
20
        "github.com/mindersec/minder/pkg/eventer/constants"
21
        "github.com/mindersec/minder/pkg/eventer/interfaces"
22
)
23

24
const (
25
        // DefaultExecutionTimeout is the timeout for execution of a set
26
        // of profiles on an entity.
27
        DefaultExecutionTimeout = 5 * time.Minute
28

29
        // ArtifactSignatureWaitPeriod is the waiting period for potential artifact signature to be available
30
        // before proceeding with evaluation.
31
        ArtifactSignatureWaitPeriod = 10 * time.Second
32
)
33

34
// ExecutorEventHandler is responsible for consuming entity events, passing
35
// entities to the executor, and then publishing the results.
36
type ExecutorEventHandler struct {
37
        evt                    interfaces.Publisher
38
        handlerMiddleware      []message.HandlerMiddleware
39
        wgEntityEventExecution *sync.WaitGroup
40
        executor               Executor
41

42
        executionTimeout time.Duration
43

44
        // cancels are a set of cancel functions for current entity events in flight.
45
        cancels []*context.CancelFunc
46
        lock    sync.Mutex
47
}
48

49
// NewExecutorEventHandler creates the event handler for the executor
50
func NewExecutorEventHandler(
51
        ctx context.Context,
52
        evt interfaces.Publisher,
53
        handlerMiddleware []message.HandlerMiddleware,
54
        executor Executor,
55
        executionTimeout time.Duration,
56
) *ExecutorEventHandler {
2✔
57

2✔
58
        if executionTimeout <= 0 {
2✔
NEW
59
                executionTimeout = DefaultExecutionTimeout
×
NEW
60
        }
×
61

62
        eh := &ExecutorEventHandler{
2✔
63
                evt:                    evt,
2✔
64
                wgEntityEventExecution: &sync.WaitGroup{},
2✔
65
                handlerMiddleware:      handlerMiddleware,
2✔
66
                executor:               executor,
2✔
67
                executionTimeout:       executionTimeout,
2✔
68
        }
2✔
69

2✔
70
        // Debug-level log (not noisy in production)
2✔
71
        zerolog.Ctx(ctx).Debug().
2✔
72
                Dur("execution_timeout", executionTimeout).
2✔
73
                Msg("executor event handler initialized")
2✔
74

2✔
75
        go func() {
4✔
76
                <-ctx.Done()
2✔
77
                eh.lock.Lock()
2✔
78
                defer eh.lock.Unlock()
2✔
79

2✔
80
                for _, cancel := range eh.cancels {
2✔
81
                        (*cancel)()
×
82
                }
×
83
        }()
84

85
        return eh
2✔
86
}
87

88
// Register implements the Consumer interface.
89
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
90
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
91
}
×
92

93
// Wait waits for all the entity executions to finish.
94
func (e *ExecutorEventHandler) Wait() {
2✔
95
        e.wgEntityEventExecution.Wait()
2✔
96
}
2✔
97

98
// HandleEntityEvent handles events coming from webhooks/signals
99
// as well as the init event.
100
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
3✔
101

3✔
102
        // Escape parent cancellation but still support shutdown cancellation
3✔
103
        msgCtx := context.WithoutCancel(msg.Context())
3✔
104

3✔
105
        //nolint:gosec
3✔
106
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
3✔
107

3✔
108
        e.lock.Lock()
3✔
109
        e.cancels = append(e.cancels, &shutdownCancel)
3✔
110
        e.lock.Unlock()
3✔
111

3✔
112
        // Copy message to avoid shared memory issues
3✔
113
        msg = msg.Copy()
3✔
114

3✔
115
        inf, err := entities.ParseEntityEvent(msg)
3✔
116
        if err != nil {
3✔
117
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
118
        }
×
119

120
        e.wgEntityEventExecution.Add(1)
3✔
121

3✔
122
        go func() {
6✔
123
                defer e.wgEntityEventExecution.Done()
3✔
124

3✔
125
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
3✔
126
                        time.Sleep(ArtifactSignatureWaitPeriod)
×
127
                }
×
128

129
                // ✅ FIX: Proper configurable timeout
130
                ctx, cancel := context.WithTimeout(msgCtx, e.executionTimeout)
3✔
131
                defer cancel()
3✔
132

3✔
133
                defer func() {
6✔
134
                        e.lock.Lock()
3✔
135
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
7✔
136
                                return cf == &shutdownCancel
4✔
137
                        })
4✔
138
                        e.lock.Unlock()
3✔
139
                }()
140

141
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
3✔
142
                        Project: engcontext.Project{ID: inf.ProjectID},
3✔
143
                        Provider: engcontext.Provider{
3✔
144
                                Name: inf.ProviderID.String(),
3✔
145
                        },
3✔
146
                })
3✔
147

3✔
148
                ts := minderlogger.BusinessRecord(ctx)
3✔
149
                ctx = ts.WithTelemetry(ctx)
3✔
150

3✔
151
                logger := zerolog.Ctx(ctx)
3✔
152

3✔
153
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
3✔
NEW
154
                        logger.Debug().
×
155
                                Str("message_id", msg.UUID).
×
156
                                Msg("message does not contain execution ID, skipping")
×
157
                        return
×
158
                }
×
159

160
                err := e.executor.EvalEntityEvent(ctx, inf)
3✔
161

3✔
162
                logMsg := logger.Info()
3✔
163
                if err != nil {
4✔
164
                        logMsg = logger.Error()
1✔
165
                }
1✔
166
                ts.Record(logMsg).Send()
3✔
167

3✔
168
                if err != nil {
4✔
169
                        logger.Info().
1✔
170
                                Str("project", inf.ProjectID.String()).
1✔
171
                                Str("provider_id", inf.ProviderID.String()).
1✔
172
                                Str("entity", inf.Type.String()).
1✔
173
                                Str("entity_id", inf.EntityID.String()).
1✔
174
                                Err(err).
1✔
175
                                Msg("got error while evaluating entity event")
1✔
176
                }
1✔
177

178
                msg, err := inf.BuildMessage()
3✔
179
                if err != nil {
3✔
180
                        logger.Err(err).Msg("error building message")
×
181
                        return
×
182
                }
×
183

184
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
3✔
185
                        logger.Err(err).Msg("error publishing flush event")
×
186
                }
×
187
        }()
188

189
        return nil
3✔
190
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc