• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.36
/serdes.go
1
package kafka
2

3
import (
4
        "github.com/riferrei/srclient"
5
        "go.k6.io/k6/js/common"
6
)
7

8
type Container struct {
9
        Data       interface{}         `json:"data"`
10
        Schema     *Schema             `json:"schema"`
11
        SchemaType srclient.SchemaType `json:"schemaType"`
12
}
13

14
// serialize checks whether the incoming data has a schema or not.
15
// If the data has a schema, it encodes the data into Avro, JSONSchema or Protocol Buffer.
16
// Then it adds the wire format prefix and returns the binary to be used in key or value.
17
// If no schema is passed, it treats the data as a byte array, a string or a JSON object without
18
// a JSONSchema. Then, it returns the data as a byte array.
19
// nolint: funlen
20
func (k *Kafka) serialize(container *Container) []byte {
40✔
21
        if container.Schema == nil {
74✔
22
                // we are dealing with a byte array, a string or a JSON object without a JSONSchema
34✔
23
                serde, err := GetSerdes(container.SchemaType)
34✔
24
                if err != nil {
35✔
25
                        common.Throw(k.vu.Runtime(), err)
1✔
26
                        return nil
1✔
27
                }
1✔
28

29
                data, err := serde.Serialize(container.Data, nil)
33✔
30
                if err != nil {
39✔
31
                        common.Throw(k.vu.Runtime(), err)
6✔
32
                        return nil
6✔
33
                }
6✔
34
                return data
27✔
35
        } else {
6✔
36
                // we are dealing with binary data to be encoded with Avro, JSONSchema or Protocol Buffer
6✔
37

6✔
38
                switch container.SchemaType {
6✔
39
                case srclient.Avro, srclient.Json:
6✔
40
                        serde, err := GetSerdes(container.SchemaType)
6✔
41
                        if err != nil {
6✔
42
                                common.Throw(k.vu.Runtime(), err)
×
43
                                return nil
×
44
                        }
×
45

46
                        bytesData, err := serde.Serialize(container.Data, container.Schema)
6✔
47
                        if err != nil {
9✔
48
                                common.Throw(k.vu.Runtime(), err)
3✔
49
                                return nil
3✔
50
                        }
3✔
51

52
                        return k.encodeWireFormat(bytesData, container.Schema.ID)
3✔
53
                default:
×
54
                        common.Throw(k.vu.Runtime(), ErrUnsupportedOperation)
×
55
                        return nil
×
56
                }
57
        }
58
}
59

60
// deserialize checks whether the incoming data has a schema or not.
61
// If the data has a schema, it removes the wire format prefix and decodes the data into JSON
62
// using Avro, JSONSchema or Protocol Buffer schemas. It returns the decoded data as JSON object.
63
// If no schema is passed, it treats the data as a byte array, a string or a JSON object without
64
// a JSONSchema. Then, it returns the data based on how it can decode it.
65
// nolint: funlen
66
func (k *Kafka) deserialize(container *Container) interface{} {
7✔
67
        if container.Schema == nil {
13✔
68
                // we are dealing with a byte array, a string or a JSON object without a JSONSchema
6✔
69
                serde, err := GetSerdes(container.SchemaType)
6✔
70
                if err != nil {
6✔
71
                        common.Throw(k.vu.Runtime(), err)
×
72
                        return nil
×
73
                }
×
74

75
                switch container.Data.(type) {
6✔
76
                case []byte:
6✔
77
                        switch container.SchemaType {
6✔
78
                        case String:
5✔
79
                                return string(container.Data.([]byte))
5✔
80
                        default:
1✔
81
                                if isJSON(container.Data.([]byte)) {
2✔
82
                                        js, err := toMap(container.Data.([]byte))
1✔
83
                                        if err != nil {
1✔
84
                                                common.Throw(k.vu.Runtime(), err)
×
85
                                                return nil
×
86
                                        }
×
87
                                        return js
1✔
88
                                }
89
                                return container.Data.([]byte)
×
90
                        }
91
                case string:
×
92
                        if isBase64Encoded(container.Data.(string)) {
×
93
                                if data, err := base64ToBytes(container.Data.(string)); err != nil {
×
94
                                        common.Throw(k.vu.Runtime(), err)
×
95
                                        return nil
×
96
                                } else {
×
97
                                        if result, err := serde.Deserialize(data, nil); err != nil {
×
98
                                                common.Throw(k.vu.Runtime(), err)
×
99
                                                return nil
×
100
                                        } else {
×
101
                                                return result
×
102
                                        }
×
103
                                }
104
                        }
105

106
                        return []byte(container.Data.(string))
×
107
                default:
×
108
                        return container.Data
×
109
                }
110
        } else {
1✔
111
                // we are dealing with binary data to be encoded with Avro, JSONSchema or Protocol Buffer
1✔
112
                runtime := k.vu.Runtime()
1✔
113

1✔
114
                var jsonBytes []byte
1✔
115

1✔
116
                switch container.Data.(type) {
1✔
117
                case []byte:
×
118
                        jsonBytes = container.Data.([]byte)
×
119
                case string:
1✔
120
                        // Decode the data into JSON bytes from base64-encoded data
1✔
121
                        if isBase64Encoded(container.Data.(string)) {
2✔
122
                                if data, err := base64ToBytes(container.Data.(string)); err != nil {
1✔
123
                                        common.Throw(k.vu.Runtime(), err)
×
124
                                        return nil
×
125
                                } else {
1✔
126
                                        jsonBytes = data
1✔
127
                                }
1✔
128
                        }
129
                }
130

131
                // Remove wire format prefix
132
                jsonBytes = k.decodeWireFormat(jsonBytes)
1✔
133

1✔
134
                switch container.SchemaType {
1✔
135
                case srclient.Avro, srclient.Json:
1✔
136
                        serde, err := GetSerdes(container.SchemaType)
1✔
137
                        if err != nil {
1✔
138
                                common.Throw(k.vu.Runtime(), err)
×
139
                                return nil
×
140
                        }
×
141

142
                        deserialized, err := serde.Deserialize(jsonBytes, container.Schema)
1✔
143
                        if err != nil {
1✔
144
                                common.Throw(k.vu.Runtime(), err)
×
145
                                return nil
×
146
                        }
×
147

148
                        if jsonObj, ok := deserialized.(map[string]interface{}); ok {
2✔
149
                                return jsonObj
1✔
150
                        } else {
1✔
151
                                common.Throw(k.vu.Runtime(), ErrInvalidDataType)
×
152
                                return nil
×
153
                        }
×
154
                default:
×
155
                        common.Throw(runtime, ErrUnsupportedOperation)
×
156
                        return nil
×
157
                }
158
        }
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc