• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.77
/auth.go
1
package kafka
2

3
import (
4
        "context"
5
        "crypto/tls"
6
        "crypto/x509"
7
        "fmt"
8
        "os"
9
        "time"
10

11
        "github.com/aws/aws-sdk-go-v2/config"
12
        kafkago "github.com/segmentio/kafka-go"
13
        "github.com/segmentio/kafka-go/sasl"
14
        "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
15
        "github.com/segmentio/kafka-go/sasl/plain"
16
        "github.com/segmentio/kafka-go/sasl/scram"
17
)
18

19
// TLSVersions maps a TLS version name to its numeric protocol identifier.
// newTLSObject looks TLSConfig.MinVersion up in this map. The map is only
// declared here, not initialised.
// NOTE(review): presumably populated elsewhere in the package — confirm.
var TLSVersions map[string]uint16

// Algorithm names compared against SASLConfig.Algorithm.
const (
	none            = "none"
	saslPlain       = "sasl_plain"
	saslScramSha256 = "sasl_scram_sha256"
	saslScramSha512 = "sasl_scram_sha512"
	saslSsl         = "sasl_ssl"
	saslAwsIam      = "sasl_aws_iam"
)

// Timeout is the timeout assigned to every dialer created by GetDialer.
const Timeout = 10 * time.Second
32

33
// SASLConfig holds the credentials and the algorithm name from which
// GetSASLMechanism selects and builds a SASL mechanism.
type SASLConfig struct {
	// Username is handed to the PLAIN and SCRAM mechanisms.
	Username string `json:"username"`

	// Password is handed to the PLAIN and SCRAM mechanisms.
	Password string `json:"password"`

	// Algorithm names the mechanism to build. An empty value is treated
	// the same as "none".
	Algorithm string `json:"algorithm"`
}
38

39
// TLSConfig describes the TLS settings that GetTLSConfig turns into a
// *tls.Config.
type TLSConfig struct {
	// EnableTLS switches TLS on. When false, GetTLSConfig returns no
	// config together with a noTLSConfig error.
	EnableTLS bool `json:"enableTls"`

	// InsecureSkipTLSVerify is copied into tls.Config.InsecureSkipVerify.
	InsecureSkipTLSVerify bool `json:"insecureSkipTlsVerify"`

	// MinVersion is a key of TLSVersions. When it is empty or not present
	// in that map, TLS 1.2 is used as the minimum version.
	MinVersion string `json:"minVersion"`

	// ClientCertPem is the client certificate, given either as PEM content
	// or as a file path. It is used only when ClientKeyPem is also set.
	ClientCertPem string `json:"clientCertPem"`

	// ClientKeyPem is the client private key, given either as PEM content
	// or as a file path. It is used only when ClientCertPem is also set.
	ClientKeyPem string `json:"clientKeyPem"`

	// ServerCaPem is the CA certificate, given either as PEM content or as
	// a file path. When it is empty, GetTLSConfig returns the base config
	// without custom root CAs and without loading a client certificate.
	ServerCaPem string `json:"serverCaPem"`
}
47

48
// GetDialer creates a kafka dialer from the given auth string or an unauthenticated dialer if the auth string is empty.
49
func GetDialer(saslConfig SASLConfig, tlsConfig TLSConfig) (*kafkago.Dialer, *Xk6KafkaError) {
35✔
50
        // Create a unauthenticated dialer with no TLS
35✔
51
        dialer := &kafkago.Dialer{
35✔
52
                Timeout:   Timeout,
35✔
53
                DualStack: false,
35✔
54
        }
35✔
55

35✔
56
        // Create a SASL-authenticated dialer with no TLS
35✔
57
        saslMechanism, err := GetSASLMechanism(saslConfig)
35✔
58
        if err != nil {
36✔
59
                return nil, err
1✔
60
        }
1✔
61
        if saslMechanism != nil {
38✔
62
                dialer.DualStack = true
4✔
63
                dialer.SASLMechanism = saslMechanism
4✔
64
        }
4✔
65

66
        // Create a TLS dialer, either with or without SASL authentication
67
        tlsObject, err := GetTLSConfig(tlsConfig)
34✔
68
        if err != nil {
66✔
69
                // Ignore the error if we're not using TLS
32✔
70
                if err.Code != noTLSConfig {
33✔
71
                        logger.WithField("error", err).Error("Cannot process TLS config")
1✔
72
                }
1✔
73
        }
74
        if tlsObject == nil && saslConfig.Algorithm == saslSsl {
35✔
75
                return nil, NewXk6KafkaError(
1✔
76
                        failedCreateDialerWithSaslSSL, "You must enable TLS to use SASL_SSL", nil)
1✔
77
        }
1✔
78
        dialer.TLS = tlsObject
33✔
79
        dialer.DualStack = (tlsObject != nil)
33✔
80

33✔
81
        return dialer, nil
33✔
82
}
83

84
// GetSASLMechanism returns a kafka SASL config from the given credentials.
85
func GetSASLMechanism(saslConfig SASLConfig) (sasl.Mechanism, *Xk6KafkaError) {
35✔
86
        if saslConfig.Algorithm == "" {
65✔
87
                saslConfig.Algorithm = none
30✔
88
        }
30✔
89

90
        switch saslConfig.Algorithm {
35✔
91
        case none:
30✔
92
                return nil, nil
30✔
93
        case saslPlain, saslSsl:
3✔
94
                mechanism := plain.Mechanism{
3✔
95
                        Username: saslConfig.Username,
3✔
96
                        Password: saslConfig.Password,
3✔
97
                }
3✔
98
                return mechanism, nil
3✔
99
        case saslScramSha256, saslScramSha512:
2✔
100
                hashes := make(map[string]scram.Algorithm)
2✔
101
                hashes[saslScramSha256] = scram.SHA256
2✔
102
                hashes[saslScramSha512] = scram.SHA512
2✔
103

2✔
104
                mechanism, err := scram.Mechanism(
2✔
105
                        hashes[saslConfig.Algorithm],
2✔
106
                        saslConfig.Username,
2✔
107
                        saslConfig.Password,
2✔
108
                )
2✔
109
                if err != nil {
3✔
110
                        return nil, NewXk6KafkaError(
1✔
111
                                failedCreateDialerWithScram, "Unable to create SCRAM mechanism", err)
1✔
112
                }
1✔
113
                return mechanism, nil
1✔
114
        case saslAwsIam:
×
115
                cfg, err := config.LoadDefaultConfig(context.TODO())
×
116
                if err != nil {
×
117
                        return nil, NewXk6KafkaError(
×
118
                                failedCreateDialerWithAwsIam, "Unable to load AWS IAM config for AWS MSK", err)
×
119
                }
×
120
                return aws_msk_iam_v2.NewMechanism(cfg), nil
×
121
        default:
×
122
                // Should we fail silently?
×
123
                return nil, nil
×
124
        }
125
}
126

127
// GetTLSConfig creates a TLS config from the given TLS config struct and checks for errors.
128
// nolint: funlen
129
func GetTLSConfig(tlsConfig TLSConfig) (*tls.Config, *Xk6KafkaError) {
48✔
130
        tlsObject := newTLSObject(tlsConfig)
48✔
131

48✔
132
        if tlsConfig.EnableTLS {
58✔
133
                if tlsConfig.ServerCaPem == "" {
12✔
134
                        return tlsObject, nil
2✔
135
                }
2✔
136
        } else {
38✔
137
                // TLS is disabled, and we continue with a unauthenticated dialer
38✔
138
                return nil, NewXk6KafkaError(
38✔
139
                        noTLSConfig, "No TLS config provided. Continuing with TLS disabled.", nil)
38✔
140
        }
38✔
141

142
        if tlsConfig.ClientCertPem != "" && tlsConfig.ClientKeyPem != "" {
16✔
143
                // Try to load the certificates from string
8✔
144
                cert, err := tls.X509KeyPair([]byte(tlsConfig.ClientCertPem), []byte(tlsConfig.ClientKeyPem))
8✔
145
                if err != nil && err.Error() == "tls: failed to find any PEM data in certificate input" {
15✔
146
                        // Fall back to loading the client certificate and key from the file
7✔
147
                        if err := fileExists(tlsConfig.ClientCertPem); err != nil {
9✔
148
                                return nil, err
2✔
149
                        }
2✔
150

151
                        if err := fileExists(tlsConfig.ClientKeyPem); err != nil {
6✔
152
                                return nil, err
1✔
153
                        }
1✔
154

155
                        cert, err = tls.LoadX509KeyPair(tlsConfig.ClientCertPem, tlsConfig.ClientKeyPem)
4✔
156
                        if err != nil {
5✔
157
                                return nil, NewXk6KafkaError(
1✔
158
                                        failedLoadX509KeyPair,
1✔
159
                                        fmt.Sprintf(
1✔
160
                                                "Error creating x509 key pair from \"%s\" and \"%s\".",
1✔
161
                                                tlsConfig.ClientCertPem,
1✔
162
                                                tlsConfig.ClientKeyPem),
1✔
163
                                        err)
1✔
164
                        }
1✔
165
                } else if err != nil {
1✔
166
                        return nil, NewXk6KafkaError(
×
167
                                failedLoadX509KeyPair,
×
168
                                "Error creating x509 key pair from passed content.", err)
×
169
                }
×
170

171
                tlsObject.Certificates = []tls.Certificate{cert}
4✔
172
        }
173

174
        caCertPool := x509.NewCertPool()
4✔
175

4✔
176
        // Load the CA certificate as string if provided
4✔
177
        if ok := caCertPool.AppendCertsFromPEM([]byte(tlsConfig.ServerCaPem)); !ok {
7✔
178
                // Fall back if file path is provided
3✔
179
                if err := fileExists(tlsConfig.ServerCaPem); err != nil {
4✔
180
                        return nil, err
1✔
181
                }
1✔
182

183
                caCert, err := os.ReadFile(tlsConfig.ServerCaPem)
2✔
184
                if err != nil {
2✔
185
                        // This might happen on permissions issues or if the file is unreadable somehow
×
186
                        return nil, NewXk6KafkaError(
×
187
                                failedReadCaCertFile,
×
188
                                fmt.Sprintf(
×
189
                                        "Error reading CA certificate file \"%s\".",
×
190
                                        tlsConfig.ServerCaPem),
×
191
                                err)
×
192
                }
×
193

194
                if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
3✔
195
                        return nil, NewXk6KafkaError(
1✔
196
                                failedAppendCaCertFile,
1✔
197
                                fmt.Sprintf(
1✔
198
                                        "Error appending CA certificate file \"%s\".",
1✔
199
                                        tlsConfig.ServerCaPem),
1✔
200
                                nil)
1✔
201
                }
1✔
202
        }
203

204
        tlsObject.RootCAs = caCertPool
2✔
205
        return tlsObject, nil
2✔
206
}
207

208
// newTLSConfig returns a tls.Config object from the given TLS config.
209
func newTLSObject(tlsConfig TLSConfig) *tls.Config {
48✔
210
        // Create a TLS config with default settings
48✔
211
        // #nosec G402
48✔
212
        tlsObject := &tls.Config{
48✔
213
                InsecureSkipVerify: tlsConfig.InsecureSkipTLSVerify,
48✔
214
                MinVersion:         tls.VersionTLS12,
48✔
215
        }
48✔
216

48✔
217
        // Set the minimum TLS version
48✔
218
        if tlsConfig.MinVersion != "" {
48✔
219
                if minVersion, ok := TLSVersions[tlsConfig.MinVersion]; ok {
×
220
                        tlsObject.MinVersion = minVersion
×
221
                }
×
222
        }
223

224
        return tlsObject
48✔
225
}
226

227
// fileExists returns nil if the given file exists and error otherwise.
228
func fileExists(filename string) *Xk6KafkaError {
25✔
229
        if _, err := os.Stat(filename); err != nil {
31✔
230
                return NewXk6KafkaError(fileNotFound, fmt.Sprintf("File not found: %s", filename), err)
6✔
231
        }
6✔
232
        return nil
19✔
233
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc