• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187e35d-de80-41f6-84da-3647aed378b2

03 May 2023 09:04PM UTC coverage: 57.259% (+0.02%) from 57.24%
0187e35d-de80-41f6-84da-3647aed378b2

push

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning

86908 of 151780 relevant lines covered (57.26%)

2492.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.86
/common/persistence/dataStoreInterfaces.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "fmt"
27
        "time"
28

29
        workflow "github.com/uber/cadence/.gen/go/shared"
30
        "github.com/uber/cadence/common"
31
        "github.com/uber/cadence/common/checksum"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
type (
36
        //////////////////////////////////////////////////////////////////////
37
        // Persistence interface is a lower layer of dataInterface.
38
        // The intention is to let different persistence implementation(SQL,Cassandra/etc) share some common logic
39
        // Right now the only common part is serialization/deserialization, and only ExecutionManager/HistoryManager need it.
40
        // TaskManager are the same.
41
        //////////////////////////////////////////////////////////////////////
42

43
        // ShardStore is the lower level of ShardManager
44
        ShardStore interface {
45
                Closeable
46
                GetName() string
47
                CreateShard(ctx context.Context, request *InternalCreateShardRequest) error
48
                GetShard(ctx context.Context, request *InternalGetShardRequest) (*InternalGetShardResponse, error)
49
                UpdateShard(ctx context.Context, request *InternalUpdateShardRequest) error
50
        }
51

52
        // TaskStore is a lower level of TaskManager
53
        TaskStore interface {
54
                Closeable
55
                GetName() string
56
                LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
57
                UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
58
                ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
59
                DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
60
                CreateTasks(ctx context.Context, request *InternalCreateTasksRequest) (*CreateTasksResponse, error)
61
                GetTasks(ctx context.Context, request *GetTasksRequest) (*InternalGetTasksResponse, error)
62
                CompleteTask(ctx context.Context, request *CompleteTaskRequest) error
63
                // CompleteTasksLessThan completes tasks less than or equal to the given task id
64
                // This API takes a limit parameter which specifies the count of maxRows that
65
                // can be deleted. This parameter may be ignored by the underlying storage, but
66
                // its mandatory to specify it. On success this method returns the number of rows
67
                // actually deleted. If the underlying storage doesn't support "limit", all rows
68
                // less than or equal to taskID will be deleted.
69
                // On success, this method returns:
70
                //  - number of rows actually deleted, if limit is honored
71
                //  - UnknownNumRowsDeleted, when all rows below value are deleted
72
                CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (*CompleteTasksLessThanResponse, error)
73
                // GetOrphanTasks returns tasks that exist as records in the database but are part of task lists which
74
                // _do not_ exist in the database. They are therefore unreachable and no longer represent valid items
75
                // that can be legitimately acted upon.
76
                GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
77
        }
78

79
        // DomainStore is a lower level of DomainManager
80
        DomainStore interface {
81
                Closeable
82
                GetName() string
83
                CreateDomain(ctx context.Context, request *InternalCreateDomainRequest) (*CreateDomainResponse, error)
84
                GetDomain(ctx context.Context, request *GetDomainRequest) (*InternalGetDomainResponse, error)
85
                UpdateDomain(ctx context.Context, request *InternalUpdateDomainRequest) error
86
                DeleteDomain(ctx context.Context, request *DeleteDomainRequest) error
87
                DeleteDomainByName(ctx context.Context, request *DeleteDomainByNameRequest) error
88
                ListDomains(ctx context.Context, request *ListDomainsRequest) (*InternalListDomainsResponse, error)
89
                GetMetadata(ctx context.Context) (*GetMetadataResponse, error)
90
        }
91

92
        // ExecutionStore is used to manage workflow executions for Persistence layer
93
        ExecutionStore interface {
94
                Closeable
95
                GetName() string
96
                GetShardID() int
97
                //The below three APIs are related to serialization/deserialization
98
                GetWorkflowExecution(ctx context.Context, request *InternalGetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
99
                UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error
100
                ConflictResolveWorkflowExecution(ctx context.Context, request *InternalConflictResolveWorkflowExecutionRequest) error
101

102
                CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
103
                DeleteWorkflowExecution(ctx context.Context, request *DeleteWorkflowExecutionRequest) error
104
                DeleteCurrentWorkflowExecution(ctx context.Context, request *DeleteCurrentWorkflowExecutionRequest) error
105
                GetCurrentExecution(ctx context.Context, request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
106
                IsWorkflowExecutionExists(ctx context.Context, request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)
107

108
                // Transfer task related methods
109
                GetTransferTasks(ctx context.Context, request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
110
                CompleteTransferTask(ctx context.Context, request *CompleteTransferTaskRequest) error
111
                RangeCompleteTransferTask(ctx context.Context, request *RangeCompleteTransferTaskRequest) (*RangeCompleteTransferTaskResponse, error)
112

113
                // Cross-cluster task related methods
114
                GetCrossClusterTasks(ctx context.Context, request *GetCrossClusterTasksRequest) (*GetCrossClusterTasksResponse, error)
115
                CompleteCrossClusterTask(ctx context.Context, request *CompleteCrossClusterTaskRequest) error
116
                RangeCompleteCrossClusterTask(ctx context.Context, request *RangeCompleteCrossClusterTaskRequest) (*RangeCompleteCrossClusterTaskResponse, error)
117

118
                // Replication task related methods
119
                GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*InternalGetReplicationTasksResponse, error)
120
                CompleteReplicationTask(ctx context.Context, request *CompleteReplicationTaskRequest) error
121
                RangeCompleteReplicationTask(ctx context.Context, request *RangeCompleteReplicationTaskRequest) (*RangeCompleteReplicationTaskResponse, error)
122
                PutReplicationTaskToDLQ(ctx context.Context, request *InternalPutReplicationTaskToDLQRequest) error
123
                GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*InternalGetReplicationTasksFromDLQResponse, error)
124
                GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
125
                DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
126
                RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) (*RangeDeleteReplicationTaskFromDLQResponse, error)
127
                CreateFailoverMarkerTasks(ctx context.Context, request *CreateFailoverMarkersRequest) error
128

129
                // Timer related methods.
130
                GetTimerIndexTasks(ctx context.Context, request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
131
                CompleteTimerTask(ctx context.Context, request *CompleteTimerTaskRequest) error
132
                RangeCompleteTimerTask(ctx context.Context, request *RangeCompleteTimerTaskRequest) (*RangeCompleteTimerTaskResponse, error)
133

134
                // Scan related methods
135
                ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
136
                ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
137
        }
138

139
        // HistoryStore is to manager workflow history events
140
        HistoryStore interface {
141
                Closeable
142
                GetName() string
143

144
                // The below are history V2 APIs
145
                // V2 regards history events growing as a tree, decoupled from workflow concepts
146

147
                // AppendHistoryNodes add(or override) a node to a history branch
148
                AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
149
                // ReadHistoryBranch returns history node data for a branch
150
                ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error)
151
                // ForkHistoryBranch forks a new branch from a old branch
152
                ForkHistoryBranch(ctx context.Context, request *InternalForkHistoryBranchRequest) (*InternalForkHistoryBranchResponse, error)
153
                // DeleteHistoryBranch removes a branch
154
                DeleteHistoryBranch(ctx context.Context, request *InternalDeleteHistoryBranchRequest) error
155
                // GetHistoryTree returns all branch information of a tree
156
                GetHistoryTree(ctx context.Context, request *InternalGetHistoryTreeRequest) (*InternalGetHistoryTreeResponse, error)
157
                // GetAllHistoryTreeBranches returns all branches of all trees
158
                GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
159
        }
160

161
        // VisibilityStore is the store interface for visibility
162
        VisibilityStore interface {
163
                Closeable
164
                GetName() string
165
                RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
166
                RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error
167
                RecordWorkflowExecutionUninitialized(ctx context.Context, request *InternalRecordWorkflowExecutionUninitializedRequest) error
168
                UpsertWorkflowExecution(ctx context.Context, request *InternalUpsertWorkflowExecutionRequest) error
169
                ListOpenWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
170
                ListClosedWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
171
                ListOpenWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
172
                ListClosedWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
173
                ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
174
                ListClosedWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
175
                ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *InternalListClosedWorkflowExecutionsByStatusRequest) (*InternalListWorkflowExecutionsResponse, error)
176
                GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error)
177
                DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
178
                ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
179
                ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
180
                CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
181
                DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
182
        }
183

184
        ConfigStore interface {
185
                Closeable
186
                FetchConfig(ctx context.Context, configType ConfigType) (*InternalConfigStoreEntry, error)
187
                UpdateConfig(ctx context.Context, value *InternalConfigStoreEntry) error
188
        }
189

190
        InternalConfigStoreEntry struct {
191
                RowType   int
192
                Version   int64
193
                Timestamp time.Time
194
                Values    *DataBlob
195
        }
196

197
        // Queue is a store to enqueue and get messages
198
        Queue interface {
199
                Closeable
200
                EnqueueMessage(ctx context.Context, messagePayload []byte) error
201
                ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
202
                DeleteMessagesBefore(ctx context.Context, messageID int64) error
203
                UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
204
                GetAckLevels(ctx context.Context) (map[string]int64, error)
205
                EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
206
                ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
207
                DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
208
                RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
209
                UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
210
                GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
211
                GetDLQSize(ctx context.Context) (int64, error)
212
        }
213

214
        // InternalQueueMessage is the message that stores in the queue
215
        InternalQueueMessage struct {
216
                ID        int64     `json:"message_id"`
217
                QueueType QueueType `json:"queue_type"`
218
                Payload   []byte    `json:"message_payload"`
219
        }
220

221
        // DataBlob represents a blob for any binary data.
222
        // It contains raw data, and metadata(right now only encoding) in other field
223
        // Note that it should be only used for Persistence layer, below dataInterface and application(historyEngine/etc)
224
        DataBlob struct {
225
                Encoding common.EncodingType
226
                Data     []byte
227
        }
228

229
        // InternalCreateWorkflowExecutionRequest is used to write a new workflow execution
230
        InternalCreateWorkflowExecutionRequest struct {
231
                RangeID int64
232

233
                Mode CreateWorkflowMode
234

235
                PreviousRunID            string
236
                PreviousLastWriteVersion int64
237

238
                NewWorkflowSnapshot InternalWorkflowSnapshot
239
        }
240

241
        // InternalGetReplicationTasksResponse is the response to GetReplicationTask
242
        InternalGetReplicationTasksResponse struct {
243
                Tasks         []*InternalReplicationTaskInfo
244
                NextPageToken []byte
245
        }
246

247
        // InternalPutReplicationTaskToDLQRequest is used to put a replication task to dlq
248
        InternalPutReplicationTaskToDLQRequest struct {
249
                SourceClusterName string
250
                TaskInfo          *InternalReplicationTaskInfo
251
        }
252

253
        // InternalGetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
254
        InternalGetReplicationTasksFromDLQResponse = InternalGetReplicationTasksResponse
255

256
        // InternalReplicationTaskInfo describes the replication task created for replication of history events
257
        InternalReplicationTaskInfo struct {
258
                DomainID          string
259
                WorkflowID        string
260
                RunID             string
261
                TaskID            int64
262
                TaskType          int
263
                FirstEventID      int64
264
                NextEventID       int64
265
                Version           int64
266
                ScheduledID       int64
267
                BranchToken       []byte
268
                NewRunBranchToken []byte
269
                CreationTime      time.Time
270
        }
271

272
        // InternalWorkflowExecutionInfo describes a workflow execution for Persistence Interface
273
        InternalWorkflowExecutionInfo struct {
274
                DomainID                           string
275
                WorkflowID                         string
276
                RunID                              string
277
                FirstExecutionRunID                string
278
                ParentDomainID                     string
279
                ParentWorkflowID                   string
280
                ParentRunID                        string
281
                InitiatedID                        int64
282
                CompletionEventBatchID             int64
283
                CompletionEvent                    *DataBlob
284
                TaskList                           string
285
                WorkflowTypeName                   string
286
                WorkflowTimeout                    time.Duration
287
                DecisionStartToCloseTimeout        time.Duration
288
                ExecutionContext                   []byte
289
                State                              int
290
                CloseStatus                        int
291
                LastFirstEventID                   int64
292
                LastEventTaskID                    int64
293
                NextEventID                        int64
294
                LastProcessedEvent                 int64
295
                StartTimestamp                     time.Time
296
                LastUpdatedTimestamp               time.Time
297
                CreateRequestID                    string
298
                SignalCount                        int32
299
                DecisionVersion                    int64
300
                DecisionScheduleID                 int64
301
                DecisionStartedID                  int64
302
                DecisionRequestID                  string
303
                DecisionTimeout                    time.Duration
304
                DecisionAttempt                    int64
305
                DecisionStartedTimestamp           time.Time
306
                DecisionScheduledTimestamp         time.Time
307
                DecisionOriginalScheduledTimestamp time.Time
308
                CancelRequested                    bool
309
                CancelRequestID                    string
310
                StickyTaskList                     string
311
                StickyScheduleToStartTimeout       time.Duration
312
                ClientLibraryVersion               string
313
                ClientFeatureVersion               string
314
                ClientImpl                         string
315
                AutoResetPoints                    *DataBlob
316
                // for retry
317
                Attempt            int32
318
                HasRetryPolicy     bool
319
                InitialInterval    time.Duration
320
                BackoffCoefficient float64
321
                MaximumInterval    time.Duration
322
                ExpirationTime     time.Time
323
                MaximumAttempts    int32
324
                NonRetriableErrors []string
325
                BranchToken        []byte
326
                CronSchedule       string
327
                ExpirationInterval time.Duration
328
                Memo               map[string][]byte
329
                SearchAttributes   map[string][]byte
330
                PartitionConfig    map[string]string
331

332
                // attributes which are not related to mutable state at all
333
                HistorySize int64
334
        }
335

336
        // InternalWorkflowMutableState indicates workflow related state for Persistence Interface
337
        InternalWorkflowMutableState struct {
338
                ExecutionInfo    *InternalWorkflowExecutionInfo
339
                VersionHistories *DataBlob
340
                ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
341
                ActivityInfos    map[int64]*InternalActivityInfo
342

343
                TimerInfos          map[string]*TimerInfo
344
                ChildExecutionInfos map[int64]*InternalChildExecutionInfo
345
                RequestCancelInfos  map[int64]*RequestCancelInfo
346
                SignalInfos         map[int64]*SignalInfo
347
                SignalRequestedIDs  map[string]struct{}
348
                BufferedEvents      []*DataBlob
349

350
                Checksum checksum.Checksum
351
        }
352

353
        // InternalActivityInfo details  for Persistence Interface
354
        InternalActivityInfo struct {
355
                Version                  int64
356
                ScheduleID               int64
357
                ScheduledEventBatchID    int64
358
                ScheduledEvent           *DataBlob
359
                ScheduledTime            time.Time
360
                StartedID                int64
361
                StartedEvent             *DataBlob
362
                StartedTime              time.Time
363
                ActivityID               string
364
                RequestID                string
365
                Details                  []byte
366
                ScheduleToStartTimeout   time.Duration
367
                ScheduleToCloseTimeout   time.Duration
368
                StartToCloseTimeout      time.Duration
369
                HeartbeatTimeout         time.Duration
370
                CancelRequested          bool
371
                CancelRequestID          int64
372
                LastHeartBeatUpdatedTime time.Time
373
                TimerTaskStatus          int32
374
                // For retry
375
                Attempt            int32
376
                DomainID           string
377
                StartedIdentity    string
378
                TaskList           string
379
                HasRetryPolicy     bool
380
                InitialInterval    time.Duration
381
                BackoffCoefficient float64
382
                MaximumInterval    time.Duration
383
                ExpirationTime     time.Time
384
                MaximumAttempts    int32
385
                NonRetriableErrors []string
386
                LastFailureReason  string
387
                LastWorkerIdentity string
388
                LastFailureDetails []byte
389
                // Not written to database - This is used only for deduping heartbeat timer creation
390
                LastHeartbeatTimeoutVisibilityInSeconds int64
391
        }
392

393
        // InternalChildExecutionInfo has details for pending child executions for Persistence Interface
394
        InternalChildExecutionInfo struct {
395
                Version               int64
396
                InitiatedID           int64
397
                InitiatedEventBatchID int64
398
                InitiatedEvent        *DataBlob
399
                StartedID             int64
400
                StartedWorkflowID     string
401
                StartedRunID          string
402
                StartedEvent          *DataBlob
403
                CreateRequestID       string
404
                DomainID              string
405
                DomainNameDEPRECATED  string // deprecated: use DomainID field
406
                WorkflowTypeName      string
407
                ParentClosePolicy     types.ParentClosePolicy
408
        }
409

410
        // InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for Persistence Interface
411
        InternalUpdateWorkflowExecutionRequest struct {
412
                RangeID int64
413

414
                Mode UpdateWorkflowMode
415

416
                UpdateWorkflowMutation InternalWorkflowMutation
417

418
                NewWorkflowSnapshot *InternalWorkflowSnapshot
419
        }
420

421
        // InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
422
        InternalConflictResolveWorkflowExecutionRequest struct {
423
                RangeID int64
424

425
                Mode ConflictResolveWorkflowMode
426

427
                // workflow to be resetted
428
                ResetWorkflowSnapshot InternalWorkflowSnapshot
429

430
                // maybe new workflow
431
                NewWorkflowSnapshot *InternalWorkflowSnapshot
432

433
                // current workflow
434
                CurrentWorkflowMutation *InternalWorkflowMutation
435
        }
436

437
        // InternalWorkflowMutation is used as generic workflow execution state mutation for Persistence Interface
438
        InternalWorkflowMutation struct {
439
                ExecutionInfo    *InternalWorkflowExecutionInfo
440
                VersionHistories *DataBlob
441
                StartVersion     int64
442
                LastWriteVersion int64
443

444
                UpsertActivityInfos       []*InternalActivityInfo
445
                DeleteActivityInfos       []int64
446
                UpsertTimerInfos          []*TimerInfo
447
                DeleteTimerInfos          []string
448
                UpsertChildExecutionInfos []*InternalChildExecutionInfo
449
                DeleteChildExecutionInfos []int64
450
                UpsertRequestCancelInfos  []*RequestCancelInfo
451
                DeleteRequestCancelInfos  []int64
452
                UpsertSignalInfos         []*SignalInfo
453
                DeleteSignalInfos         []int64
454
                UpsertSignalRequestedIDs  []string
455
                DeleteSignalRequestedIDs  []string
456
                NewBufferedEvents         *DataBlob
457
                ClearBufferedEvents       bool
458

459
                TransferTasks     []Task
460
                CrossClusterTasks []Task
461
                TimerTasks        []Task
462
                ReplicationTasks  []Task
463

464
                Condition int64
465

466
                Checksum checksum.Checksum
467
        }
468

469
        // InternalWorkflowSnapshot is used as generic workflow execution state snapshot for Persistence Interface
470
        InternalWorkflowSnapshot struct {
471
                ExecutionInfo    *InternalWorkflowExecutionInfo
472
                VersionHistories *DataBlob
473
                StartVersion     int64
474
                LastWriteVersion int64
475

476
                ActivityInfos       []*InternalActivityInfo
477
                TimerInfos          []*TimerInfo
478
                ChildExecutionInfos []*InternalChildExecutionInfo
479
                RequestCancelInfos  []*RequestCancelInfo
480
                SignalInfos         []*SignalInfo
481
                SignalRequestedIDs  []string
482

483
                TransferTasks     []Task
484
                CrossClusterTasks []Task
485
                TimerTasks        []Task
486
                ReplicationTasks  []Task
487

488
                Condition int64
489

490
                Checksum checksum.Checksum
491
        }
492

493
        // InternalAppendHistoryEventsRequest is used to append new events to workflow execution history  for Persistence Interface
494
        InternalAppendHistoryEventsRequest struct {
495
                DomainID          string
496
                Execution         workflow.WorkflowExecution
497
                FirstEventID      int64
498
                EventBatchVersion int64
499
                RangeID           int64
500
                TransactionID     int64
501
                Events            *DataBlob
502
                Overwrite         bool
503
        }
504

505
        // InternalAppendHistoryNodesRequest is used to append a batch of history nodes
506
        InternalAppendHistoryNodesRequest struct {
507
                // True if it is the first append request to the branch
508
                IsNewBranch bool
509
                // The info for clean up data in background
510
                Info string
511
                // The branch to be appended
512
                BranchInfo types.HistoryBranch
513
                // The first eventID becomes the nodeID to be appended
514
                NodeID int64
515
                // The events to be appended
516
                Events *DataBlob
517
                // Requested TransactionID for conditional update
518
                TransactionID int64
519
                // Used in sharded data stores to identify which shard to use
520
                ShardID int
521
        }
522

523
        // InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
524
        InternalGetWorkflowExecutionRequest struct {
525
                DomainID  string
526
                Execution types.WorkflowExecution
527
        }
528

529
        // InternalGetWorkflowExecutionResponse is the response to GetWorkflowExecution for Persistence Interface
530
        InternalGetWorkflowExecutionResponse struct {
531
                State *InternalWorkflowMutableState
532
        }
533

534
        // InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for Persistence Interface
535
        InternalListConcreteExecutionsResponse struct {
536
                Executions    []*InternalListConcreteExecutionsEntity
537
                NextPageToken []byte
538
        }
539

540
        // InternalListConcreteExecutionsEntity is a single entity in InternalListConcreteExecutionsResponse
541
        InternalListConcreteExecutionsEntity struct {
542
                ExecutionInfo    *InternalWorkflowExecutionInfo
543
                VersionHistories *DataBlob
544
        }
545

546
        // InternalForkHistoryBranchRequest is used to fork a history branch
547
        InternalForkHistoryBranchRequest struct {
548
                // The base branch to fork from
549
                ForkBranchInfo types.HistoryBranch
550
                // The nodeID to fork from, the new branch will start from ( inclusive ), the base branch will stop at(exclusive)
551
                ForkNodeID int64
552
                // branchID of the new branch
553
                NewBranchID string
554
                // the info for clean up data in background
555
                Info string
556
                // Used in sharded data stores to identify which shard to use
557
                ShardID int
558
        }
559

560
        // InternalForkHistoryBranchResponse is the response to ForkHistoryBranchRequest
561
        InternalForkHistoryBranchResponse struct {
562
                // branchInfo to represent the new branch
563
                NewBranchInfo types.HistoryBranch
564
        }
565

566
        // InternalDeleteHistoryBranchRequest is used to remove a history branch
567
        InternalDeleteHistoryBranchRequest struct {
568
                // branch to be deleted
569
                BranchInfo types.HistoryBranch
570
                // Used in sharded data stores to identify which shard to use
571
                ShardID int
572
        }
573

574
        // InternalReadHistoryBranchRequest is used to read a history branch
575
        InternalReadHistoryBranchRequest struct {
576
                // The tree of branch range to be read
577
                TreeID string
578
                // The branch range to be read
579
                BranchID string
580
                // Get the history nodes from MinNodeID. Inclusive.
581
                MinNodeID int64
582
                // Get the history nodes upto MaxNodeID.  Exclusive.
583
                MaxNodeID int64
584
                // passing thru for pagination
585
                PageSize int
586
                // Pagination token
587
                NextPageToken []byte
588
                // LastNodeID is the last known node ID attached to a history node
589
                LastNodeID int64
590
                // LastTransactionID is the last known transaction ID attached to a history node
591
                LastTransactionID int64
592
                // Used in sharded data stores to identify which shard to use
593
                ShardID int
594
        }
595

596
        // InternalCompleteForkBranchRequest is used to update some tree/branch meta data for forking
597
        InternalCompleteForkBranchRequest struct {
598
                // branch to be updated
599
                BranchInfo workflow.HistoryBranch
600
                // whether fork is successful
601
                Success bool
602
                // Used in sharded data stores to identify which shard to use
603
                ShardID int
604
        }
605

606
        // InternalReadHistoryBranchResponse is the response to ReadHistoryBranchRequest
607
        InternalReadHistoryBranchResponse struct {
608
                // History events
609
                History []*DataBlob
610
                // Pagination token
611
                NextPageToken []byte
612
                // LastNodeID is the last known node ID attached to a history node
613
                LastNodeID int64
614
                // LastTransactionID is the last known transaction ID attached to a history node
615
                LastTransactionID int64
616
        }
617

618
        // InternalGetHistoryTreeRequest is used to get history tree
619
        InternalGetHistoryTreeRequest struct {
620
                // A UUID of a tree
621
                TreeID string
622
                // Get data from this shard
623
                ShardID *int
624
                // optional: can provide treeID via branchToken if treeID is empty
625
                BranchToken []byte
626
        }
627

628
        // InternalGetHistoryTreeResponse is the response to GetHistoryTree
629
        InternalGetHistoryTreeResponse struct {
630
                // all branches of a tree
631
                Branches []*types.HistoryBranch
632
        }
633

634
        // InternalVisibilityWorkflowExecutionInfo is visibility info for internal response
635
        InternalVisibilityWorkflowExecutionInfo struct {
636
                DomainID         string
637
                WorkflowType     string
638
                WorkflowID       string
639
                RunID            string
640
                TypeName         string
641
                StartTime        time.Time
642
                ExecutionTime    time.Time
643
                CloseTime        time.Time
644
                Status           *types.WorkflowExecutionCloseStatus
645
                HistoryLength    int64
646
                Memo             *DataBlob
647
                TaskList         string
648
                IsCron           bool
649
                NumClusters      int16
650
                UpdateTime       time.Time
651
                SearchAttributes map[string]interface{}
652
                ShardID          int16
653
        }
654

655
        // InternalListWorkflowExecutionsResponse is response from ListWorkflowExecutions
656
        InternalListWorkflowExecutionsResponse struct {
657
                Executions []*InternalVisibilityWorkflowExecutionInfo
658
                // Token to read next page if there are more workflow executions beyond page size.
659
                // Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
660
                NextPageToken []byte
661
        }
662

663
        // InternalGetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution
664
        InternalGetClosedWorkflowExecutionRequest struct {
665
                DomainUUID string
666
                Domain     string // domain name is not persisted, but used as config filter key
667
                Execution  types.WorkflowExecution
668
        }
669

670
        // InternalListClosedWorkflowExecutionsByStatusRequest is used to list executions that have specific close status
671
        InternalListClosedWorkflowExecutionsByStatusRequest struct {
672
                InternalListWorkflowExecutionsRequest
673
                Status types.WorkflowExecutionCloseStatus
674
        }
675

676
        // InternalListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have specific WorkflowID in a domain
677
        InternalListWorkflowExecutionsByWorkflowIDRequest struct {
678
                InternalListWorkflowExecutionsRequest
679
                WorkflowID string
680
        }
681

682
        // InternalListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain
683
        InternalListWorkflowExecutionsByTypeRequest struct {
684
                InternalListWorkflowExecutionsRequest
685
                WorkflowTypeName string
686
        }
687

688
        // InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution
689
        InternalGetClosedWorkflowExecutionResponse struct {
690
                Execution *InternalVisibilityWorkflowExecutionInfo
691
        }
692

693
        // InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted
694
        InternalRecordWorkflowExecutionStartedRequest struct {
695
                DomainUUID         string
696
                WorkflowID         string
697
                RunID              string
698
                WorkflowTypeName   string
699
                StartTimestamp     time.Time
700
                ExecutionTimestamp time.Time
701
                WorkflowTimeout    time.Duration
702
                TaskID             int64
703
                Memo               *DataBlob
704
                TaskList           string
705
                IsCron             bool
706
                NumClusters        int16
707
                UpdateTimestamp    time.Time
708
                SearchAttributes   map[string][]byte
709
                ShardID            int16
710
        }
711

712
        // InternalRecordWorkflowExecutionClosedRequest is request to RecordWorkflowExecutionClosed
713
        InternalRecordWorkflowExecutionClosedRequest struct {
714
                DomainUUID         string
715
                WorkflowID         string
716
                RunID              string
717
                WorkflowTypeName   string
718
                StartTimestamp     time.Time
719
                ExecutionTimestamp time.Time
720
                TaskID             int64
721
                Memo               *DataBlob
722
                TaskList           string
723
                SearchAttributes   map[string][]byte
724
                CloseTimestamp     time.Time
725
                Status             types.WorkflowExecutionCloseStatus
726
                HistoryLength      int64
727
                RetentionPeriod    time.Duration
728
                IsCron             bool
729
                NumClusters        int16
730
                UpdateTimestamp    time.Time
731
                ShardID            int16
732
        }
733

734
        // InternalRecordWorkflowExecutionUninitializedRequest is used to add a record of a newly uninitialized execution
735
        InternalRecordWorkflowExecutionUninitializedRequest struct {
736
                DomainUUID       string
737
                WorkflowID       string
738
                RunID            string
739
                WorkflowTypeName string
740
                UpdateTimestamp  time.Time
741
                ShardID          int64
742
        }
743

744
        // InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
745
        InternalUpsertWorkflowExecutionRequest struct {
746
                DomainUUID         string
747
                WorkflowID         string
748
                RunID              string
749
                WorkflowTypeName   string
750
                StartTimestamp     time.Time
751
                ExecutionTimestamp time.Time
752
                WorkflowTimeout    time.Duration
753
                TaskID             int64
754
                Memo               *DataBlob
755
                TaskList           string
756
                IsCron             bool
757
                NumClusters        int16
758
                UpdateTimestamp    time.Time
759
                SearchAttributes   map[string][]byte
760
                ShardID            int64
761
        }
762

763
        // InternalListWorkflowExecutionsRequest is used to list executions in a domain
764
        InternalListWorkflowExecutionsRequest struct {
765
                DomainUUID string
766
                Domain     string // domain name is not persisted, but used as config filter key
767
                // The earliest end of the time range
768
                EarliestTime time.Time
769
                // The latest end of the time range
770
                LatestTime time.Time
771
                // Maximum number of workflow executions per page
772
                PageSize int
773
                // Token to continue reading next page of workflow executions.
774
                // Pass in empty slice for first page.
775
                NextPageToken []byte
776
        }
777

778
        // InternalDomainConfig describes the domain configuration
779
        InternalDomainConfig struct {
780
                Retention                time.Duration
781
                EmitMetric               bool                 // deprecated
782
                ArchivalBucket           string               // deprecated
783
                ArchivalStatus           types.ArchivalStatus // deprecated
784
                HistoryArchivalStatus    types.ArchivalStatus
785
                HistoryArchivalURI       string
786
                VisibilityArchivalStatus types.ArchivalStatus
787
                VisibilityArchivalURI    string
788
                BadBinaries              *DataBlob
789
                IsolationGroups          *DataBlob
790
        }
791

792
        // InternalCreateDomainRequest is used to create the domain
793
        InternalCreateDomainRequest struct {
794
                Info              *DomainInfo
795
                Config            *InternalDomainConfig
796
                ReplicationConfig *DomainReplicationConfig
797
                IsGlobalDomain    bool
798
                ConfigVersion     int64
799
                FailoverVersion   int64
800
                LastUpdatedTime   time.Time
801
        }
802

803
        // InternalGetDomainResponse is the response for GetDomain
804
        InternalGetDomainResponse struct {
805
                Info                        *DomainInfo
806
                Config                      *InternalDomainConfig
807
                ReplicationConfig           *DomainReplicationConfig
808
                IsGlobalDomain              bool
809
                ConfigVersion               int64
810
                FailoverVersion             int64
811
                FailoverNotificationVersion int64
812
                PreviousFailoverVersion     int64
813
                FailoverEndTime             *time.Time
814
                LastUpdatedTime             time.Time
815
                NotificationVersion         int64
816
        }
817

818
        // InternalUpdateDomainRequest is used to update domain
819
        InternalUpdateDomainRequest struct {
820
                Info                        *DomainInfo
821
                Config                      *InternalDomainConfig
822
                ReplicationConfig           *DomainReplicationConfig
823
                ConfigVersion               int64
824
                FailoverVersion             int64
825
                FailoverNotificationVersion int64
826
                PreviousFailoverVersion     int64
827
                FailoverEndTime             *time.Time
828
                LastUpdatedTime             time.Time
829
                NotificationVersion         int64
830
        }
831

832
        // InternalListDomainsResponse is the response for GetDomain
833
        InternalListDomainsResponse struct {
834
                Domains       []*InternalGetDomainResponse
835
                NextPageToken []byte
836
        }
837

838
        // InternalShardInfo describes a shard
839
        InternalShardInfo struct {
840
                ShardID                           int                  `json:"shard_id"`
841
                Owner                             string               `json:"owner"`
842
                RangeID                           int64                `json:"range_id"`
843
                StolenSinceRenew                  int                  `json:"stolen_since_renew"`
844
                UpdatedAt                         time.Time            `json:"updated_at"`
845
                ReplicationAckLevel               int64                `json:"replication_ack_level"`
846
                ReplicationDLQAckLevel            map[string]int64     `json:"replication_dlq_ack_level"`
847
                TransferAckLevel                  int64                `json:"transfer_ack_level"`
848
                TimerAckLevel                     time.Time            `json:"timer_ack_level"`
849
                ClusterTransferAckLevel           map[string]int64     `json:"cluster_transfer_ack_level"`
850
                ClusterTimerAckLevel              map[string]time.Time `json:"cluster_timer_ack_level"`
851
                TransferProcessingQueueStates     *DataBlob            `json:"transfer_processing_queue_states"`
852
                CrossClusterProcessingQueueStates *DataBlob            `json:"cross_cluster_processing_queue_states"`
853
                TimerProcessingQueueStates        *DataBlob            `json:"timer_processing_queue_states"`
854
                ClusterReplicationLevel           map[string]int64     `json:"cluster_replication_level"`
855
                DomainNotificationVersion         int64                `json:"domain_notification_version"`
856
                PendingFailoverMarkers            *DataBlob            `json:"pending_failover_markers"`
857
        }
858

859
        // InternalCreateShardRequest is request to CreateShard
860
        InternalCreateShardRequest struct {
861
                ShardInfo *InternalShardInfo
862
        }
863

864
        // InternalGetShardRequest is used to get shard information
865
        InternalGetShardRequest struct {
866
                ShardID int
867
        }
868

869
        // InternalUpdateShardRequest  is used to update shard information
870
        InternalUpdateShardRequest struct {
871
                ShardInfo       *InternalShardInfo
872
                PreviousRangeID int64
873
        }
874

875
        // InternalGetShardResponse is the response to GetShard
876
        InternalGetShardResponse struct {
877
                ShardInfo *InternalShardInfo
878
        }
879

880
        // InternalTaskInfo describes a Task
881
        InternalTaskInfo struct {
882
                DomainID               string
883
                WorkflowID             string
884
                RunID                  string
885
                TaskID                 int64
886
                ScheduleID             int64
887
                ScheduleToStartTimeout time.Duration
888
                Expiry                 time.Time
889
                CreatedTime            time.Time
890
                PartitionConfig        map[string]string
891
        }
892

893
        // InternalCreateTasksInfo describes a task to be created in InternalCreateTasksRequest
894
        InternalCreateTasksInfo struct {
895
                Execution types.WorkflowExecution
896
                Data      *InternalTaskInfo
897
                TaskID    int64
898
        }
899

900
        // InternalCreateTasksRequest is request to CreateTasks
901
        InternalCreateTasksRequest struct {
902
                TaskListInfo *TaskListInfo
903
                Tasks        []*InternalCreateTasksInfo
904
        }
905

906
        // InternalGetTasksResponse is response from GetTasks
907
        InternalGetTasksResponse struct {
908
                Tasks []*InternalTaskInfo
909
        }
910
)
911

912
// NewDataBlob returns a new DataBlob
913
func NewDataBlob(data []byte, encodingType common.EncodingType) *DataBlob {
26,829✔
914
        if len(data) == 0 {
28,359✔
915
                return nil
1,530✔
916
        }
1,530✔
917
        if encodingType != "thriftrw" && data[0] == 'Y' {
25,302✔
918
                panic(fmt.Sprintf("Invalid incoding: \"%v\"", encodingType))
×
919
        }
920
        return &DataBlob{
25,302✔
921
                Data:     data,
25,302✔
922
                Encoding: encodingType,
25,302✔
923
        }
25,302✔
924
}
925

926
// FromDataBlob decodes a datablob into a (payload, encodingType) tuple
927
func FromDataBlob(blob *DataBlob) ([]byte, string) {
2,811✔
928
        if blob == nil || len(blob.Data) == 0 {
5,518✔
929
                return nil, ""
2,707✔
930
        }
2,707✔
931
        return blob.Data, string(blob.Encoding)
105✔
932
}
933

934
// Convert a *Datablob to safe that calling its method won't run into NPE
935
func (d *DataBlob) ToNilSafeDataBlob() *DataBlob {
5,943✔
936
        if d != nil {
9,332✔
937
                return d
3,389✔
938
        }
3,389✔
939
        return &DataBlob{}
2,555✔
940
}
941

942
func (d *DataBlob) GetEncodingString() string {
4,106✔
943
        return string(d.Encoding)
4,106✔
944
}
4,106✔
945

946
// GetData is a safe way to get the byte array or nil
947
func (d *DataBlob) GetData() []byte {
66✔
948
        if d == nil || d.Data == nil {
68✔
949
                return []byte{}
2✔
950
        }
2✔
951
        return d.Data
64✔
952
}
953

954
// GetEncoding returns encoding type
955
func (d *DataBlob) GetEncoding() common.EncodingType {
30,419✔
956
        encodingStr := string(d.Encoding)
30,419✔
957

30,419✔
958
        switch common.EncodingType(encodingStr) {
30,419✔
959
        case common.EncodingTypeGob:
×
960
                return common.EncodingTypeGob
×
961
        case common.EncodingTypeJSON:
17✔
962
                return common.EncodingTypeJSON
17✔
963
        case common.EncodingTypeThriftRW:
29,001✔
964
                return common.EncodingTypeThriftRW
29,001✔
965
        case common.EncodingTypeEmpty:
1,404✔
966
                return common.EncodingTypeEmpty
1,404✔
967
        default:
×
968
                return common.EncodingTypeUnknown
×
969
        }
970
}
971

972
// ToInternal convert data blob to internal representation
973
func (d *DataBlob) ToInternal() *types.DataBlob {
3✔
974
        switch d.Encoding {
3✔
975
        case common.EncodingTypeJSON:
×
976
                return &types.DataBlob{
×
977
                        EncodingType: types.EncodingTypeJSON.Ptr(),
×
978
                        Data:         d.Data,
×
979
                }
×
980
        case common.EncodingTypeThriftRW:
3✔
981
                return &types.DataBlob{
3✔
982
                        EncodingType: types.EncodingTypeThriftRW.Ptr(),
3✔
983
                        Data:         d.Data,
3✔
984
                }
3✔
985
        default:
×
986
                panic(fmt.Sprintf("DataBlob seeing unsupported enconding type: %v", d.Encoding))
×
987
        }
988
}
989

990
// NewDataBlobFromInternal convert data blob from internal representation
991
func NewDataBlobFromInternal(blob *types.DataBlob) *DataBlob {
3✔
992
        switch blob.GetEncodingType() {
3✔
993
        case types.EncodingTypeJSON:
×
994
                return &DataBlob{
×
995
                        Encoding: common.EncodingTypeJSON,
×
996
                        Data:     blob.Data,
×
997
                }
×
998
        case types.EncodingTypeThriftRW:
3✔
999
                return &DataBlob{
3✔
1000
                        Encoding: common.EncodingTypeThriftRW,
3✔
1001
                        Data:     blob.Data,
3✔
1002
                }
3✔
1003
        default:
×
1004
                panic(fmt.Sprintf("NewDataBlobFromInternal seeing unsupported enconding type: %v", blob.GetEncodingType()))
×
1005
        }
1006
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc