• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindersec / minder / 11366847092

16 Oct 2024 01:36PM UTC coverage: 54.774% (-0.08%) from 54.85%
11366847092

Pull #4762

github

web-flow
Merge 6859731c3 into c34f35443
Pull Request #4762: Implement Minder TestKit

50 of 60 new or added lines in 19 files covered. (83.33%)

13 existing lines in 3 files now uncovered.

14944 of 27283 relevant lines covered (54.77%)

41.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.26
/internal/engine/executor.go
1
// Copyright 2023 Stacklok, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package engine
16

17
import (
18
        "context"
19
        "fmt"
20
        "time"
21

22
        "github.com/google/uuid"
23
        "github.com/open-feature/go-sdk/openfeature"
24
        "github.com/rs/zerolog"
25

26
        "github.com/mindersec/minder/internal/db"
27
        "github.com/mindersec/minder/internal/engine/actions"
28
        "github.com/mindersec/minder/internal/engine/actions/alert"
29
        "github.com/mindersec/minder/internal/engine/actions/remediate"
30
        "github.com/mindersec/minder/internal/engine/entities"
31
        evalerrors "github.com/mindersec/minder/internal/engine/errors"
32
        "github.com/mindersec/minder/internal/engine/ingestcache"
33
        engif "github.com/mindersec/minder/internal/engine/interfaces"
34
        eoptions "github.com/mindersec/minder/internal/engine/options"
35
        "github.com/mindersec/minder/internal/engine/rtengine"
36
        "github.com/mindersec/minder/internal/engine/selectors"
37
        "github.com/mindersec/minder/internal/entities/properties/service"
38
        "github.com/mindersec/minder/internal/history"
39
        minderlogger "github.com/mindersec/minder/internal/logger"
40
        "github.com/mindersec/minder/internal/profiles"
41
        "github.com/mindersec/minder/internal/profiles/models"
42
        "github.com/mindersec/minder/internal/providers/manager"
43
        provsel "github.com/mindersec/minder/internal/providers/selectors"
44
        pb "github.com/mindersec/minder/pkg/api/protobuf/go/minder/v1"
45
        provinfv1 "github.com/mindersec/minder/pkg/providers/v1"
46
)
47

48
//go:generate go run go.uber.org/mock/mockgen -package mock_$GOPACKAGE -destination=./mock/$GOFILE -source=./$GOFILE
49

50
// Executor is the engine that executes the rules for a given event
51
type Executor interface {
52
        EvalEntityEvent(ctx context.Context, inf *entities.EntityInfoWrapper) error
53
}
54

55
type executor struct {
56
        querier         db.Store
57
        providerManager manager.ProviderManager
58
        metrics         *ExecutorMetrics
59
        historyService  history.EvaluationHistoryService
60
        featureFlags    openfeature.IClient
61
        profileStore    profiles.ProfileStore
62
        selBuilder      selectors.SelectionBuilder
63
        propService     service.PropertiesService
64
}
65

66
// NewExecutor creates a new executor
67
func NewExecutor(
68
        querier db.Store,
69
        providerManager manager.ProviderManager,
70
        metrics *ExecutorMetrics,
71
        historyService history.EvaluationHistoryService,
72
        featureFlags openfeature.IClient,
73
        profileStore profiles.ProfileStore,
74
        selBuilder selectors.SelectionBuilder,
75
        propService service.PropertiesService,
76
) Executor {
1✔
77
        return &executor{
1✔
78
                querier:         querier,
1✔
79
                providerManager: providerManager,
1✔
80
                metrics:         metrics,
1✔
81
                historyService:  historyService,
1✔
82
                featureFlags:    featureFlags,
1✔
83
                profileStore:    profileStore,
1✔
84
                selBuilder:      selBuilder,
1✔
85
                propService:     propService,
1✔
86
        }
1✔
87
}
1✔
88

89
// EvalEntityEvent evaluates the entity specified in the EntityInfoWrapper
90
// against all relevant rules in the project hierarchy.
91
func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfoWrapper) error {
1✔
92
        logger := zerolog.Ctx(ctx).Info().
1✔
93
                Str("entity_type", inf.Type.ToString()).
1✔
94
                Str("execution_id", inf.ExecutionID.String()).
1✔
95
                Str("provider_id", inf.ProviderID.String()).
1✔
96
                Str("project_id", inf.ProjectID.String())
1✔
97
        logger.Msg("entity evaluation - started")
1✔
98

1✔
99
        // track the time taken to evaluate each entity
1✔
100
        entityStartTime := time.Now()
1✔
101
        defer e.metrics.TimeProfileEvaluation(ctx, entityStartTime)
1✔
102

1✔
103
        provider, err := e.providerManager.InstantiateFromID(ctx, inf.ProviderID)
1✔
104
        if err != nil {
1✔
105
                return fmt.Errorf("could not instantiate provider: %w", err)
×
106
        }
×
107

108
        // This is a cache, so we can avoid querying the ingester upstream
109
        // for every rule. We use a sync.Map because it's safe for concurrent
110
        // access.
111
        var ingestCache ingestcache.Cache
1✔
112
        if inf.Type == pb.Entity_ENTITY_ARTIFACTS {
1✔
113
                // We use a noop cache for artifacts because we don't want to cache
×
114
                // anything for them. The signature information is essentially another artifact version,
×
115
                // and so we don't want to cache that.
×
116
                ingestCache = ingestcache.NewNoopCache()
×
117
        } else {
1✔
118
                ingestCache = ingestcache.NewCache()
1✔
119
        }
1✔
120

121
        defer e.releaseLockAndFlush(ctx, inf)
1✔
122

1✔
123
        entityType := entities.EntityTypeToDB(inf.Type)
1✔
124
        // Load all the relevant rule type engines for this entity
1✔
125
        ruleEngineCache, err := rtengine.NewRuleEngineCache(
1✔
126
                ctx,
1✔
127
                e.querier,
1✔
128
                entityType,
1✔
129
                inf.ProjectID,
1✔
130
                provider,
1✔
131
                ingestCache,
1✔
132
                eoptions.WithFlagsClient(e.featureFlags),
1✔
133
        )
1✔
134
        if err != nil {
1✔
135
                return fmt.Errorf("unable to fetch rule type instances for project: %w", err)
×
136
        }
×
137

138
        // Get the profiles in the project hierarchy which have rules for this entity type
139
        // along with the relevant rule instances
140
        profileAggregates, err := e.profileStore.GetProfilesForEvaluation(ctx, inf.ProjectID, entityType)
1✔
141
        if err != nil {
1✔
142
                return fmt.Errorf("error while retrieving profiles and rule instances: %w", err)
×
143
        }
×
144

145
        // For each profile, get the profileEvalStatus first. Then, if the profileEvalStatus is nil
146
        // evaluate each rule and store the outcome in the database. If profileEvalStatus is non-nil,
147
        // just store it for all rules without evaluation.
148
        for _, profile := range profileAggregates {
3✔
149

2✔
150
                profileEvalStatus := e.profileEvalStatus(ctx, inf, profile)
2✔
151

2✔
152
                for _, rule := range profile.Rules {
3✔
153
                        if err := e.evaluateRule(ctx, inf, provider, &profile, &rule, ruleEngineCache, profileEvalStatus); err != nil {
1✔
154
                                return fmt.Errorf("error evaluating entity event: %w", err)
×
155
                        }
×
156
                }
157
        }
158

159
        return nil
1✔
160
}
161

162
func (e *executor) evaluateRule(
163
        ctx context.Context,
164
        inf *entities.EntityInfoWrapper,
165
        provider provinfv1.Provider,
166
        profile *models.ProfileAggregate,
167
        rule *models.RuleInstance,
168
        ruleEngineCache rtengine.Cache,
169
        profileEvalStatus error,
170
) error {
1✔
171
        // Create eval status params
1✔
172
        evalParams, err := e.createEvalStatusParams(ctx, inf, profile, rule)
1✔
173
        if err != nil {
1✔
174
                return fmt.Errorf("error creating eval status params: %w", err)
×
175
        }
×
176

177
        // retrieve the rule type engine from the cache
178
        ruleEngine, err := ruleEngineCache.GetRuleEngine(ctx, rule.RuleTypeID)
1✔
179
        if err != nil {
1✔
180
                return fmt.Errorf("error creating rule type engine: %w", err)
×
181
        }
×
182

183
        // create the action engine for this rule instance
184
        // unlike the rule type engine, this cannot be cached
185
        actionEngine, err := actions.NewRuleActions(ctx, ruleEngine.GetRuleType(), provider, &profile.ActionConfig)
1✔
186
        if err != nil {
1✔
187
                return fmt.Errorf("cannot create rule actions engine: %w", err)
×
188
        }
×
189

190
        evalParams.SetActionsOnOff(actionEngine.GetOnOffState())
1✔
191

1✔
192
        // Update the lock lease at the end of the evaluation
1✔
193
        defer e.updateLockLease(ctx, *inf.ExecutionID, evalParams)
1✔
194

1✔
195
        // Evaluate the rule
1✔
196
        var evalErr error
1✔
197
        if profileEvalStatus != nil {
1✔
UNCOV
198
                evalErr = profileEvalStatus
×
199
        } else {
1✔
200
                // enrich the logger with the entity type and execution ID
1✔
201
                ctx := zerolog.Ctx(ctx).With().
1✔
202
                        Str("entity_type", inf.Type.ToString()).
1✔
203
                        Str("execution_id", inf.ExecutionID.String()).
1✔
204
                        Logger().WithContext(ctx)
1✔
205
                evalErr = ruleEngine.Eval(ctx, inf.Entity, evalParams.GetRule().Def, evalParams.GetRule().Params, evalParams)
1✔
206

1✔
207
        }
1✔
208
        evalParams.SetEvalErr(evalErr)
1✔
209

1✔
210
        // Perform actionEngine, if any
1✔
211
        actionsErr := actionEngine.DoActions(ctx, inf.Entity, evalParams)
1✔
212
        evalParams.SetActionsErr(ctx, actionsErr)
1✔
213

1✔
214
        // Log the evaluation
1✔
215
        logEval(ctx, inf, evalParams, ruleEngine.GetRuleType().Name)
1✔
216

1✔
217
        // Create or update the evaluation status
1✔
218
        return e.createOrUpdateEvalStatus(ctx, evalParams)
1✔
219
}
220

221
func (e *executor) profileEvalStatus(
222
        ctx context.Context,
223
        eiw *entities.EntityInfoWrapper,
224
        aggregate models.ProfileAggregate,
225
) error {
2✔
226
        // so far this function only handles selectors. In the future we can extend it to handle other
2✔
227
        // profile-global evaluations
2✔
228

2✔
229
        if len(aggregate.Selectors) == 0 {
3✔
230
                return nil
1✔
231
        }
1✔
232

233
        selection, err := e.selBuilder.NewSelectionFromProfile(eiw.Type, aggregate.Selectors)
1✔
234
        if err != nil {
1✔
235
                return fmt.Errorf("error creating selection from profile: %w", err)
×
236
        }
×
237

238
        // get the entity UUID (the primary key in the database)
239
        entityID, err := eiw.GetID()
1✔
240
        if err != nil {
1✔
241
                return fmt.Errorf("error getting entity id: %w", err)
×
242
        }
×
243

244
        // get the entity with properties by the entity UUID
245
        ewp, err := e.propService.EntityWithPropertiesByID(ctx, entityID,
1✔
246
                service.CallBuilder().WithStoreOrTransaction(e.querier))
1✔
247
        if err != nil {
1✔
248
                return fmt.Errorf("error getting entity with properties: %w", err)
×
249
        }
×
250

251
        selEnt := provsel.EntityToSelectorEntity(ctx, e.querier, eiw.Type, ewp)
1✔
252
        if selEnt == nil {
1✔
253
                return fmt.Errorf("error converting entity to selector entity")
×
254
        }
×
255

256
        selected, matchedSelector, err := selection.Select(selEnt)
1✔
257
        if err != nil {
1✔
258
                return fmt.Errorf("error selecting entity: %w", err)
×
259
        }
×
260

261
        if !selected {
1✔
262
                return evalerrors.NewErrEvaluationSkipped("entity not applicable due to profile selector %s", matchedSelector)
×
263
        }
×
264

265
        return nil
1✔
266
}
267

268
func (e *executor) updateLockLease(
269
        ctx context.Context,
270
        executionID uuid.UUID,
271
        params *engif.EvalStatusParams,
272
) {
1✔
273
        logger := params.DecorateLogger(
1✔
274
                zerolog.Ctx(ctx).With().Str("execution_id", executionID.String()).Logger())
1✔
275

1✔
276
        if err := e.querier.UpdateLease(ctx, db.UpdateLeaseParams{
1✔
277
                LockedBy:         executionID,
1✔
278
                EntityInstanceID: params.EntityID,
1✔
279
        }); err != nil {
1✔
280
                logger.Err(err).Msg("error updating lock lease")
×
281
                return
×
282
        }
×
283

284
        logger.Info().Msg("lock lease updated")
1✔
285
}
286

287
func (e *executor) releaseLockAndFlush(
288
        ctx context.Context,
289
        inf *entities.EntityInfoWrapper,
290
) {
1✔
291
        eID, err := inf.GetID()
1✔
292
        if err != nil {
1✔
293
                zerolog.Ctx(ctx).Error().Err(err).Msg("error getting entity id")
×
294
                return
×
295
        }
×
296

297
        logger := zerolog.Ctx(ctx).Info().
1✔
298
                Str("entity_type", inf.Type.ToString()).
1✔
299
                Str("execution_id", inf.ExecutionID.String()).
1✔
300
                Str("entity_id", eID.String())
1✔
301

1✔
302
        if err := e.querier.ReleaseLock(ctx, db.ReleaseLockParams{
1✔
303
                EntityInstanceID: eID,
1✔
304
                LockedBy:         *inf.ExecutionID,
1✔
305
        }); err != nil {
1✔
306
                logger.Err(err).Msg("error updating lock lease")
×
307
        }
×
308
}
309

310
func logEval(
311
        ctx context.Context,
312
        inf *entities.EntityInfoWrapper,
313
        params *engif.EvalStatusParams,
314
        ruleTypeName string,
315
) {
1✔
316
        evalLog := params.DecorateLogger(
1✔
317
                zerolog.Ctx(ctx).With().
1✔
318
                        Str("eval_status", string(evalerrors.ErrorAsEvalStatus(params.GetEvalErr()))).
1✔
319
                        Str("project_id", inf.ProjectID.String()).
1✔
320
                        Logger())
1✔
321

1✔
322
        // log evaluation result and actions status
1✔
323
        evalLog.Info().
1✔
324
                Str("action", string(remediate.ActionType)).
1✔
325
                Str("action_status", string(evalerrors.ErrorAsRemediationStatus(params.GetActionsErr().RemediateErr))).
1✔
326
                Str("action", string(alert.ActionType)).
1✔
327
                Str("action_status", string(evalerrors.ErrorAsAlertStatus(params.GetActionsErr().AlertErr))).
1✔
328
                Msg("entity evaluation - completed")
1✔
329

1✔
330
        // log business logic
1✔
331
        minderlogger.BusinessRecord(ctx).AddRuleEval(params, ruleTypeName)
1✔
332
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc