• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/topic.go
1
package kafka
2

3
import (
4
        "encoding/json"
5
        "net"
6
        "strconv"
7

8
        "github.com/grafana/sobek"
9
        kafkago "github.com/segmentio/kafka-go"
10
        "go.k6.io/k6/js/common"
11
)
12

13
type ConnectionConfig struct {
14
        Address string     `json:"address"`
15
        SASL    SASLConfig `json:"sasl"`
16
        TLS     TLSConfig  `json:"tls"`
17
}
18

19
// connectionClass is a constructor for the Connection object in JS
20
// that creates a new connection for creating, listing and deleting topics,
21
// e.g. new Connection(...).
22
// nolint: funlen
23
func (k *Kafka) connectionClass(call sobek.ConstructorCall) *sobek.Object {
1✔
24
        runtime := k.vu.Runtime()
1✔
25
        var connectionConfig *ConnectionConfig
1✔
26
        if len(call.Arguments) == 0 {
1✔
27
                common.Throw(runtime, ErrNotEnoughArguments)
×
28
        }
×
29

30
        if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
31
                if b, err := json.Marshal(params); err != nil {
1✔
32
                        common.Throw(runtime, err)
×
33
                } else {
1✔
34
                        if err = json.Unmarshal(b, &connectionConfig); err != nil {
1✔
35
                                common.Throw(runtime, err)
×
36
                        }
×
37
                }
38
        }
39

40
        connection := k.getKafkaControllerConnection(connectionConfig)
1✔
41

1✔
42
        connectionObject := runtime.NewObject()
1✔
43
        // This is the connection object itself
1✔
44
        if err := connectionObject.Set("This", connection); err != nil {
1✔
45
                common.Throw(runtime, err)
×
46
        }
×
47

48
        err := connectionObject.Set("createTopic", func(call sobek.FunctionCall) sobek.Value {
2✔
49
                var topicConfig *kafkago.TopicConfig
1✔
50
                if len(call.Arguments) == 0 {
1✔
51
                        common.Throw(runtime, ErrNotEnoughArguments)
×
52
                }
×
53

54
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
55
                        if b, err := json.Marshal(params); err != nil {
1✔
56
                                common.Throw(runtime, err)
×
57
                        } else {
1✔
58
                                if err = json.Unmarshal(b, &topicConfig); err != nil {
1✔
59
                                        common.Throw(runtime, err)
×
60
                                }
×
61
                        }
62
                }
63

64
                k.createTopic(connection, topicConfig)
1✔
65
                return sobek.Undefined()
1✔
66
        })
67
        if err != nil {
1✔
68
                common.Throw(runtime, err)
×
69
        }
×
70

71
        err = connectionObject.Set("deleteTopic", func(call sobek.FunctionCall) sobek.Value {
2✔
72
                if len(call.Arguments) > 0 {
2✔
73
                        if topic, ok := call.Argument(0).Export().(string); !ok {
1✔
74
                                common.Throw(runtime, ErrNotEnoughArguments)
×
75
                        } else {
1✔
76
                                k.deleteTopic(connection, topic)
1✔
77
                        }
1✔
78
                }
79

80
                return sobek.Undefined()
1✔
81
        })
82
        if err != nil {
1✔
83
                common.Throw(runtime, err)
×
84
        }
×
85

86
        err = connectionObject.Set("listTopics", func(call sobek.FunctionCall) sobek.Value {
3✔
87
                topics := k.listTopics(connection)
2✔
88
                return runtime.ToValue(topics)
2✔
89
        })
2✔
90
        if err != nil {
1✔
91
                common.Throw(runtime, err)
×
92
        }
×
93

94
        err = connectionObject.Set("close", func(call sobek.FunctionCall) sobek.Value {
2✔
95
                if err := connection.Close(); err != nil {
1✔
96
                        common.Throw(runtime, err)
×
97
                }
×
98

99
                return sobek.Undefined()
1✔
100
        })
101
        if err != nil {
1✔
102
                common.Throw(runtime, err)
×
103
        }
×
104

105
        return connectionObject
1✔
106
}
107

108
// getKafkaControllerConnection returns a kafka controller connection with a given node address.
109
// It will also try to use the auth and TLS settings to create a secure connection. The connection
110
// should be closed after use.
111
func (k *Kafka) getKafkaControllerConnection(connectionConfig *ConnectionConfig) *kafkago.Conn {
10✔
112
        dialer, wrappedError := GetDialer(connectionConfig.SASL, connectionConfig.TLS)
10✔
113
        if wrappedError != nil {
10✔
114
                logger.WithField("error", wrappedError).Error(wrappedError)
×
115
                if dialer == nil {
×
116
                        common.Throw(k.vu.Runtime(), wrappedError)
×
117
                        return nil
×
118
                }
×
119
        }
120

121
        ctx := k.vu.Context()
10✔
122
        if ctx == nil {
10✔
123
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
124
                logger.WithField("error", err).Info(err)
×
125
                common.Throw(k.vu.Runtime(), err)
×
126
                return nil
×
127
        }
×
128

129
        conn, err := dialer.DialContext(ctx, "tcp", connectionConfig.Address)
10✔
130
        if err != nil {
11✔
131
                wrappedError := NewXk6KafkaError(dialerError, "Failed to create dialer.", err)
1✔
132
                logger.WithField("error", wrappedError).Error(wrappedError)
1✔
133
                common.Throw(k.vu.Runtime(), wrappedError)
1✔
134
                return nil
1✔
135
        }
1✔
136

137
        controller, err := conn.Controller()
9✔
138
        if err != nil {
9✔
139
                wrappedError := NewXk6KafkaError(failedGetController, "Failed to get controller.", err)
×
140
                logger.WithField("error", wrappedError).Error(wrappedError)
×
141
                common.Throw(k.vu.Runtime(), wrappedError)
×
142
                return nil
×
143
        }
×
144

145
        controllerConn, err := dialer.DialContext(
9✔
146
                ctx, "tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
9✔
147
        if err != nil {
9✔
148
                wrappedError := NewXk6KafkaError(failedGetController, "Failed to get controller.", err)
×
149
                logger.WithField("error", wrappedError).Error(wrappedError)
×
150
                common.Throw(k.vu.Runtime(), wrappedError)
×
151
                return nil
×
152
        }
×
153

154
        return controllerConn
9✔
155
}
156

157
// createTopic creates a topic with the given name, partitions, replication factor and compression.
158
// It will also try to use the auth and TLS settings to create a secure connection. If the topic
159
// already exists, it will do no-op.
160
func (k *Kafka) createTopic(conn *kafkago.Conn, topicConfig *kafkago.TopicConfig) {
6✔
161
        if topicConfig.NumPartitions <= 0 {
12✔
162
                topicConfig.NumPartitions = 1
6✔
163
        }
6✔
164

165
        if topicConfig.ReplicationFactor <= 0 {
12✔
166
                topicConfig.ReplicationFactor = 1
6✔
167
        }
6✔
168

169
        err := conn.CreateTopics(*topicConfig)
6✔
170
        if err != nil {
6✔
171
                wrappedError := NewXk6KafkaError(failedCreateTopic, "Failed to create topic.", err)
×
172
                logger.WithField("error", wrappedError).Error(wrappedError)
×
173
                common.Throw(k.vu.Runtime(), wrappedError)
×
174
        }
×
175
}
176

177
// deleteTopic deletes the given topic from the given address. It will also try to
178
// use the auth and TLS settings to create a secure connection. If the topic
179
// does not exist, it will raise an error.
180
func (k *Kafka) deleteTopic(conn *kafkago.Conn, topic string) {
2✔
181
        err := conn.DeleteTopics([]string{topic}...)
2✔
182
        if err != nil {
2✔
183
                wrappedError := NewXk6KafkaError(failedDeleteTopic, "Failed to delete topic.", err)
×
184
                logger.WithField("error", wrappedError).Error(wrappedError)
×
185
                common.Throw(k.vu.Runtime(), wrappedError)
×
186
        }
×
187
}
188

189
// listTopics lists the topics from the given address. It will also try to
190
// use the auth and TLS settings to create a secure connection. If the topic
191
// does not exist, it will raise an error.
192
func (k *Kafka) listTopics(conn *kafkago.Conn) []string {
6✔
193
        partitions, err := conn.ReadPartitions()
6✔
194
        if err != nil {
6✔
195
                wrappedError := NewXk6KafkaError(failedReadPartitions, "Failed to read partitions.", err)
×
196
                logger.WithField("error", wrappedError).Error(wrappedError)
×
197
                common.Throw(k.vu.Runtime(), wrappedError)
×
198
                return nil
×
199
        }
×
200

201
        // There should be a better way to return unique set of
202
        // topics instead of looping over them twice
203
        topicSet := map[string]struct{}{}
6✔
204

6✔
205
        for _, partition := range partitions {
458✔
206
                topicSet[partition.Topic] = struct{}{}
452✔
207
        }
452✔
208

209
        topics := make([]string, 0, len(topicSet))
6✔
210
        for topic := range topicSet {
82✔
211
                topics = append(topics, topic)
76✔
212
        }
76✔
213

214
        return topics
6✔
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc