• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 24027088219

06 Apr 2026 09:44AM UTC coverage: 58.475%. First build
24027088219

Pull #6281

github

web-flow
Merge 30c9bcb5f into 94ee52363
Pull Request #6281: refactor: populate provider field in entity context using provider ID fallback

27 of 39 new or added lines in 2 files covered. (69.23%)

19282 of 32975 relevant lines covered (58.47%)

36.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/internal/engine/handler.go
1
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package engine
5

6
import (
7
        "context"
8
        "fmt"
9
        "slices"
10
        "sync"
11
        "time"
12

13
        "github.com/ThreeDotsLabs/watermill/message"
14
        "github.com/rs/zerolog"
15

16
        "github.com/mindersec/minder/internal/db"
17
        "github.com/mindersec/minder/internal/engine/engcontext"
18
        "github.com/mindersec/minder/internal/engine/entities"
19
        minderlogger "github.com/mindersec/minder/internal/logger"
20
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
21
        "github.com/mindersec/minder/pkg/eventer/constants"
22
        "github.com/mindersec/minder/pkg/eventer/interfaces"
23
)
24

25
const (
26
        // DefaultExecutionTimeout defines the default timeout for entity execution.
27
        DefaultExecutionTimeout = 5 * time.Minute
28

29
        // ArtifactSignatureWaitPeriod defines delay before processing artifact events.
30
        ArtifactSignatureWaitPeriod = 10 * time.Second
31
)
32

33
// ExecutorEventHandler handles entity evaluation events.
34
type ExecutorEventHandler struct {
35
        evt                    interfaces.Publisher
36
        handlerMiddleware      []message.HandlerMiddleware
37
        wgEntityEventExecution *sync.WaitGroup
38
        executor               Executor
39

40
        executionTimeout time.Duration
41
        store            db.Store
42

43
        cancels []*context.CancelFunc
44
        lock    sync.Mutex
45
}
46

47
// NewExecutorEventHandler creates a new ExecutorEventHandler.
48
func NewExecutorEventHandler(
49
        ctx context.Context,
50
        evt interfaces.Publisher,
51
        handlerMiddleware []message.HandlerMiddleware,
52
        executor Executor,
53
        executionTimeout time.Duration,
54
        store db.Store,
55
) *ExecutorEventHandler {
1✔
56

1✔
57
        if executionTimeout <= 0 {
1✔
NEW
58
                executionTimeout = DefaultExecutionTimeout
×
NEW
59
        }
×
60

61
        eh := &ExecutorEventHandler{
1✔
62
                evt:                    evt,
1✔
63
                wgEntityEventExecution: &sync.WaitGroup{},
1✔
64
                handlerMiddleware:      handlerMiddleware,
1✔
65
                executor:               executor,
1✔
66
                executionTimeout:       executionTimeout,
1✔
67
                store:                  store,
1✔
68
        }
1✔
69

1✔
70
        zerolog.Ctx(ctx).Debug().
1✔
71
                Dur("execution_timeout", executionTimeout).
1✔
72
                Msg("executor event handler initialized")
1✔
73

1✔
74
        go func() {
2✔
75
                <-ctx.Done()
1✔
76
                eh.lock.Lock()
1✔
77
                defer eh.lock.Unlock()
1✔
78

1✔
79
                for _, cancel := range eh.cancels {
1✔
80
                        (*cancel)()
×
81
                }
×
82
        }()
83

84
        return eh
1✔
85
}
86

87
// Register registers the handler for entity evaluation events.
88
func (e *ExecutorEventHandler) Register(r interfaces.Registrar) {
×
89
        r.Register(constants.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...)
×
90
}
×
91

92
// Wait blocks until all entity executions are complete.
93
func (e *ExecutorEventHandler) Wait() {
1✔
94
        e.wgEntityEventExecution.Wait()
1✔
95
}
1✔
96

97
// HandleEntityEvent processes incoming entity events.
98
func (e *ExecutorEventHandler) HandleEntityEvent(msg *message.Message) error {
2✔
99
        msgCtx := context.WithoutCancel(msg.Context())
2✔
100

2✔
101
        msgCtx, shutdownCancel := context.WithCancel(msgCtx) //nolint:gosec
2✔
102

2✔
103
        e.lock.Lock()
2✔
104
        e.cancels = append(e.cancels, &shutdownCancel)
2✔
105
        e.lock.Unlock()
2✔
106

2✔
107
        msg = msg.Copy()
2✔
108

2✔
109
        inf, err := entities.ParseEntityEvent(msg)
2✔
110
        if err != nil {
2✔
111
                return fmt.Errorf("error unmarshalling payload: %w", err)
×
112
        }
×
113

114
        e.wgEntityEventExecution.Add(1)
2✔
115

2✔
116
        go func() {
4✔
117
                defer e.wgEntityEventExecution.Done()
2✔
118

2✔
119
                if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
2✔
120
                        time.Sleep(ArtifactSignatureWaitPeriod)
×
121
                }
×
122

123
                ctx, cancel := context.WithTimeout(msgCtx, e.executionTimeout)
2✔
124
                defer cancel()
2✔
125

2✔
126
                defer func() {
4✔
127
                        e.lock.Lock()
2✔
128
                        e.cancels = slices.DeleteFunc(e.cancels, func(cf *context.CancelFunc) bool {
4✔
129
                                return cf == &shutdownCancel
2✔
130
                        })
2✔
131
                        e.lock.Unlock()
2✔
132
                }()
133

134
                providerName := ""
2✔
135

2✔
136
                provider, err := e.store.GetProviderByID(ctx, inf.ProviderID)
2✔
137
                if err != nil {
2✔
NEW
138
                        zerolog.Ctx(ctx).Debug().
×
NEW
139
                                Err(err).
×
NEW
140
                                Str("provider_id", inf.ProviderID.String()).
×
NEW
141
                                Msg("failed to resolve provider name")
×
142
                } else if provider.Name != "" {
4✔
143
                        providerName = provider.Name
2✔
144
                }
2✔
145

146
                ctx = engcontext.WithEntityContext(ctx, &engcontext.EntityContext{
2✔
147
                        Project: engcontext.Project{ID: inf.ProjectID},
2✔
148
                        Provider: engcontext.Provider{
2✔
149
                                Name: providerName,
2✔
150
                        },
2✔
151
                })
2✔
152

2✔
153
                ts := minderlogger.BusinessRecord(ctx)
2✔
154
                ctx = ts.WithTelemetry(ctx)
2✔
155

2✔
156
                logger := zerolog.Ctx(ctx)
2✔
157

2✔
158
                if err := inf.WithExecutionIDFromMessage(msg); err != nil {
2✔
NEW
159
                        logger.Debug().
×
160
                                Str("message_id", msg.UUID).
×
161
                                Msg("message does not contain execution ID, skipping")
×
162
                        return
×
163
                }
×
164

165
                err = e.executor.EvalEntityEvent(ctx, inf)
2✔
166

2✔
167
                logMsg := logger.Info()
2✔
168
                if err != nil {
2✔
169
                        logMsg = logger.Error()
×
170
                }
×
171
                ts.Record(logMsg).Send()
2✔
172

2✔
173
                if err != nil {
2✔
174
                        logger.Info().
×
175
                                Str("project", inf.ProjectID.String()).
×
176
                                Str("provider_id", inf.ProviderID.String()).
×
177
                                Str("entity", inf.Type.String()).
×
178
                                Str("entity_id", inf.EntityID.String()).
×
NEW
179
                                Err(err).
×
NEW
180
                                Msg("got error while evaluating entity event")
×
181
                }
×
182

183
                msg, err := inf.BuildMessage()
2✔
184
                if err != nil {
2✔
185
                        logger.Err(err).Msg("error building message")
×
186
                        return
×
187
                }
×
188

189
                if err := e.evt.Publish(constants.TopicQueueEntityFlush, msg); err != nil {
2✔
190
                        logger.Err(err).Msg("error publishing flush event")
×
191
                }
×
192
        }()
193

194
        return nil
2✔
195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc