raystack / meteor / 11610963305

31 Oct 2024 11:18AM UTC · coverage: 82.433% (-0.4%) from 82.814%
push · github · ravisuhag · chore: remove unused imports

6809 of 8260 relevant lines covered (82.43%)
0.91 hits per line

Source File
/plugins/extractors/kafka/kafka.go · 76.34% covered
package kafka

import (
        "context"
        "crypto/tls"
        "crypto/x509"
        _ "embed" // used to print the embedded assets
        "errors"
        "fmt"
        "os"
        "strings"
        "time"

        "github.com/IBM/sarama"
        "github.com/raystack/meteor/models"
        v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
        "github.com/raystack/meteor/plugins"
        "github.com/raystack/meteor/registry"
        "github.com/raystack/salt/log"
        "github.com/segmentio/kafka-go"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/metric"
        "google.golang.org/protobuf/types/known/anypb"
        "google.golang.org/protobuf/types/known/structpb"
)

//go:embed README.md
var summary string

// default topics map to skip
var defaultTopics = map[string]struct{}{
        "__consumer_offsets": {},
        "_schemas":           {},
}

// Config holds the set of configuration for the kafka extractor
type Config struct {
        Broker string     `json:"broker" yaml:"broker" mapstructure:"broker" validate:"required"`
        Auth   AuthConfig `json:"auth_config" yaml:"auth_config" mapstructure:"auth_config"`
}

type AuthConfig struct {
        TLS struct {
                // Whether to use TLS when connecting to the broker
                // (defaults to false).
                Enabled bool `mapstructure:"enabled"`

                // controls whether a client verifies the server's certificate chain and host name
                // defaults to false
                InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`

                // certificate file for client authentication
                CertFile string `mapstructure:"cert_file"`

                // key file for client authentication
                KeyFile string `mapstructure:"key_file"`

                // certificate authority file for TLS client authentication
                CAFile string `mapstructure:"ca_file"`
        } `mapstructure:"tls"`

        SASL struct {
                Enabled   bool   `mapstructure:"enabled"`
                Mechanism string `mapstructure:"mechanism"`
        }
}

var sampleConfig = `broker: "localhost:9092"`

var info = plugins.Info{
        Description:  "Topic list from Apache Kafka.",
        SampleConfig: sampleConfig,
        Summary:      summary,
        Tags:         []string{"oss", "extractor"},
}

// Extractor manages the extraction of data
// from a kafka broker
type Extractor struct {
        plugins.BaseExtractor
        // internal states
        conn       sarama.Consumer
        logger     log.Logger
        config     Config
        clientDurn metric.Int64Histogram
}

// New returns a pointer to an initialized Extractor Object
func New(logger log.Logger) *Extractor {
        e := &Extractor{
                logger: logger,
        }
        e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)

        return e
}

// Init initializes the extractor
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
        clientDurn, err := otel.Meter("github.com/raystack/meteor/plugins/extractors/kafka").
                Int64Histogram("meteor.kafka.client.duration", metric.WithUnit("ms"))
        if err != nil {
                otel.Handle(err)
        }

        e.clientDurn = clientDurn

        if err := e.BaseExtractor.Init(ctx, config); err != nil {
                return err
        }

        consumerConfig := sarama.NewConfig()
        if e.config.Auth.TLS.Enabled {
                tlsConfig, err := e.createTLSConfig()
                if err != nil {
                        return fmt.Errorf("create tls config: %w", err)
                }
                consumerConfig.Net.TLS.Enable = true
                consumerConfig.Net.TLS.Config = tlsConfig
        }

        if e.config.Auth.SASL.Enabled {
                consumerConfig.Net.SASL.Enable = true
                if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth {
                        consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
                        consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider()
                }
        }

        consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig)
        if err != nil {
                fmt.Printf("Error is here !! %s", err.Error())
                return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker,
                        consumerConfig, err.Error())
        }

        e.conn = consumer
        return nil
}

// Extract checks if the extractor is ready to extract
// if so, then extracts metadata from the kafka broker
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
        defer e.conn.Close()

        defer func(start time.Time) {
                attributes := []attribute.KeyValue{
                        attribute.String("kafka.broker", e.config.Broker),
                        attribute.Bool("success", err == nil),
                }
                if err != nil {
                        errorCode := "UNKNOWN"
                        var kErr kafka.Error
                        if errors.As(err, &kErr) {
                                errorCode = strings.ReplaceAll(
                                        strings.ToUpper(kErr.Title()), " ", "_",
                                )
                        }
                        attributes = append(attributes, attribute.String("kafka.error_code", errorCode))
                }

                e.clientDurn.Record(
                        ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...),
                )
        }(time.Now())

        topics, err := e.conn.Topics()
        if err != nil {
                return fmt.Errorf("fetch topics: %w", err)
        }

        // build and push topics
        for _, topic := range topics {
                // skip if topic is a default topic
                _, isDefaultTopic := defaultTopics[topic]
                if isDefaultTopic {
                        continue
                }

                partitions, err := e.conn.Partitions(topic)
                if err != nil {
                        e.logger.Error("failed to fetch partitions for topic", "err", err, "topic", topic)
                        continue
                }
                asset, err := e.buildAsset(topic, len(partitions))
                if err != nil {
                        e.logger.Error("failed to build asset", "err", err, "topic", topic)
                        continue
                }
                emit(models.NewRecord(asset))
        }
        return nil
}

func (e *Extractor) createTLSConfig() (*tls.Config, error) {
        authConfig := e.config.Auth.TLS

        if authConfig.CAFile == "" {
                //nolint:gosec
                return &tls.Config{
                        InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
                }, nil
        }

        var cert tls.Certificate
        var err error
        if authConfig.CertFile != "" && authConfig.KeyFile != "" {
                cert, err = tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile)
                if err != nil {
                        return nil, fmt.Errorf("create cert: %w", err)
                }
        }

        caCert, err := os.ReadFile(authConfig.CAFile)
        if err != nil {
                return nil, fmt.Errorf("read ca cert file: %w", err)
        }

        caCertPool := x509.NewCertPool()
        caCertPool.AppendCertsFromPEM(caCert)

        //nolint:gosec
        return &tls.Config{
                Certificates:       []tls.Certificate{cert},
                RootCAs:            caCertPool,
                InsecureSkipVerify: e.config.Auth.TLS.InsecureSkipVerify,
        }, nil
}

// Build topic metadata model using a topic and number of partitions
func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2.Asset, error) {
        topic, err := anypb.New(&v1beta2.Topic{
                Profile: &v1beta2.TopicProfile{
                        NumberOfPartitions: int64(numOfPartitions),
                },
                Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present
        })
        if err != nil {
                e.logger.Warn("error creating Any struct", "error", err)
        }

        return &v1beta2.Asset{
                Urn:     models.NewURN("kafka", e.UrnScope, "topic", topicName),
                Name:    topicName,
                Service: "kafka",
                Type:    "topic",
                Data:    topic,
        }, nil
}

func init() {
        if err := registry.Extractors.Register("kafka", func() plugins.Extractor {
                return New(plugins.GetLog())
        }); err != nil {
                panic(err)
        }
}
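
The only sample config shipped with the plugin is broker: "localhost:9092". For illustration, the sketch below expands that sample to show the TLS and SASL keys implied by the mapstructure tags on Config and AuthConfig; the nesting and key names follow those tags (the SASL block has no explicit tag, so "sasl" assumes mapstructure's default field-name matching), while the file paths and values are placeholder assumptions, not defaults of the extractor.

// Hypothetical expanded sample config for the kafka extractor, derived from
// the mapstructure tags on Config and AuthConfig above. Paths and values are
// placeholders for illustration only.
var sampleConfigWithAuth = `
broker: "localhost:9092"
auth_config:
  tls:
    enabled: true
    insecure_skip_verify: false
    ca_file: "/etc/kafka/certs/ca.pem"        # CAFile
    cert_file: "/etc/kafka/certs/client.pem"  # CertFile
    key_file: "/etc/kafka/certs/client.key"   # KeyFile
  sasl:
    enabled: false
    # "OAUTHBEARER" (sarama.SASLTypeOAuth) is the only mechanism Init handles explicitly.
    mechanism: "OAUTHBEARER"
`

With TLS enabled and a ca_file set, Init builds the consumer via createTLSConfig; with ca_file empty, only InsecureSkipVerify is applied to the TLS config.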