• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hamba / avro / 11297064024

11 Oct 2024 05:57PM UTC coverage: 96.158% (-0.01%) from 96.168%
11297064024

push

github

web-flow
feat: improve OCF encoder/decoder handling of dynamic types (#467)

52 of 52 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

5706 of 5934 relevant lines covered (96.16%)

319.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.24
/ocf/ocf.go
1
// Package ocf implements encoding and decoding of Avro Object Container Files as defined by the Avro specification.
2
//
3
// See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/
4
package ocf
5

6
import (
7
        "bytes"
8
        "crypto/rand"
9
        "encoding/json"
10
        "errors"
11
        "fmt"
12
        "io"
13
        "os"
14

15
        "github.com/hamba/avro/v2"
16
        "github.com/hamba/avro/v2/internal/bytesx"
17
)
18

19
// schemaKey is the header metadata key under which the writer schema is stored.
const schemaKey = "avro.schema"

// codecKey is the header metadata key under which the compression codec name
// is stored.
const codecKey = "avro.codec"
23

24
var (
25
        magicBytes = [4]byte{'O', 'b', 'j', 1}
26

27
        // HeaderSchema is the Avro schema of a container file header.
28
        HeaderSchema = avro.MustParse(`{
29
        "type": "record",
30
        "name": "org.apache.avro.file.Header",
31
        "fields": [
32
                {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
33
                {"name": "meta", "type": {"type": "map", "values": "bytes"}},
34
                {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
35
        ]
36
}`)
37

38
        // DefaultSchemaMarshaler calls the schema's String() method, to produce
39
        // a "canonical" schema.
40
        DefaultSchemaMarshaler = defaultMarshalSchema
41
        // FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
42
        // a schema with all details preserved. The "canonical" schema returned by
43
        // the default marshaler does not preserve a type's extra properties.
44
        FullSchemaMarshaler = fullMarshalSchema
45
)
46

47
// Header represents an Avro container file header. Its wire layout is
// described by HeaderSchema; the avro struct tags bind each field to the
// matching schema field.
type Header struct {
	// Magic holds the file's leading magic bytes.
	Magic [4]byte `avro:"magic"`

	// Meta holds the header metadata, including the schema and codec entries.
	Meta map[string][]byte `avro:"meta"`

	// Sync holds the marker that terminates every data block of the file.
	Sync [16]byte `avro:"sync"`
}
53

54
type decoderConfig struct {
55
        DecoderConfig avro.API
56
        SchemaCache   *avro.SchemaCache
57
}
58

59
// DecoderFunc represents a configuration function for Decoder.
60
type DecoderFunc func(cfg *decoderConfig)
61

62
// WithDecoderConfig sets the value decoder config on the OCF decoder.
63
func WithDecoderConfig(wCfg avro.API) DecoderFunc {
2✔
64
        return func(cfg *decoderConfig) {
4✔
65
                cfg.DecoderConfig = wCfg
2✔
66
        }
2✔
67
}
68

69
// WithDecoderSchemaCache sets the schema cache for the decoder.
70
// If not specified, defaults to avro.DefaultSchemaCache.
71
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
2✔
72
        return func(cfg *decoderConfig) {
4✔
73
                cfg.SchemaCache = cache
2✔
74
        }
2✔
75
}
76

77
// Decoder reads and decodes Avro values from a container file.
78
type Decoder struct {
79
        reader      *avro.Reader
80
        resetReader *bytesx.ResetReader
81
        decoder     *avro.Decoder
82
        meta        map[string][]byte
83
        sync        [16]byte
84
        schema      avro.Schema
85

86
        codec Codec
87

88
        count int64
89
}
90

91
// NewDecoder returns a new decoder that reads from reader r.
92
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
44✔
93
        cfg := decoderConfig{
44✔
94
                DecoderConfig: avro.DefaultConfig,
44✔
95
                SchemaCache:   avro.DefaultSchemaCache,
44✔
96
        }
44✔
97
        for _, opt := range opts {
48✔
98
                opt(&cfg)
4✔
99
        }
4✔
100

101
        reader := avro.NewReader(r, 1024)
44✔
102

44✔
103
        h, err := readHeader(reader, cfg.SchemaCache)
44✔
104
        if err != nil {
52✔
105
                return nil, fmt.Errorf("decoder: %w", err)
8✔
106
        }
8✔
107

108
        decReader := bytesx.NewResetReader([]byte{})
36✔
109

36✔
110
        return &Decoder{
36✔
111
                reader:      reader,
36✔
112
                resetReader: decReader,
36✔
113
                decoder:     cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
36✔
114
                meta:        h.Meta,
36✔
115
                sync:        h.Sync,
36✔
116
                codec:       h.Codec,
36✔
117
                schema:      h.Schema,
36✔
118
        }, nil
36✔
119
}
120

121
// Metadata returns the header metadata.
122
func (d *Decoder) Metadata() map[string][]byte {
2✔
123
        return d.meta
2✔
124
}
2✔
125

126
// Schema returns the schema that was parsed from the file's metadata
127
// and that is used to interpret the file's contents.
128
func (d *Decoder) Schema() avro.Schema {
2✔
129
        return d.schema
2✔
130
}
2✔
131

132
// HasNext determines if there is another value to read.
133
func (d *Decoder) HasNext() bool {
46✔
134
        if d.count <= 0 {
92✔
135
                count := d.readBlock()
46✔
136
                d.count = count
46✔
137
        }
46✔
138

139
        if d.reader.Error != nil {
72✔
140
                return false
26✔
141
        }
26✔
142

143
        return d.count > 0
20✔
144
}
145

146
// Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.
147
func (d *Decoder) Decode(v any) error {
22✔
148
        if d.count <= 0 {
24✔
149
                return errors.New("decoder: no data found, call HasNext first")
2✔
150
        }
2✔
151

152
        d.count--
20✔
153

20✔
154
        return d.decoder.Decode(v)
20✔
155
}
156

157
// Error returns the last reader error.
158
func (d *Decoder) Error() error {
22✔
159
        if errors.Is(d.reader.Error, io.EOF) {
32✔
160
                return nil
10✔
161
        }
10✔
162

163
        return d.reader.Error
12✔
164
}
165

166
func (d *Decoder) readBlock() int64 {
46✔
167
        _ = d.reader.Peek()
46✔
168
        if errors.Is(d.reader.Error, io.EOF) {
60✔
169
                // There is no next block
14✔
170
                return 0
14✔
171
        }
14✔
172

173
        count := d.reader.ReadLong()
32✔
174
        size := d.reader.ReadLong()
32✔
175

32✔
176
        // Read the blocks data
32✔
177
        if count > 0 {
64✔
178
                data := make([]byte, size)
32✔
179
                d.reader.Read(data)
32✔
180

32✔
181
                data, err := d.codec.Decode(data)
32✔
182
                if err != nil {
42✔
183
                        d.reader.Error = err
10✔
184
                }
10✔
185

186
                d.resetReader.Reset(data)
32✔
187
        }
188

189
        // Read the sync.
190
        var sync [16]byte
32✔
191
        d.reader.Read(sync[:])
32✔
192
        if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
34✔
193
                d.reader.Error = errors.New("decoder: invalid block")
2✔
194
        }
2✔
195

196
        return count
32✔
197
}
198

199
type encoderConfig struct {
200
        BlockLength      int
201
        CodecName        CodecName
202
        CodecCompression int
203
        Metadata         map[string][]byte
204
        Sync             [16]byte
205
        EncodingConfig   avro.API
206
        SchemaCache      *avro.SchemaCache
207
        SchemaMarshaler  func(avro.Schema) ([]byte, error)
208
}
209

210
// EncoderFunc represents a configuration function for Encoder.
211
type EncoderFunc func(cfg *encoderConfig)
212

213
// WithBlockLength sets the block length on the encoder.
214
func WithBlockLength(length int) EncoderFunc {
6✔
215
        return func(cfg *encoderConfig) {
12✔
216
                cfg.BlockLength = length
6✔
217
        }
6✔
218
}
219

220
// WithCodec sets the compression codec on the encoder.
221
func WithCodec(codec CodecName) EncoderFunc {
8✔
222
        return func(cfg *encoderConfig) {
16✔
223
                cfg.CodecName = codec
8✔
224
        }
8✔
225
}
226

227
// WithCompressionLevel sets the compression codec to deflate and
228
// the compression level on the encoder.
229
func WithCompressionLevel(compLvl int) EncoderFunc {
2✔
230
        return func(cfg *encoderConfig) {
4✔
231
                cfg.CodecName = Deflate
2✔
232
                cfg.CodecCompression = compLvl
2✔
233
        }
2✔
234
}
235

236
// WithMetadata sets the metadata on the encoder header.
237
func WithMetadata(meta map[string][]byte) EncoderFunc {
2✔
238
        return func(cfg *encoderConfig) {
4✔
239
                cfg.Metadata = meta
2✔
240
        }
2✔
241
}
242

243
// WithEncoderSchemaCache sets the schema cache for the encoder.
244
// If not specified, defaults to avro.DefaultSchemaCache.
245
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
2✔
246
        return func(cfg *encoderConfig) {
4✔
247
                cfg.SchemaCache = cache
2✔
248
        }
2✔
249
}
250

251
// WithSchemaMarshaler sets the schema marshaler for the encoder.
252
// If not specified, defaults to DefaultSchemaMarshaler.
253
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
2✔
254
        return func(cfg *encoderConfig) {
4✔
255
                cfg.SchemaMarshaler = m
2✔
256
        }
2✔
257
}
258

259
// WithSyncBlock sets the sync block.
260
func WithSyncBlock(sync [16]byte) EncoderFunc {
8✔
261
        return func(cfg *encoderConfig) {
16✔
262
                cfg.Sync = sync
8✔
263
        }
8✔
264
}
265

266
// WithEncodingConfig sets the value encoder config on the OCF encoder.
267
func WithEncodingConfig(wCfg avro.API) EncoderFunc {
4✔
268
        return func(cfg *encoderConfig) {
8✔
269
                cfg.EncodingConfig = wCfg
4✔
270
        }
4✔
271
}
272

273
// Encoder writes Avro container file to an output stream.
274
type Encoder struct {
275
        writer  *avro.Writer
276
        buf     *bytes.Buffer
277
        encoder *avro.Encoder
278
        sync    [16]byte
279

280
        codec Codec
281

282
        blockLength int
283
        count       int
284
}
285

286
// NewEncoder returns a new encoder that writes to w using schema s.
287
//
288
// If the writer is an existing ocf file, it will append data using the
289
// existing schema.
290
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
46✔
291
        cfg := computeEncoderConfig(opts)
46✔
292
        schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
46✔
293
        if err != nil {
48✔
294
                return nil, err
2✔
295
        }
2✔
296
        return newEncoder(schema, w, cfg)
44✔
297
}
298

299
// NewEncoderWithSchema returns a new encoder that writes to w using schema s.
300
//
301
// If the writer is an existing ocf file, it will append data using the
302
// existing schema.
303
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
2✔
304
        return newEncoder(schema, w, computeEncoderConfig(opts))
2✔
305
}
2✔
306

307
func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
46✔
308
        switch file := w.(type) {
46✔
309
        case nil:
2✔
310
                return nil, errors.New("writer cannot be nil")
2✔
311
        case *os.File:
2✔
312
                info, err := file.Stat()
2✔
313
                if err != nil {
2✔
314
                        return nil, err
×
315
                }
×
316

317
                if info.Size() > 0 {
4✔
318
                        reader := avro.NewReader(file, 1024)
2✔
319
                        h, err := readHeader(reader, cfg.SchemaCache)
2✔
320
                        if err != nil {
2✔
321
                                return nil, err
×
322
                        }
×
323
                        if err = skipToEnd(reader, h.Sync); err != nil {
2✔
324
                                return nil, err
×
325
                        }
×
326

327
                        writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
2✔
328
                        buf := &bytes.Buffer{}
2✔
329
                        e := &Encoder{
2✔
330
                                writer:      writer,
2✔
331
                                buf:         buf,
2✔
332
                                encoder:     cfg.EncodingConfig.NewEncoder(h.Schema, buf),
2✔
333
                                sync:        h.Sync,
2✔
334
                                codec:       h.Codec,
2✔
335
                                blockLength: cfg.BlockLength,
2✔
336
                        }
2✔
337
                        return e, nil
2✔
338
                }
339
        }
340

341
        schemaJSON, err := cfg.SchemaMarshaler(schema)
42✔
342
        if err != nil {
42✔
UNCOV
343
                return nil, err
×
UNCOV
344
        }
×
345

346
        cfg.Metadata[schemaKey] = schemaJSON
42✔
347
        cfg.Metadata[codecKey] = []byte(cfg.CodecName)
42✔
348
        header := Header{
42✔
349
                Magic: magicBytes,
42✔
350
                Meta:  cfg.Metadata,
42✔
351
        }
42✔
352
        header.Sync = cfg.Sync
42✔
353
        if header.Sync == [16]byte{} {
76✔
354
                _, _ = rand.Read(header.Sync[:])
34✔
355
        }
34✔
356

357
        codec, err := resolveCodec(cfg.CodecName, cfg.CodecCompression)
42✔
358
        if err != nil {
44✔
359
                return nil, err
2✔
360
        }
2✔
361

362
        writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
40✔
363
        writer.WriteVal(HeaderSchema, header)
40✔
364
        if err = writer.Flush(); err != nil {
42✔
365
                return nil, err
2✔
366
        }
2✔
367

368
        buf := &bytes.Buffer{}
38✔
369
        e := &Encoder{
38✔
370
                writer:      writer,
38✔
371
                buf:         buf,
38✔
372
                encoder:     cfg.EncodingConfig.NewEncoder(schema, buf),
38✔
373
                sync:        header.Sync,
38✔
374
                codec:       codec,
38✔
375
                blockLength: cfg.BlockLength,
38✔
376
        }
38✔
377
        return e, nil
38✔
378
}
379

380
func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
48✔
381
        cfg := encoderConfig{
48✔
382
                BlockLength:      100,
48✔
383
                CodecName:        Null,
48✔
384
                CodecCompression: -1,
48✔
385
                Metadata:         map[string][]byte{},
48✔
386
                EncodingConfig:   avro.DefaultConfig,
48✔
387
                SchemaCache:      avro.DefaultSchemaCache,
48✔
388
                SchemaMarshaler:  DefaultSchemaMarshaler,
48✔
389
        }
48✔
390
        for _, opt := range opts {
82✔
391
                opt(&cfg)
34✔
392
        }
34✔
393
        return cfg
48✔
394
}
395

396
// Write v to the internal buffer. This method skips the internal encoder and
397
// therefore the caller is responsible for encoding the bytes. No error will be
398
// thrown if the bytes does not conform to the schema given to NewEncoder, but
399
// the final ocf data will be corrupted.
400
func (e *Encoder) Write(p []byte) (n int, err error) {
2✔
401
        n, err = e.buf.Write(p)
2✔
402
        if err != nil {
2✔
403
                return n, err
×
404
        }
×
405

406
        e.count++
2✔
407
        if e.count >= e.blockLength {
2✔
408
                if err = e.writerBlock(); err != nil {
×
409
                        return n, err
×
410
                }
×
411
        }
412

413
        return n, e.writer.Error
2✔
414
}
415

416
// Encode writes the Avro encoding of v to the stream.
417
func (e *Encoder) Encode(v any) error {
34✔
418
        if err := e.encoder.Encode(v); err != nil {
36✔
419
                return err
2✔
420
        }
2✔
421

422
        e.count++
32✔
423
        if e.count >= e.blockLength {
38✔
424
                if err := e.writerBlock(); err != nil {
8✔
425
                        return err
2✔
426
                }
2✔
427
        }
428

429
        return e.writer.Error
30✔
430
}
431

432
// Flush flushes the underlying writer.
433
func (e *Encoder) Flush() error {
36✔
434
        if e.count == 0 {
44✔
435
                return nil
8✔
436
        }
8✔
437

438
        if err := e.writerBlock(); err != nil {
30✔
439
                return err
2✔
440
        }
2✔
441

442
        return e.writer.Error
26✔
443
}
444

445
// Close closes the encoder, flushing the writer.
446
func (e *Encoder) Close() error {
36✔
447
        return e.Flush()
36✔
448
}
36✔
449

450
func (e *Encoder) writerBlock() error {
34✔
451
        e.writer.WriteLong(int64(e.count))
34✔
452

34✔
453
        b := e.codec.Encode(e.buf.Bytes())
34✔
454

34✔
455
        e.writer.WriteLong(int64(len(b)))
34✔
456
        _, _ = e.writer.Write(b)
34✔
457

34✔
458
        _, _ = e.writer.Write(e.sync[:])
34✔
459

34✔
460
        e.count = 0
34✔
461
        e.buf.Reset()
34✔
462
        return e.writer.Flush()
34✔
463
}
34✔
464

465
type ocfHeader struct {
466
        Schema avro.Schema
467
        Codec  Codec
468
        Meta   map[string][]byte
469
        Sync   [16]byte
470
}
471

472
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
46✔
473
        var h Header
46✔
474
        reader.ReadVal(HeaderSchema, &h)
46✔
475
        if reader.Error != nil {
48✔
476
                return nil, fmt.Errorf("unexpected error: %w", reader.Error)
2✔
477
        }
2✔
478

479
        if h.Magic != magicBytes {
46✔
480
                return nil, errors.New("invalid avro file")
2✔
481
        }
2✔
482
        schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
42✔
483
        if err != nil {
44✔
484
                return nil, err
2✔
485
        }
2✔
486

487
        codec, err := resolveCodec(CodecName(h.Meta[codecKey]), -1)
40✔
488
        if err != nil {
42✔
489
                return nil, err
2✔
490
        }
2✔
491

492
        return &ocfHeader{
38✔
493
                Schema: schema,
38✔
494
                Codec:  codec,
38✔
495
                Meta:   h.Meta,
38✔
496
                Sync:   h.Sync,
38✔
497
        }, nil
38✔
498
}
499

500
func skipToEnd(reader *avro.Reader, sync [16]byte) error {
2✔
501
        for {
6✔
502
                _ = reader.ReadLong()
4✔
503
                if errors.Is(reader.Error, io.EOF) {
6✔
504
                        return nil
2✔
505
                }
2✔
506
                size := reader.ReadLong()
2✔
507
                reader.SkipNBytes(int(size))
2✔
508
                if reader.Error != nil {
2✔
509
                        return reader.Error
×
510
                }
×
511

512
                var synMark [16]byte
2✔
513
                reader.Read(synMark[:])
2✔
514
                if sync != synMark && !errors.Is(reader.Error, io.EOF) {
2✔
515
                        reader.Error = errors.New("invalid block")
×
516
                }
×
517
        }
518
}
519

520
func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
40✔
521
        return []byte(schema.String()), nil
40✔
522
}
40✔
523

524
func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
2✔
525
        return json.Marshal(schema)
2✔
526
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc