• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

elastic / cloudbeat / 16114132341

07 Jul 2025 10:09AM UTC coverage: 76.124% (-0.07%) from 76.193%
16114132341

Pull #3430

github

orestisfl
Add mocks
Pull Request #3430: Add OpenTelemetry

147 of 205 new or added lines in 7 files covered. (71.71%)

2 existing lines in 1 file now uncovered.

9463 of 12431 relevant lines covered (76.12%)

16.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.11
/internal/evaluator/opa.go
1
// Licensed to Elasticsearch B.V. under one or more contributor
2
// license agreements. See the NOTICE file distributed with
3
// this work for additional information regarding copyright
4
// ownership. Elasticsearch B.V. licenses this file to you under
5
// the Apache License, Version 2.0 (the "License"); you may
6
// not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17

18
package evaluator
19

20
import (
21
        "bytes"
22
        "context"
23
        "fmt"
24
        "time"
25

26
        "github.com/mitchellh/mapstructure"
27
        "github.com/open-policy-agent/opa/v1/plugins"
28
        "github.com/open-policy-agent/opa/v1/sdk"
29
        "go.opentelemetry.io/otel/attribute"
30

31
        "github.com/elastic/cloudbeat/internal/config"
32
        dlogger "github.com/elastic/cloudbeat/internal/evaluator/debug_logger"
33
        "github.com/elastic/cloudbeat/internal/infra/clog"
34
        "github.com/elastic/cloudbeat/internal/infra/observability"
35
        "github.com/elastic/cloudbeat/internal/resources/fetching"
36
)
37

38
var now = func() time.Time { return time.Now().UTC() }
4✔
39

40
const scopeName = "github.com/elastic/cloudbeat/internal/evaluator"
41

42
type OpaEvaluator struct {
43
        log       *clog.Logger
44
        opa       *sdk.OPA
45
        benchmark string
46
}
47

48
type OpaInput struct {
49
        fetching.Result
50
        Benchmark string `json:"benchmark,omitempty"`
51
}
52

53
var opaConfig = `{
54
        "bundles": {
55
                "CSP": {
56
                        "resource": "file://%s"
57
                }
58
        },
59
%s
60
}`
61

62
var logPlugin = `
63
        "decision_logs": {
64
                "plugin": "%s"
65
        },
66
        "plugins": {
67
                "%s": {}
68
        }`
69

70
func NewOpaEvaluator(ctx context.Context, log *clog.Logger, cfg *config.Config) (*OpaEvaluator, error) {
6✔
71
        // provide the OPA configuration which specifies
6✔
72
        // fetching policy bundle and logging decisions locally to the console
6✔
73

6✔
74
        log.Infof("OPA bundle path: %s", cfg.BundlePath)
6✔
75

6✔
76
        plugin := fmt.Sprintf(logPlugin, dlogger.PluginName, dlogger.PluginName)
6✔
77
        opaCfg := fmt.Sprintf(opaConfig, cfg.BundlePath, plugin)
6✔
78

6✔
79
        // create an instance of the OPA object
6✔
80
        opa, err := sdk.New(ctx, sdk.Options{
6✔
81
                Config:        bytes.NewReader([]byte(opaCfg)),
6✔
82
                Logger:        newLogger(),
6✔
83
                ConsoleLogger: newLogger(),
6✔
84
                Plugins: map[string]plugins.Factory{
6✔
85
                        dlogger.PluginName: &dlogger.Factory{},
6✔
86
                },
6✔
87
        })
6✔
88
        if err != nil {
6✔
89
                return nil, fmt.Errorf("fail to init opa: %s", err.Error())
×
90
        }
×
91

92
        // Newer cloudbeat versions shouldn't look at deprecated runtime config values
93
        // and should always get benchmark values because the integration is auto updated with the stack
94
        var benchmark string
6✔
95
        if cfg.Benchmark != "" {
6✔
96
                // Assume that isSupportedBenchmark ran in config creation
×
97
                benchmark = cfg.Benchmark
×
98
        } else {
6✔
99
                log.Warn("no benchmark supplied")
6✔
100
        }
6✔
101

102
        log.Info("Successfully initiated OPA")
6✔
103
        return &OpaEvaluator{
6✔
104
                log:       log,
6✔
105
                opa:       opa,
6✔
106
                benchmark: benchmark,
6✔
107
        }, nil
6✔
108
}
109

110
func (o *OpaEvaluator) Eval(ctx context.Context, resourceInfo fetching.ResourceInfo) (EventData, error) {
4✔
111
        ctx, span := observability.StartSpan(ctx, scopeName, "OPA Eval")
4✔
112
        defer span.End()
4✔
113

4✔
114
        resMetadata, err := resourceInfo.GetMetadata()
4✔
115
        if err != nil {
4✔
NEW
116
                return EventData{}, observability.FailSpan(span, "failed to get resource metadata", err)
×
117
        }
×
118

119
        fetcherResult := fetching.Result{
4✔
120
                Type:     resMetadata.Type,
4✔
121
                SubType:  resMetadata.SubType,
4✔
122
                Resource: resourceInfo.GetData(),
4✔
123
        }
4✔
124

4✔
125
        result, err := o.decision(ctx, OpaInput{
4✔
126
                Result:    fetcherResult,
4✔
127
                Benchmark: o.benchmark,
4✔
128
        })
4✔
129
        if err != nil {
4✔
NEW
130
                return EventData{}, observability.FailSpan(span, "error running the policy", err)
×
131
        }
×
132

133
        ruleResults, err := o.decode(result)
4✔
134
        if err != nil {
4✔
NEW
135
                return EventData{}, observability.FailSpan(span, "error decoding findings", err)
×
136
        }
×
137

138
        span.SetAttributes(
4✔
139
                attribute.Int("findings.count", len(ruleResults.Findings)),
4✔
140
                attribute.String("resource.type", resMetadata.Type),
4✔
141
                attribute.String("resource.sub_type", resMetadata.SubType),
4✔
142
                attribute.String("resource.name", resMetadata.Name),
4✔
143
        )
4✔
144
        o.log.Debugf("Created %d findings for input: %v", len(ruleResults.Findings), fetcherResult)
4✔
145
        return EventData{ruleResults, resourceInfo}, nil
4✔
146
}
147

148
func (o *OpaEvaluator) Stop(ctx context.Context) {
×
149
        o.opa.Stop(ctx)
×
150
}
×
151

152
func (o *OpaEvaluator) decision(ctx context.Context, input OpaInput) (any, error) {
4✔
153
        // get the named policy decision for the specified input
4✔
154
        result, err := o.opa.Decision(ctx, sdk.DecisionOptions{
4✔
155
                Path:  "main",
4✔
156
                Input: input,
4✔
157
        })
4✔
158
        if err != nil {
4✔
159
                return nil, err
×
160
        }
×
161

162
        return result.Result, nil
4✔
163
}
164

165
func (o *OpaEvaluator) decode(result any) (RuleResult, error) {
5✔
166
        var opaResult RuleResult
5✔
167
        decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{Result: &opaResult})
5✔
168
        if err != nil {
5✔
169
                return RuleResult{}, err
×
170
        }
×
171

172
        err = decoder.Decode(result)
5✔
173
        opaResult.Metadata.CreatedAt = now()
5✔
174
        return opaResult, err
5✔
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc