• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.83
/common/persistence/serializer.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package persistence
22

23
import (
24
        "encoding/json"
25
        "fmt"
26

27
        "github.com/uber/cadence/.gen/go/config"
28
        "github.com/uber/cadence/.gen/go/history"
29
        "github.com/uber/cadence/.gen/go/replicator"
30
        workflow "github.com/uber/cadence/.gen/go/shared"
31
        "github.com/uber/cadence/common"
32
        "github.com/uber/cadence/common/codec"
33
        "github.com/uber/cadence/common/types"
34
        "github.com/uber/cadence/common/types/mapper/thrift"
35
)
36

37
type (
38
        // PayloadSerializer is used by persistence to serialize/deserialize history event(s) and others
39
        // It will only be used inside persistence, so that serialize/deserialize is transparent for application
40
        PayloadSerializer interface {
41
                // serialize/deserialize history events
42
                SerializeBatchEvents(batch []*types.HistoryEvent, encodingType common.EncodingType) (*DataBlob, error)
43
                DeserializeBatchEvents(data *DataBlob) ([]*types.HistoryEvent, error)
44

45
                // serialize/deserialize a single history event
46
                SerializeEvent(event *types.HistoryEvent, encodingType common.EncodingType) (*DataBlob, error)
47
                DeserializeEvent(data *DataBlob) (*types.HistoryEvent, error)
48

49
                // serialize/deserialize visibility memo fields
50
                SerializeVisibilityMemo(memo *types.Memo, encodingType common.EncodingType) (*DataBlob, error)
51
                DeserializeVisibilityMemo(data *DataBlob) (*types.Memo, error)
52

53
                // serialize/deserialize reset points
54
                SerializeResetPoints(event *types.ResetPoints, encodingType common.EncodingType) (*DataBlob, error)
55
                DeserializeResetPoints(data *DataBlob) (*types.ResetPoints, error)
56

57
                // serialize/deserialize bad binaries
58
                SerializeBadBinaries(event *types.BadBinaries, encodingType common.EncodingType) (*DataBlob, error)
59
                DeserializeBadBinaries(data *DataBlob) (*types.BadBinaries, error)
60

61
                // serialize/deserialize version histories
62
                SerializeVersionHistories(histories *types.VersionHistories, encodingType common.EncodingType) (*DataBlob, error)
63
                DeserializeVersionHistories(data *DataBlob) (*types.VersionHistories, error)
64

65
                // serialize/deserialize pending failover markers
66
                SerializePendingFailoverMarkers(markers []*types.FailoverMarkerAttributes, encodingType common.EncodingType) (*DataBlob, error)
67
                DeserializePendingFailoverMarkers(data *DataBlob) ([]*types.FailoverMarkerAttributes, error)
68

69
                // serialize/deserialize processing queue statesss
70
                SerializeProcessingQueueStates(states *types.ProcessingQueueStates, encodingType common.EncodingType) (*DataBlob, error)
71
                DeserializeProcessingQueueStates(data *DataBlob) (*types.ProcessingQueueStates, error)
72

73
                // serialize/deserialize DynamicConfigBlob
74
                SerializeDynamicConfigBlob(blob *types.DynamicConfigBlob, encodingType common.EncodingType) (*DataBlob, error)
75
                DeserializeDynamicConfigBlob(data *DataBlob) (*types.DynamicConfigBlob, error)
76

77
                SerializeIsolationGroups(event *types.IsolationGroupConfiguration, encodingType common.EncodingType) (*DataBlob, error)
78
                DeserializeIsolationGroups(data *DataBlob) (*types.IsolationGroupConfiguration, error)
79
        }
80

81
        // CadenceSerializationError is an error type for cadence serialization
82
        CadenceSerializationError struct {
83
                msg string
84
        }
85

86
        // CadenceDeserializationError is an error type for cadence deserialization
87
        CadenceDeserializationError struct {
88
                msg string
89
        }
90

91
        // UnknownEncodingTypeError is an error type for unknown or unsupported encoding type
92
        UnknownEncodingTypeError struct {
93
                encodingType common.EncodingType
94
        }
95

96
        serializerImpl struct {
97
                thriftrwEncoder codec.BinaryEncoder
98
        }
99
)
100

101
// NewPayloadSerializer returns a PayloadSerializer
102
func NewPayloadSerializer() PayloadSerializer {
452✔
103
        return &serializerImpl{
452✔
104
                thriftrwEncoder: codec.NewThriftRWEncoder(),
452✔
105
        }
452✔
106
}
452✔
107

108
func (t *serializerImpl) SerializeBatchEvents(events []*types.HistoryEvent, encodingType common.EncodingType) (*DataBlob, error) {
4,330✔
109
        return t.serialize(events, encodingType)
4,330✔
110
}
4,330✔
111

112
func (t *serializerImpl) DeserializeBatchEvents(data *DataBlob) ([]*types.HistoryEvent, error) {
10,323✔
113
        if data == nil {
10,323✔
114
                return nil, nil
×
115
        }
×
116
        var events []*types.HistoryEvent
10,323✔
117
        if data != nil && len(data.Data) == 0 {
10,323✔
118
                return events, nil
×
119
        }
×
120
        err := t.deserialize(data, &events)
10,323✔
121
        return events, err
10,323✔
122
}
123

124
func (t *serializerImpl) SerializeEvent(event *types.HistoryEvent, encodingType common.EncodingType) (*DataBlob, error) {
7,724✔
125
        if event == nil {
15,444✔
126
                return nil, nil
7,720✔
127
        }
7,720✔
128
        return t.serialize(event, encodingType)
4✔
129
}
130

131
func (t *serializerImpl) DeserializeEvent(data *DataBlob) (*types.HistoryEvent, error) {
956✔
132
        if data == nil {
1,909✔
133
                return nil, nil
953✔
134
        }
953✔
135
        var event types.HistoryEvent
3✔
136
        err := t.deserialize(data, &event)
3✔
137
        return &event, err
3✔
138
}
139

140
func (t *serializerImpl) SerializeResetPoints(rp *types.ResetPoints, encodingType common.EncodingType) (*DataBlob, error) {
5,094✔
141
        if rp == nil {
5,617✔
142
                rp = &types.ResetPoints{}
523✔
143
        }
523✔
144
        return t.serialize(rp, encodingType)
5,094✔
145
}
146

147
func (t *serializerImpl) DeserializeResetPoints(data *DataBlob) (*types.ResetPoints, error) {
747✔
148
        var rp types.ResetPoints
747✔
149
        err := t.deserialize(data, &rp)
747✔
150
        return &rp, err
747✔
151
}
747✔
152

153
func (t *serializerImpl) SerializeBadBinaries(bb *types.BadBinaries, encodingType common.EncodingType) (*DataBlob, error) {
68✔
154
        if bb == nil {
69✔
155
                bb = &types.BadBinaries{}
1✔
156
        }
1✔
157
        return t.serialize(bb, encodingType)
68✔
158
}
159

160
func (t *serializerImpl) DeserializeBadBinaries(data *DataBlob) (*types.BadBinaries, error) {
4,669✔
161
        var bb types.BadBinaries
4,669✔
162
        err := t.deserialize(data, &bb)
4,669✔
163
        return &bb, err
4,669✔
164
}
4,669✔
165

166
func (t *serializerImpl) SerializeVisibilityMemo(memo *types.Memo, encodingType common.EncodingType) (*DataBlob, error) {
1,501✔
167
        if memo == nil {
2,878✔
168
                // Return nil here to be consistent with Event
1,377✔
169
                // This check is not duplicate as check in following serialize
1,377✔
170
                return nil, nil
1,377✔
171
        }
1,377✔
172
        return t.serialize(memo, encodingType)
124✔
173
}
174

175
func (t *serializerImpl) DeserializeVisibilityMemo(data *DataBlob) (*types.Memo, error) {
343✔
176
        var memo types.Memo
343✔
177
        err := t.deserialize(data, &memo)
343✔
178
        return &memo, err
343✔
179
}
343✔
180

181
func (t *serializerImpl) SerializeVersionHistories(histories *types.VersionHistories, encodingType common.EncodingType) (*DataBlob, error) {
5,093✔
182
        if histories == nil {
5,094✔
183
                return nil, nil
1✔
184
        }
1✔
185
        return t.serialize(histories, encodingType)
5,092✔
186
}
187

188
func (t *serializerImpl) DeserializeVersionHistories(data *DataBlob) (*types.VersionHistories, error) {
747✔
189
        var histories types.VersionHistories
747✔
190
        err := t.deserialize(data, &histories)
747✔
191
        return &histories, err
747✔
192
}
747✔
193

194
func (t *serializerImpl) SerializePendingFailoverMarkers(
195
        markers []*types.FailoverMarkerAttributes,
196
        encodingType common.EncodingType,
197
) (*DataBlob, error) {
167✔
198

167✔
199
        if markers == nil {
334✔
200
                return nil, nil
167✔
201
        }
167✔
202
        return t.serialize(markers, encodingType)
×
203
}
204

205
func (t *serializerImpl) DeserializePendingFailoverMarkers(
206
        data *DataBlob,
207
) ([]*types.FailoverMarkerAttributes, error) {
36✔
208

36✔
209
        if data == nil {
72✔
210
                return nil, nil
36✔
211
        }
36✔
212
        var markers []*types.FailoverMarkerAttributes
×
213
        if data != nil && len(data.Data) == 0 {
×
214
                return markers, nil
×
215
        }
×
216
        err := t.deserialize(data, &markers)
×
217
        return markers, err
×
218
}
219

220
func (t *serializerImpl) SerializeProcessingQueueStates(
221
        states *types.ProcessingQueueStates,
222
        encodingType common.EncodingType,
223
) (*DataBlob, error) {
500✔
224
        if states == nil {
660✔
225
                return nil, nil
160✔
226
        }
160✔
227
        return t.serialize(states, encodingType)
343✔
228
}
229

230
func (t *serializerImpl) DeserializeProcessingQueueStates(
231
        data *DataBlob,
232
) (*types.ProcessingQueueStates, error) {
113✔
233
        if data == nil {
223✔
234
                return nil, nil
110✔
235
        }
110✔
236

237
        var states types.ProcessingQueueStates
3✔
238
        if data != nil && len(data.Data) == 0 {
3✔
239
                return &states, nil
×
240
        }
×
241
        err := t.deserialize(data, &states)
3✔
242
        return &states, err
3✔
243
}
244

245
func (t *serializerImpl) SerializeDynamicConfigBlob(blob *types.DynamicConfigBlob, encodingType common.EncodingType) (*DataBlob, error) {
3✔
246
        if blob == nil {
4✔
247
                return nil, nil
1✔
248
        }
1✔
249
        return t.serialize(blob, encodingType)
2✔
250
}
251

252
func (t *serializerImpl) DeserializeDynamicConfigBlob(data *DataBlob) (*types.DynamicConfigBlob, error) {
3✔
253
        if data == nil {
4✔
254
                return nil, nil
1✔
255
        }
1✔
256

257
        var blob types.DynamicConfigBlob
2✔
258
        if len(data.Data) == 0 {
2✔
259
                return &blob, nil
×
260
        }
×
261

262
        err := t.deserialize(data, &blob)
2✔
263
        return &blob, err
2✔
264
}
265

266
func (t *serializerImpl) SerializeIsolationGroups(c *types.IsolationGroupConfiguration, encodingType common.EncodingType) (*DataBlob, error) {
69✔
267
        if c == nil {
71✔
268
                return nil, nil
2✔
269
        }
2✔
270
        return t.serialize(c, encodingType)
67✔
271
}
272

273
func (t *serializerImpl) DeserializeIsolationGroups(data *DataBlob) (*types.IsolationGroupConfiguration, error) {
4,670✔
274
        if data == nil {
4,992✔
275
                return nil, nil
322✔
276
        }
322✔
277

278
        var cfg types.IsolationGroupConfiguration
4,349✔
279
        if len(data.Data) == 0 {
4,349✔
280
                return &cfg, nil
×
281
        }
×
282

283
        err := t.deserialize(data, &cfg)
4,349✔
284
        return &cfg, err
4,349✔
285
}
286

287
func (t *serializerImpl) serialize(input interface{}, encodingType common.EncodingType) (*DataBlob, error) {
15,109✔
288
        if input == nil {
15,109✔
289
                return nil, nil
×
290
        }
×
291

292
        var data []byte
15,109✔
293
        var err error
15,109✔
294

15,109✔
295
        switch encodingType {
15,109✔
296
        case common.EncodingTypeThriftRW:
15,086✔
297
                data, err = t.thriftrwEncode(input)
15,086✔
298
        case common.EncodingTypeJSON, common.EncodingTypeUnknown, common.EncodingTypeEmpty: // For backward-compatibility
17✔
299
                encodingType = common.EncodingTypeJSON
17✔
300
                data, err = json.Marshal(input)
17✔
301
        default:
6✔
302
                return nil, NewUnknownEncodingTypeError(encodingType)
6✔
303
        }
304

305
        if err != nil {
15,103✔
306
                return nil, NewCadenceSerializationError(err.Error())
×
307
        }
×
308
        return NewDataBlob(data, encodingType), nil
15,103✔
309
}
310

311
func (t *serializerImpl) thriftrwEncode(input interface{}) ([]byte, error) {
15,086✔
312

15,086✔
313
        switch input := input.(type) {
15,086✔
314
        case []*types.HistoryEvent:
4,327✔
315
                return t.thriftrwEncoder.Encode(&workflow.History{Events: thrift.FromHistoryEventArray(input)})
4,327✔
316
        case *types.HistoryEvent:
1✔
317
                return t.thriftrwEncoder.Encode(thrift.FromHistoryEvent(input))
1✔
318
        case *types.Memo:
121✔
319
                return t.thriftrwEncoder.Encode(thrift.FromMemo(input))
121✔
320
        case *types.ResetPoints:
5,091✔
321
                return t.thriftrwEncoder.Encode(thrift.FromResetPoints(input))
5,091✔
322
        case *types.BadBinaries:
65✔
323
                return t.thriftrwEncoder.Encode(thrift.FromBadBinaries(input))
65✔
324
        case *types.VersionHistories:
5,090✔
325
                return t.thriftrwEncoder.Encode(thrift.FromVersionHistories(input))
5,090✔
326
        case []*types.FailoverMarkerAttributes:
×
327
                return t.thriftrwEncoder.Encode(&replicator.FailoverMarkers{FailoverMarkers: thrift.FromFailoverMarkerAttributesArray(input)})
×
328
        case *types.ProcessingQueueStates:
340✔
329
                return t.thriftrwEncoder.Encode(thrift.FromProcessingQueueStates(input))
340✔
330
        case *types.DynamicConfigBlob:
1✔
331
                return t.thriftrwEncoder.Encode(thrift.FromDynamicConfigBlob(input))
1✔
332
        case *types.IsolationGroupConfiguration:
65✔
333
                return t.thriftrwEncoder.Encode(thrift.FromIsolationGroupConfig(input))
65✔
334
        default:
×
335
                return nil, nil
×
336
        }
337
}
338

339
func (t *serializerImpl) deserialize(data *DataBlob, target interface{}) error {
21,174✔
340
        if data == nil {
21,811✔
341
                return nil
637✔
342
        }
637✔
343
        if len(data.Data) == 0 {
20,538✔
344
                return NewCadenceDeserializationError("DeserializeEvent empty data")
×
345
        }
×
346
        var err error
20,538✔
347

20,538✔
348
        switch data.GetEncoding() {
20,538✔
349
        case common.EncodingTypeThriftRW:
20,521✔
350
                err = t.thriftrwDecode(data.Data, target)
20,521✔
351
        case common.EncodingTypeJSON, common.EncodingTypeUnknown, common.EncodingTypeEmpty: // For backward-compatibility
17✔
352
                err = json.Unmarshal(data.Data, target)
17✔
353
        default:
×
354
                return NewUnknownEncodingTypeError(data.GetEncoding())
×
355
        }
356

357
        if err != nil {
20,538✔
358
                return NewCadenceDeserializationError(fmt.Sprintf("DeserializeBatchEvents encoding: \"%v\", error: %v", data.Encoding, err.Error()))
×
359
        }
×
360
        return nil
20,538✔
361
}
362

363
func (t *serializerImpl) thriftrwDecode(data []byte, target interface{}) error {
20,521✔
364
        switch target := target.(type) {
20,521✔
365
        case *[]*types.HistoryEvent:
10,321✔
366
                thriftTarget := workflow.History{}
10,321✔
367
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
10,321✔
368
                        return err
×
369
                }
×
370
                *target = thrift.ToHistoryEventArray(thriftTarget.GetEvents())
10,321✔
371
                return nil
10,321✔
372
        case *types.HistoryEvent:
1✔
373
                thriftTarget := workflow.HistoryEvent{}
1✔
374
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
1✔
375
                        return err
×
376
                }
×
377
                *target = *thrift.ToHistoryEvent(&thriftTarget)
1✔
378
                return nil
1✔
379
        case *types.Memo:
28✔
380
                thriftTarget := workflow.Memo{}
28✔
381
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
28✔
382
                        return err
×
383
                }
×
384
                *target = *thrift.ToMemo(&thriftTarget)
28✔
385
                return nil
28✔
386
        case *types.ResetPoints:
744✔
387
                thriftTarget := workflow.ResetPoints{}
744✔
388
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
744✔
389
                        return err
×
390
                }
×
391
                *target = *thrift.ToResetPoints(&thriftTarget)
744✔
392
                return nil
744✔
393
        case *types.BadBinaries:
4,347✔
394
                thriftTarget := workflow.BadBinaries{}
4,347✔
395
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
4,347✔
396
                        return err
×
397
                }
×
398
                *target = *thrift.ToBadBinaries(&thriftTarget)
4,347✔
399
                return nil
4,347✔
400
        case *types.VersionHistories:
743✔
401
                thriftTarget := workflow.VersionHistories{}
743✔
402
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
743✔
403
                        return err
×
404
                }
×
405
                *target = *thrift.ToVersionHistories(&thriftTarget)
743✔
406
                return nil
743✔
407
        case *[]*types.FailoverMarkerAttributes:
×
408
                thriftTarget := replicator.FailoverMarkers{}
×
409
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
×
410
                        return err
×
411
                }
×
412
                *target = thrift.ToFailoverMarkerAttributesArray(thriftTarget.GetFailoverMarkers())
×
413
                return nil
×
414
        case *types.ProcessingQueueStates:
1✔
415
                thriftTarget := history.ProcessingQueueStates{}
1✔
416
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
1✔
417
                        return err
×
418
                }
×
419
                *target = *thrift.ToProcessingQueueStates(&thriftTarget)
1✔
420
                return nil
1✔
421
        case *types.DynamicConfigBlob:
1✔
422
                thriftTarget := config.DynamicConfigBlob{}
1✔
423
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
1✔
424
                        return err
×
425
                }
×
426
                *target = *thrift.ToDynamicConfigBlob(&thriftTarget)
1✔
427
                return nil
1✔
428
        case *types.IsolationGroupConfiguration:
4,347✔
429
                thriftTarget := workflow.IsolationGroupConfiguration{}
4,347✔
430
                if err := t.thriftrwEncoder.Decode(data, &thriftTarget); err != nil {
4,347✔
431
                        return err
×
432
                }
×
433
                *target = *thrift.ToIsolationGroupConfig(&thriftTarget)
4,347✔
434
                return nil
4,347✔
435
        default:
×
436
                return nil
×
437
        }
438
}
439

440
// NewUnknownEncodingTypeError returns a new instance of encoding type error
441
func NewUnknownEncodingTypeError(encodingType common.EncodingType) error {
6✔
442
        return &UnknownEncodingTypeError{encodingType: encodingType}
6✔
443
}
6✔
444

445
func (e *UnknownEncodingTypeError) Error() string {
×
446
        return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType)
×
447
}
×
448

449
// NewCadenceSerializationError returns a CadenceSerializationError
450
func NewCadenceSerializationError(msg string) *CadenceSerializationError {
×
451
        return &CadenceSerializationError{msg: msg}
×
452
}
×
453

454
func (e *CadenceSerializationError) Error() string {
×
455
        return fmt.Sprintf("cadence serialization error: %v", e.msg)
×
456
}
×
457

458
// NewCadenceDeserializationError returns a CadenceDeserializationError
459
func NewCadenceDeserializationError(msg string) *CadenceDeserializationError {
×
460
        return &CadenceDeserializationError{msg: msg}
×
461
}
×
462

463
func (e *CadenceDeserializationError) Error() string {
×
464
        return fmt.Sprintf("cadence deserialization error: %v", e.msg)
×
465
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc