• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.45
/schema_registry.go
1
package kafka
2

3
import (
4
        "encoding/binary"
5
        "encoding/json"
6
        "fmt"
7
        "net/http"
8

9
        "github.com/grafana/sobek"
10
        "github.com/linkedin/goavro/v2"
11
        "github.com/riferrei/srclient"
12
        "github.com/santhosh-tekuri/jsonschema/v5"
13
        "go.k6.io/k6/js/common"
14
)
15

16
type Element string
17

18
const (
19
        Key                Element = "key"
20
        Value              Element = "value"
21
        MagicPrefixSize    int     = 5
22
        ConcurrentRequests int     = 16
23
)
24

25
type BasicAuth struct {
26
        Username string `json:"username"`
27
        Password string `json:"password"`
28
}
29

30
type SchemaRegistryConfig struct {
31
        EnableCaching bool      `json:"enableCaching"`
32
        URL           string    `json:"url"`
33
        BasicAuth     BasicAuth `json:"basicAuth"`
34
        TLS           TLSConfig `json:"tls"`
35
}
36

37
const (
38
        TopicNameStrategy       string = "TopicNameStrategy"
39
        RecordNameStrategy      string = "RecordNameStrategy"
40
        TopicRecordNameStrategy string = "TopicRecordNameStrategy"
41
)
42

43
// Schema is a wrapper around the schema registry schema.
44
// The Codec() and JsonSchema() methods will return the respective codecs (duck-typing).
45
type Schema struct {
46
        EnableCaching bool                 `json:"enableCaching"`
47
        ID            int                  `json:"id"`
48
        Schema        string               `json:"schema"`
49
        SchemaType    *srclient.SchemaType `json:"schemaType"`
50
        Version       int                  `json:"version"`
51
        References    []srclient.Reference `json:"references"`
52
        Subject       string               `json:"subject"`
53
        codec         *goavro.Codec
54
        jsonSchema    *jsonschema.Schema
55
}
56

57
type SubjectNameConfig struct {
58
        Schema              string  `json:"schema"`
59
        Topic               string  `json:"topic"`
60
        Element             Element `json:"element"`
61
        SubjectNameStrategy string  `json:"subjectNameStrategy"`
62
}
63

64
type WireFormat struct {
65
        SchemaID int    `json:"schemaId"`
66
        Data     []byte `json:"data"`
67
}
68

69
// Codec ensures access to Codec
70
// Will try to initialize a new one if it hasn't been initialized before
71
// Will return nil if it can't initialize a codec from the schema
72
func (s *Schema) Codec() *goavro.Codec {
11✔
73
        if s.codec == nil {
19✔
74
                codec, err := goavro.NewCodec(s.Schema)
8✔
75
                if err == nil {
16✔
76
                        s.codec = codec
8✔
77
                }
8✔
78
        }
79
        return s.codec
11✔
80
}
81

82
// JsonSchema ensures access to JsonSchema
83
// Will try to initialize a new one if it hasn't been initialized before
84
// Will return nil if it can't initialize a json schema from the schema
85
func (s *Schema) JsonSchema() *jsonschema.Schema {
5✔
86
        if s.jsonSchema == nil {
10✔
87
                jsonSchema, err := jsonschema.CompileString("schema.json", s.Schema)
5✔
88
                if err == nil {
8✔
89
                        s.jsonSchema = jsonSchema
3✔
90
                }
3✔
91
        }
92
        return s.jsonSchema
5✔
93
}
94

95
func (k *Kafka) schemaRegistryClientClass(call sobek.ConstructorCall) *sobek.Object {
1✔
96
        runtime := k.vu.Runtime()
1✔
97
        var configuration *SchemaRegistryConfig
1✔
98
        var schemaRegistryClient *srclient.SchemaRegistryClient
1✔
99

1✔
100
        if len(call.Arguments) == 1 {
2✔
101
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
102
                        if b, err := json.Marshal(params); err != nil {
1✔
103
                                common.Throw(runtime, err)
×
104
                        } else {
1✔
105
                                if err = json.Unmarshal(b, &configuration); err != nil {
1✔
106
                                        common.Throw(runtime, err)
×
107
                                }
×
108
                        }
109
                }
110

111
                schemaRegistryClient = k.schemaRegistryClient(configuration)
1✔
112
        }
113

114
        schemaRegistryClientObject := runtime.NewObject()
1✔
115
        // This is the schema registry client object itself
1✔
116
        if err := schemaRegistryClientObject.Set("This", schemaRegistryClient); err != nil {
1✔
117
                common.Throw(runtime, err)
×
118
        }
×
119

120
        err := schemaRegistryClientObject.Set("getSchema", func(call sobek.FunctionCall) sobek.Value {
2✔
121
                if len(call.Arguments) == 0 {
1✔
122
                        common.Throw(runtime, ErrNotEnoughArguments)
×
123
                }
×
124

125
                if schemaRegistryClient == nil {
1✔
126
                        common.Throw(runtime, ErrNoSchemaRegistryClient)
×
127
                }
×
128

129
                var schema *Schema
1✔
130
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
131
                        if b, err := json.Marshal(params); err != nil {
1✔
132
                                common.Throw(runtime, err)
×
133
                        } else {
1✔
134
                                if err = json.Unmarshal(b, &schema); err != nil {
1✔
135
                                        common.Throw(runtime, err)
×
136
                                }
×
137
                        }
138
                }
139

140
                return runtime.ToValue(k.getSchema(schemaRegistryClient, schema))
1✔
141
        })
142
        if err != nil {
1✔
143
                common.Throw(runtime, err)
×
144
        }
×
145

146
        err = schemaRegistryClientObject.Set("createSchema", func(call sobek.FunctionCall) sobek.Value {
2✔
147
                if len(call.Arguments) == 0 {
1✔
148
                        common.Throw(runtime, ErrNotEnoughArguments)
×
149
                }
×
150

151
                if schemaRegistryClient == nil {
1✔
152
                        common.Throw(runtime, ErrNoSchemaRegistryClient)
×
153
                }
×
154

155
                var schema *Schema
1✔
156
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
157
                        if b, err := json.Marshal(params); err != nil {
1✔
158
                                common.Throw(runtime, err)
×
159
                        } else {
1✔
160
                                if err = json.Unmarshal(b, &schema); err != nil {
1✔
161
                                        common.Throw(runtime, err)
×
162
                                }
×
163
                        }
164
                }
165

166
                return runtime.ToValue(k.createSchema(schemaRegistryClient, schema))
1✔
167
        })
168
        if err != nil {
1✔
169
                common.Throw(runtime, err)
×
170
        }
×
171

172
        var subjectNameConfig *SubjectNameConfig
1✔
173
        err = schemaRegistryClientObject.Set("getSubjectName", func(call sobek.FunctionCall) sobek.Value {
2✔
174
                if len(call.Arguments) == 0 {
1✔
175
                        common.Throw(runtime, ErrNotEnoughArguments)
×
176
                }
×
177

178
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
179
                        if b, err := json.Marshal(params); err != nil {
1✔
180
                                common.Throw(runtime, err)
×
181
                        } else {
1✔
182
                                if err = json.Unmarshal(b, &subjectNameConfig); err != nil {
1✔
183
                                        common.Throw(runtime, err)
×
184
                                }
×
185
                        }
186
                }
187

188
                return runtime.ToValue(k.getSubjectName(subjectNameConfig))
1✔
189
        })
190
        if err != nil {
1✔
191
                common.Throw(runtime, err)
×
192
        }
×
193

194
        err = schemaRegistryClientObject.Set("serialize", func(call sobek.FunctionCall) sobek.Value {
2✔
195
                if len(call.Arguments) == 0 {
1✔
196
                        common.Throw(runtime, ErrNotEnoughArguments)
×
197
                }
×
198

199
                var metadata *Container
1✔
200
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
201
                        if b, err := json.Marshal(params); err != nil {
1✔
202
                                common.Throw(runtime, err)
×
203
                        } else {
1✔
204
                                if err = json.Unmarshal(b, &metadata); err != nil {
1✔
205
                                        common.Throw(runtime, err)
×
206
                                }
×
207
                        }
208
                }
209

210
                return runtime.ToValue(k.serialize(metadata))
1✔
211
        })
212
        if err != nil {
1✔
213
                common.Throw(runtime, err)
×
214
        }
×
215

216
        err = schemaRegistryClientObject.Set("deserialize", func(call sobek.FunctionCall) sobek.Value {
2✔
217
                if len(call.Arguments) == 0 {
1✔
218
                        common.Throw(runtime, ErrNotEnoughArguments)
×
219
                }
×
220

221
                var metadata *Container
1✔
222
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
223
                        if b, err := json.Marshal(params); err != nil {
1✔
224
                                common.Throw(runtime, err)
×
225
                        } else {
1✔
226
                                if err = json.Unmarshal(b, &metadata); err != nil {
1✔
227
                                        common.Throw(runtime, err)
×
228
                                }
×
229
                        }
230
                }
231

232
                return runtime.ToValue(k.deserialize(metadata))
1✔
233
        })
234
        if err != nil {
1✔
235
                common.Throw(runtime, err)
×
236
        }
×
237

238
        return schemaRegistryClientObject
1✔
239
}
240

241
// schemaRegistryClient creates a schemaRegistryClient instance
242
// with the given configuration. It will also configure auth and TLS credentials if exists.
243
func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.SchemaRegistryClient {
6✔
244
        runtime := k.vu.Runtime()
6✔
245
        var srClient *srclient.SchemaRegistryClient
6✔
246

6✔
247
        tlsConfig, err := GetTLSConfig(config.TLS)
6✔
248
        if err != nil {
12✔
249
                // Ignore the error if we're not using TLS
6✔
250
                if err.Code != noTLSConfig {
6✔
251
                        common.Throw(runtime, err)
×
252
                }
×
253
                srClient = srclient.CreateSchemaRegistryClient(config.URL)
6✔
254
        }
255

256
        if tlsConfig != nil {
6✔
257
                httpClient := &http.Client{
×
258
                        Transport: &http.Transport{
×
259
                                TLSClientConfig: tlsConfig,
×
260
                        },
×
261
                }
×
262
                srClient = srclient.CreateSchemaRegistryClientWithOptions(
×
263
                        config.URL, httpClient, ConcurrentRequests)
×
264
        }
×
265

266
        if config.BasicAuth.Username != "" && config.BasicAuth.Password != "" {
11✔
267
                srClient.SetCredentials(config.BasicAuth.Username, config.BasicAuth.Password)
5✔
268
        }
5✔
269

270
        // The default value for a boolean is false, so the caching
271
        // feature of the srclient package will be disabled.
272
        srClient.CachingEnabled(config.EnableCaching)
6✔
273

6✔
274
        return srClient
6✔
275
}
276

277
// getSchema returns the schema for the given subject and schema ID and version.
278
func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
4✔
279
        // If EnableCache is set, check if the schema is in the cache.
4✔
280
        if schema.EnableCaching {
4✔
281
                if schema, ok := k.schemaCache[schema.Subject]; ok {
×
282
                        return schema
×
283
                }
×
284
        }
285

286
        runtime := k.vu.Runtime()
4✔
287
        // The client always caches the schema.
4✔
288
        var schemaInfo *srclient.Schema
4✔
289
        var err error
4✔
290
        // Default version of the schema is the latest version.
4✔
291
        if schema.Version == 0 {
8✔
292
                schemaInfo, err = client.GetLatestSchema(schema.Subject)
4✔
293
        } else {
4✔
294
                schemaInfo, err = client.GetSchemaByVersion(
×
295
                        schema.Subject, schema.Version)
×
296
        }
×
297

298
        if err == nil {
5✔
299
                wrappedSchema := &Schema{
1✔
300
                        EnableCaching: schema.EnableCaching,
1✔
301
                        ID:            schemaInfo.ID(),
1✔
302
                        Version:       schemaInfo.Version(),
1✔
303
                        Schema:        schemaInfo.Schema(),
1✔
304
                        SchemaType:    schemaInfo.SchemaType(),
1✔
305
                        References:    schemaInfo.References(),
1✔
306
                        Subject:       schema.Subject,
1✔
307
                }
1✔
308
                // If the Cache is set, cache the schema.
1✔
309
                if wrappedSchema.EnableCaching {
1✔
310
                        k.schemaCache[wrappedSchema.Subject] = wrappedSchema
×
311
                }
×
312
                return wrappedSchema
1✔
313
        } else {
3✔
314
                err := NewXk6KafkaError(schemaNotFound, "Failed to get schema from schema registry", err)
3✔
315
                common.Throw(runtime, err)
3✔
316
                return nil
3✔
317
        }
3✔
318
}
319

320
// createSchema creates a new schema in the schema registry.
321
func (k *Kafka) createSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
1✔
322
        runtime := k.vu.Runtime()
1✔
323
        schemaInfo, err := client.CreateSchema(
1✔
324
                schema.Subject,
1✔
325
                schema.Schema,
1✔
326
                *schema.SchemaType,
1✔
327
                schema.References...)
1✔
328
        if err != nil {
1✔
329
                err := NewXk6KafkaError(schemaCreationFailed, "Failed to create schema.", err)
×
330
                common.Throw(runtime, err)
×
331
                return nil
×
332
        }
×
333

334
        wrappedSchema := &Schema{
1✔
335
                EnableCaching: schema.EnableCaching,
1✔
336
                ID:            schemaInfo.ID(),
1✔
337
                Version:       schemaInfo.Version(),
1✔
338
                Schema:        schemaInfo.Schema(),
1✔
339
                SchemaType:    schemaInfo.SchemaType(),
1✔
340
                References:    schemaInfo.References(),
1✔
341
                Subject:       schema.Subject,
1✔
342
        }
1✔
343
        if schema.EnableCaching {
1✔
344
                k.schemaCache[schema.Subject] = wrappedSchema
×
345
        }
×
346
        return wrappedSchema
1✔
347
}
348

349
// getSubjectName returns the subject name for the given schema and topic.
350
func (k *Kafka) getSubjectName(subjectNameConfig *SubjectNameConfig) string {
11✔
351
        if subjectNameConfig.SubjectNameStrategy == "" ||
11✔
352
                subjectNameConfig.SubjectNameStrategy == TopicNameStrategy {
15✔
353
                return subjectNameConfig.Topic + "-" + string(subjectNameConfig.Element)
4✔
354
        }
4✔
355

356
        runtime := k.vu.Runtime()
7✔
357
        var schemaMap map[string]interface{}
7✔
358
        err := json.Unmarshal([]byte(subjectNameConfig.Schema), &schemaMap)
7✔
359
        if err != nil {
8✔
360
                common.Throw(runtime, NewXk6KafkaError(
1✔
361
                        failedUnmarshalSchema, "Failed to unmarshal schema", err))
1✔
362
        }
1✔
363
        recordName := ""
6✔
364
        if namespace, ok := schemaMap["namespace"]; ok {
9✔
365
                if namespace, ok := namespace.(string); ok {
6✔
366
                        recordName = namespace + "."
3✔
367
                } else {
3✔
368
                        err := NewXk6KafkaError(failedTypeCast, "Failed to cast to string", nil)
×
369
                        common.Throw(runtime, err)
×
370
                }
×
371
        }
372
        if name, ok := schemaMap["name"]; ok {
12✔
373
                if name, ok := name.(string); ok {
12✔
374
                        recordName += name
6✔
375
                } else {
6✔
376
                        err := NewXk6KafkaError(failedTypeCast, "Failed to cast to string", nil)
×
377
                        common.Throw(runtime, err)
×
378
                }
×
379
        }
380

381
        if subjectNameConfig.SubjectNameStrategy == RecordNameStrategy {
8✔
382
                return recordName
2✔
383
        }
2✔
384
        if subjectNameConfig.SubjectNameStrategy == TopicRecordNameStrategy {
7✔
385
                return subjectNameConfig.Topic + "-" + recordName
3✔
386
        }
3✔
387

388
        err = NewXk6KafkaError(failedToEncode, fmt.Sprintf(
1✔
389
                "Unknown subject name strategy: %v", subjectNameConfig.SubjectNameStrategy), nil)
1✔
390
        common.Throw(runtime, err)
1✔
391
        return ""
1✔
392
}
393

394
// encodeWireFormat adds the proprietary 5-byte prefix to the Avro, ProtoBuf or
395
// JSONSchema payload.
396
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
397
func (k *Kafka) encodeWireFormat(data []byte, schemaID int) []byte {
4✔
398
        schemaIDBytes := make([]byte, MagicPrefixSize-1)
4✔
399
        binary.BigEndian.PutUint32(schemaIDBytes, uint32(schemaID))
4✔
400
        return append(append([]byte{0}, schemaIDBytes...), data...)
4✔
401
}
4✔
402

403
// decodeWireFormat removes the proprietary 5-byte prefix from the Avro, ProtoBuf
404
// or JSONSchema payload.
405
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
406
func (k *Kafka) decodeWireFormat(message []byte) []byte {
3✔
407
        runtime := k.vu.Runtime()
3✔
408
        if len(message) < MagicPrefixSize {
4✔
409
                err := NewXk6KafkaError(messageTooShort,
1✔
410
                        "Invalid message: message too short to contain schema id.", nil)
1✔
411
                common.Throw(runtime, err)
1✔
412
                return nil
1✔
413
        }
1✔
414
        if message[0] != 0 {
2✔
415
                err := NewXk6KafkaError(messageTooShort, "Invalid message: invalid start byte.", nil)
×
416
                common.Throw(runtime, err)
×
417
                return nil
×
418
        }
×
419
        return message[MagicPrefixSize:]
2✔
420
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc