• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 24087961937

07 Apr 2026 02:52PM UTC coverage: 58.52%. First build
24087961937

Pull #6281

github

web-flow
Merge 9e18d0806 into 59497f2fc
Pull Request #6281: refactor: populate provider field in entity context using provider ID fallback

28 of 37 new or added lines in 2 files covered. (75.68%)

19322 of 33018 relevant lines covered (58.52%)

36.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.22
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/engine/engcontext"
17
        "github.com/mindersec/minder/internal/engine/entities"
18
        minderlogger "github.com/mindersec/minder/internal/logger"
19
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
20
        "github.com/mindersec/minder/pkg/eventer/constants"
21
        "github.com/mindersec/minder/pkg/eventer/interfaces"
22
)
23

24
const (
	// DefaultExecutionTimeout is the per-entity execution timeout applied
	// whenever the configured timeout is zero or negative.
	DefaultExecutionTimeout time.Duration = 5 * time.Minute

	// ArtifactSignatureWaitPeriod is how long the handler waits before it
	// starts processing an artifact event.
	ArtifactSignatureWaitPeriod time.Duration = 10 * time.Second
)
31

32
// ExecutorEventHandler handles entity evaluation events.
33
type ExecutorEventHandler struct {
34
        evt                    interfaces.Publisher
35
        handlerMiddleware      []message.HandlerMiddleware
36
        wgEntityEventExecution *sync.WaitGroup
37
        executor               Executor
38

39
        executionTimeout time.Duration
40

41
        cancels []*context.CancelFunc
42
        lock    sync.Mutex
43
}
44

45
// NewExecutorEventHandler creates a new ExecutorEventHandler.
46
func NewExecutorEventHandler(
47
        ctx context.Context,
48
        evt interfaces.Publisher,
49
        handlerMiddleware []message.HandlerMiddleware,
50
        executor Executor,
51
        executionTimeout time.Duration,
52
) *ExecutorEventHandler {
1✔
53

1✔
54
        if executionTimeout <= 0 {
1✔
NEW
55
                executionTimeout = DefaultExecutionTimeout
×
NEW
56
        }
×
57

58
        eh := &ExecutorEventHandler{
1✔
59
                evt:                    evt,
1✔
60
                wgEntityEventExecution: &sync.WaitGroup{},
1✔
61
                handlerMiddleware:      handlerMiddleware,
1✔
62
                executor:               executor,
1✔
63
                executionTimeout:       executionTimeout,
1✔
64
        }
1✔
65

1✔
66
        zerolog.Ctx(ctx).Debug().
1✔
67
                Dur("execution_timeout", executionTimeout).
1✔
68
                Msg("executor event handler initialized")
1✔
69

1✔
70
        go func() {
2✔
71
                <-ctx.Done()
1✔
72
                eh.lock.Lock()
1✔
73
                defer eh.lock.Unlock()
1✔
74

1✔
75
                for _, cancel := range eh.cancels {
1✔
76
                        (*cancel)()
×
77
                }
×
78
        }()
79

80
        return eh
1✔
81
}
82

83
// Register registers the handler for entity evaluation events.
84
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
85
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
86
}
×
87

88
// Wait blocks until all entity executions are complete.
89
func (e *ExecutorEventHandler) Wait() {
1✔
90
        e.wgEntityEventExecution.Wait()
1✔
91
}
1✔
92

93
// HandleEntityEvent processes incoming entity events.
94
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
2✔
95

2✔
96
        // NOTE: we're _deliberately_ "escaping" from the parent context's Cancel/Done
2✔
97
        // completion, because the default watermill behavior for both Go channels and
2✔
98
        // SQL is to process messages sequentially, but we need additional parallelism
2✔
99
        // beyond that. When we switch to a different message processing system, we
2✔
100
        // should aim to remove this goroutine altogether and have the messaging system
2✔
101
        // provide the parallelism.
2✔
102
        // We _do_ still want to cancel on shutdown, however.
2✔
103
        msgCtx := context.WithoutCancel(msg.Context())
2✔
104

2✔
105
        // This allows us to cancel rule evaluation directly when terminationContext
2✔
106
        // is cancelled.
2✔
107
        //nolint:gosec
2✔
108
        msgCtx, shutdownCancel := context.WithCancel(msgCtx)
2✔
109

2✔
110
        e.lock.Lock()
2✔
111
        e.cancels = append(e.cancels, &shutdownCancel)
2✔
112
        e.lock.Unlock()
2✔
113

2✔
114
        msg = msg.Copy()
2✔
115

2✔
116
        inf, err := entities.ParseEntityEvent(msg)
2✔
117
        if err != nil {
2✔
118
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
119
        }
×
120

121
        e.wgEntityEventExecution.Add(1)
2✔
122

2✔
123
        go func() {
4✔
124
                defer e.wgEntityEventExecution.Done()
2✔
125

2✔
126
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
2✔
127
                        time.Sleep(ArtifactSignatureWaitPeriod)
×
128
                }
×
129

130
                timeout := e.executionTimeout
2✔
131
                if timeout <= 0 {
2✔
NEW
132
                        timeout = DefaultExecutionTimeout
×
NEW
133
                }
×
134
                ctx, cancel := context.WithTimeout(msgCtx, timeout)
2✔
135
                defer cancel()
2✔
136

2✔
137
                defer func() {
4✔
138
                        e.lock.Lock()
2✔
139
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
5✔
140
                                return cf == &shutdownCancel
3✔
141
                        })
3✔
142
                        e.lock.Unlock()
2✔
143
                }()
144

145
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
2✔
146
                        Project: engcontext.Project{ID: inf.ProjectID},
2✔
147
                        Provider: engcontext.Provider{
2✔
148
                                Name: inf.ProviderID.String(),
2✔
149
                        },
2✔
150
                })
2✔
151

2✔
152
                ts := minderlogger.BusinessRecord(ctx)
2✔
153
                ctx = ts.WithTelemetry(ctx)
2✔
154

2✔
155
                logger := zerolog.Ctx(ctx)
2✔
156

2✔
157
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
2✔
NEW
158
                        logger.Debug().
×
159
                                Str("message_id", msg.UUID).
×
160
                                Msg("message does not contain execution ID, skipping")
×
161
                        return
×
162
                }
×
163

164
                err := e.executor.EvalEntityEvent(ctx, inf)
2✔
165

2✔
166
                logMsg := logger.Info()
2✔
167
                if err != nil {
2✔
168
                        logMsg = logger.Error()
×
169
                }
×
170
                ts.Record(logMsg).Send()
2✔
171

2✔
172
                // record telemetry regardless of error. We explicitly record telemetry
2✔
173
                // here even though we also record it in the middleware because the evaluation
2✔
174
                // is done in a separate goroutine which usually still runs after the middleware
2✔
175
                // had already recorded the telemetry.
2✔
176

2✔
177
                if err != nil {
2✔
178
                        logger.Info().
×
179
                                Str("project", inf.ProjectID.String()).
×
180
                                Str("provider_id", inf.ProviderID.String()).
×
181
                                Str("entity", inf.Type.String()).
×
182
                                Str("entity_id", inf.EntityID.String()).
×
NEW
183
                                Err(err).
×
NEW
184
                                Msg("got error while evaluating entity event")
×
185
                }
×
186

187
                msg, err := inf.BuildMessage()
2✔
188
                if err != nil {
2✔
189
                        logger.Err(err).Msg("error building message")
×
190
                        return
×
191
                }
×
192

193
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
2✔
194
                        logger.Err(err).Msg("error publishing flush event")
×
195
                }
×
196
        }()
197

198
        return nil
2✔
199
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc