• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0188dfdb-d61a-4522-b858-d93ddfebefc8

21 Jun 2023 09:47PM UTC coverage: 57.266% (-0.03%) from 57.299%
0188dfdb-d61a-4522-b858-d93ddfebefc8

push

buildkite

web-flow
Update consistency level for cassandra visibility (#5330)

87074 of 152053 relevant lines covered (57.27%)

2464.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.94
/common/persistence/executionManager.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/types"
31
)
32

33
type (
34
        // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
35
        executionManagerImpl struct {
36
                serializer    PayloadSerializer
37
                persistence   ExecutionStore
38
                statsComputer statsComputer
39
                logger        log.Logger
40
        }
41
)
42

43
var _ ExecutionManager = (*executionManagerImpl)(nil)
44

45
// NewExecutionManagerImpl returns new ExecutionManager
46
func NewExecutionManagerImpl(
47
        persistence ExecutionStore,
48
        logger log.Logger,
49
) ExecutionManager {
63✔
50

63✔
51
        return &executionManagerImpl{
63✔
52
                serializer:    NewPayloadSerializer(),
63✔
53
                persistence:   persistence,
63✔
54
                statsComputer: statsComputer{},
63✔
55
                logger:        logger,
63✔
56
        }
63✔
57
}
63✔
58

59
func (m *executionManagerImpl) GetName() string {
×
60
        return m.persistence.GetName()
×
61
}
×
62

63
func (m *executionManagerImpl) GetShardID() int {
11,586✔
64
        return m.persistence.GetShardID()
11,586✔
65
}
11,586✔
66

67
// The below three APIs are related to serialization/deserialization
68
func (m *executionManagerImpl) GetWorkflowExecution(
69
        ctx context.Context,
70
        request *GetWorkflowExecutionRequest,
71
) (*GetWorkflowExecutionResponse, error) {
1,144✔
72

1,144✔
73
        internalRequest := &InternalGetWorkflowExecutionRequest{
1,144✔
74
                DomainID:  request.DomainID,
1,144✔
75
                Execution: request.Execution,
1,144✔
76
        }
1,144✔
77
        response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
1,144✔
78
        if err != nil {
1,549✔
79
                return nil, err
405✔
80
        }
405✔
81
        newResponse := &GetWorkflowExecutionResponse{
742✔
82
                State: &WorkflowMutableState{
742✔
83
                        TimerInfos:         response.State.TimerInfos,
742✔
84
                        RequestCancelInfos: response.State.RequestCancelInfos,
742✔
85
                        SignalInfos:        response.State.SignalInfos,
742✔
86
                        SignalRequestedIDs: response.State.SignalRequestedIDs,
742✔
87
                        ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
742✔
88
                        Checksum:           response.State.Checksum,
742✔
89
                },
742✔
90
        }
742✔
91

742✔
92
        newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
742✔
93
        if err != nil {
742✔
94
                return nil, err
×
95
        }
×
96
        newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
742✔
97
        if err != nil {
742✔
98
                return nil, err
×
99
        }
×
100
        newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
742✔
101
        if err != nil {
742✔
102
                return nil, err
×
103
        }
×
104
        newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
742✔
105
        if err != nil {
742✔
106
                return nil, err
×
107
        }
×
108
        versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
742✔
109
        if err != nil {
742✔
110
                return nil, err
×
111
        }
×
112
        newResponse.State.VersionHistories = versionHistories
742✔
113
        newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
742✔
114

742✔
115
        return newResponse, nil
742✔
116
}
117

118
func (m *executionManagerImpl) DeserializeExecutionInfo(
119
        info *InternalWorkflowExecutionInfo,
120
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
742✔
121

742✔
122
        completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
742✔
123
        if err != nil {
742✔
124
                return nil, nil, err
×
125
        }
×
126

127
        autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
742✔
128
        if err != nil {
742✔
129
                return nil, nil, err
×
130
        }
×
131

132
        newInfo := &WorkflowExecutionInfo{
742✔
133
                CompletionEvent: completionEvent,
742✔
134

742✔
135
                DomainID:                           info.DomainID,
742✔
136
                WorkflowID:                         info.WorkflowID,
742✔
137
                RunID:                              info.RunID,
742✔
138
                FirstExecutionRunID:                info.FirstExecutionRunID,
742✔
139
                ParentDomainID:                     info.ParentDomainID,
742✔
140
                ParentWorkflowID:                   info.ParentWorkflowID,
742✔
141
                ParentRunID:                        info.ParentRunID,
742✔
142
                InitiatedID:                        info.InitiatedID,
742✔
143
                CompletionEventBatchID:             info.CompletionEventBatchID,
742✔
144
                TaskList:                           info.TaskList,
742✔
145
                IsCron:                             len(info.CronSchedule) > 0,
742✔
146
                WorkflowTypeName:                   info.WorkflowTypeName,
742✔
147
                WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
742✔
148
                DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
742✔
149
                ExecutionContext:                   info.ExecutionContext,
742✔
150
                State:                              info.State,
742✔
151
                CloseStatus:                        info.CloseStatus,
742✔
152
                LastFirstEventID:                   info.LastFirstEventID,
742✔
153
                LastEventTaskID:                    info.LastEventTaskID,
742✔
154
                NextEventID:                        info.NextEventID,
742✔
155
                LastProcessedEvent:                 info.LastProcessedEvent,
742✔
156
                StartTimestamp:                     info.StartTimestamp,
742✔
157
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
742✔
158
                CreateRequestID:                    info.CreateRequestID,
742✔
159
                SignalCount:                        info.SignalCount,
742✔
160
                DecisionVersion:                    info.DecisionVersion,
742✔
161
                DecisionScheduleID:                 info.DecisionScheduleID,
742✔
162
                DecisionStartedID:                  info.DecisionStartedID,
742✔
163
                DecisionRequestID:                  info.DecisionRequestID,
742✔
164
                DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
742✔
165
                DecisionAttempt:                    info.DecisionAttempt,
742✔
166
                DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
742✔
167
                DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
742✔
168
                DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
742✔
169
                CancelRequested:                    info.CancelRequested,
742✔
170
                CancelRequestID:                    info.CancelRequestID,
742✔
171
                StickyTaskList:                     info.StickyTaskList,
742✔
172
                StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
742✔
173
                ClientLibraryVersion:               info.ClientLibraryVersion,
742✔
174
                ClientFeatureVersion:               info.ClientFeatureVersion,
742✔
175
                ClientImpl:                         info.ClientImpl,
742✔
176
                Attempt:                            info.Attempt,
742✔
177
                HasRetryPolicy:                     info.HasRetryPolicy,
742✔
178
                InitialInterval:                    int32(info.InitialInterval.Seconds()),
742✔
179
                BackoffCoefficient:                 info.BackoffCoefficient,
742✔
180
                MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
742✔
181
                ExpirationTime:                     info.ExpirationTime,
742✔
182
                MaximumAttempts:                    info.MaximumAttempts,
742✔
183
                NonRetriableErrors:                 info.NonRetriableErrors,
742✔
184
                BranchToken:                        info.BranchToken,
742✔
185
                CronSchedule:                       info.CronSchedule,
742✔
186
                ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
742✔
187
                AutoResetPoints:                    autoResetPoints,
742✔
188
                SearchAttributes:                   info.SearchAttributes,
742✔
189
                Memo:                               info.Memo,
742✔
190
                PartitionConfig:                    info.PartitionConfig,
742✔
191
        }
742✔
192
        newStats := &ExecutionStats{
742✔
193
                HistorySize: info.HistorySize,
742✔
194
        }
742✔
195
        return newInfo, newStats, nil
742✔
196
}
197

198
func (m *executionManagerImpl) DeserializeBufferedEvents(
199
        blobs []*DataBlob,
200
) ([]*types.HistoryEvent, error) {
742✔
201

742✔
202
        events := make([]*types.HistoryEvent, 0)
742✔
203
        for _, b := range blobs {
746✔
204
                history, err := m.serializer.DeserializeBatchEvents(b)
4✔
205
                if err != nil {
4✔
206
                        return nil, err
×
207
                }
×
208
                events = append(events, history...)
4✔
209
        }
210
        return events, nil
742✔
211
}
212

213
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
214
        infos map[int64]*InternalChildExecutionInfo,
215
) (map[int64]*ChildExecutionInfo, error) {
742✔
216

742✔
217
        newInfos := make(map[int64]*ChildExecutionInfo)
742✔
218
        for k, v := range infos {
747✔
219
                initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
5✔
220
                if err != nil {
5✔
221
                        return nil, err
×
222
                }
×
223
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
5✔
224
                if err != nil {
5✔
225
                        return nil, err
×
226
                }
×
227
                c := &ChildExecutionInfo{
5✔
228
                        InitiatedEvent: initiatedEvent,
5✔
229
                        StartedEvent:   startedEvent,
5✔
230

5✔
231
                        Version:               v.Version,
5✔
232
                        InitiatedID:           v.InitiatedID,
5✔
233
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
5✔
234
                        StartedID:             v.StartedID,
5✔
235
                        StartedWorkflowID:     v.StartedWorkflowID,
5✔
236
                        StartedRunID:          v.StartedRunID,
5✔
237
                        CreateRequestID:       v.CreateRequestID,
5✔
238
                        DomainID:              v.DomainID,
5✔
239
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
5✔
240
                        WorkflowTypeName:      v.WorkflowTypeName,
5✔
241
                        ParentClosePolicy:     v.ParentClosePolicy,
5✔
242
                }
5✔
243

5✔
244
                // Needed for backward compatibility reason.
5✔
245
                // ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
5✔
246
                // Updated the code to instead directly read WorkflowId and RunId from mutable state
5✔
247
                // Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
5✔
248
                // StartedRunID on the mutable state before passing it to application
5✔
249
                if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
5✔
250
                        startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
5✔
251
                        startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
×
252
                        c.StartedWorkflowID = startedExecution.GetWorkflowID()
×
253
                        c.StartedRunID = startedExecution.GetRunID()
×
254
                }
×
255
                newInfos[k] = c
5✔
256
        }
257
        return newInfos, nil
742✔
258
}
259

260
func (m *executionManagerImpl) DeserializeActivityInfos(
261
        infos map[int64]*InternalActivityInfo,
262
) (map[int64]*ActivityInfo, error) {
742✔
263

742✔
264
        newInfos := make(map[int64]*ActivityInfo)
742✔
265
        for k, v := range infos {
848✔
266
                scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
106✔
267
                if err != nil {
106✔
268
                        return nil, err
×
269
                }
×
270
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
106✔
271
                if err != nil {
106✔
272
                        return nil, err
×
273
                }
×
274
                a := &ActivityInfo{
106✔
275
                        ScheduledEvent: scheduledEvent,
106✔
276
                        StartedEvent:   startedEvent,
106✔
277

106✔
278
                        Version:                                 v.Version,
106✔
279
                        ScheduleID:                              v.ScheduleID,
106✔
280
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
106✔
281
                        ScheduledTime:                           v.ScheduledTime,
106✔
282
                        StartedID:                               v.StartedID,
106✔
283
                        StartedTime:                             v.StartedTime,
106✔
284
                        ActivityID:                              v.ActivityID,
106✔
285
                        RequestID:                               v.RequestID,
106✔
286
                        Details:                                 v.Details,
106✔
287
                        ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
106✔
288
                        ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
106✔
289
                        StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
106✔
290
                        HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
106✔
291
                        CancelRequested:                         v.CancelRequested,
106✔
292
                        CancelRequestID:                         v.CancelRequestID,
106✔
293
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
106✔
294
                        TimerTaskStatus:                         v.TimerTaskStatus,
106✔
295
                        Attempt:                                 v.Attempt,
106✔
296
                        DomainID:                                v.DomainID,
106✔
297
                        StartedIdentity:                         v.StartedIdentity,
106✔
298
                        TaskList:                                v.TaskList,
106✔
299
                        HasRetryPolicy:                          v.HasRetryPolicy,
106✔
300
                        InitialInterval:                         int32(v.InitialInterval.Seconds()),
106✔
301
                        BackoffCoefficient:                      v.BackoffCoefficient,
106✔
302
                        MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
106✔
303
                        ExpirationTime:                          v.ExpirationTime,
106✔
304
                        MaximumAttempts:                         v.MaximumAttempts,
106✔
305
                        NonRetriableErrors:                      v.NonRetriableErrors,
106✔
306
                        LastFailureReason:                       v.LastFailureReason,
106✔
307
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
106✔
308
                        LastFailureDetails:                      v.LastFailureDetails,
106✔
309
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
106✔
310
                }
106✔
311
                newInfos[k] = a
106✔
312
        }
313
        return newInfos, nil
742✔
314
}
315

316
func (m *executionManagerImpl) UpdateWorkflowExecution(
317
        ctx context.Context,
318
        request *UpdateWorkflowExecutionRequest,
319
) (*UpdateWorkflowExecutionResponse, error) {
4,411✔
320

4,411✔
321
        serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
4,411✔
322
        if err != nil {
4,411✔
323
                return nil, err
×
324
        }
×
325
        var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
4,411✔
326
        if request.NewWorkflowSnapshot != nil {
4,588✔
327
                serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
177✔
328
                if err != nil {
177✔
329
                        return nil, err
×
330
                }
×
331
        }
332

333
        newRequest := &InternalUpdateWorkflowExecutionRequest{
4,411✔
334
                RangeID: request.RangeID,
4,411✔
335

4,411✔
336
                Mode: request.Mode,
4,411✔
337

4,411✔
338
                UpdateWorkflowMutation: *serializedWorkflowMutation,
4,411✔
339
                NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
4,411✔
340
        }
4,411✔
341
        msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
4,411✔
342
        err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
4,411✔
343
        if err != nil {
4,411✔
344
                return nil, err
×
345
        }
×
346
        return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
4,411✔
347
}
348

349
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
350
        infos []*ChildExecutionInfo,
351
        encoding common.EncodingType,
352
) ([]*InternalChildExecutionInfo, error) {
5,086✔
353

5,086✔
354
        newInfos := make([]*InternalChildExecutionInfo, 0)
5,086✔
355
        for _, v := range infos {
5,125✔
356
                initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
39✔
357
                if err != nil {
39✔
358
                        return nil, err
×
359
                }
×
360
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
39✔
361
                if err != nil {
39✔
362
                        return nil, err
×
363
                }
×
364
                i := &InternalChildExecutionInfo{
39✔
365
                        InitiatedEvent: initiatedEvent,
39✔
366
                        StartedEvent:   startedEvent,
39✔
367

39✔
368
                        Version:               v.Version,
39✔
369
                        InitiatedID:           v.InitiatedID,
39✔
370
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
39✔
371
                        CreateRequestID:       v.CreateRequestID,
39✔
372
                        StartedID:             v.StartedID,
39✔
373
                        StartedWorkflowID:     v.StartedWorkflowID,
39✔
374
                        StartedRunID:          v.StartedRunID,
39✔
375
                        DomainID:              v.DomainID,
39✔
376
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
39✔
377
                        WorkflowTypeName:      v.WorkflowTypeName,
39✔
378
                        ParentClosePolicy:     v.ParentClosePolicy,
39✔
379
                }
39✔
380
                newInfos = append(newInfos, i)
39✔
381
        }
382
        return newInfos, nil
5,086✔
383
}
384

385
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
386
        infos []*ActivityInfo,
387
        encoding common.EncodingType,
388
) ([]*InternalActivityInfo, error) {
5,086✔
389

5,086✔
390
        newInfos := make([]*InternalActivityInfo, 0)
5,086✔
391
        for _, v := range infos {
6,345✔
392
                scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
1,259✔
393
                if err != nil {
1,259✔
394
                        return nil, err
×
395
                }
×
396
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
1,259✔
397
                if err != nil {
1,259✔
398
                        return nil, err
×
399
                }
×
400
                i := &InternalActivityInfo{
1,259✔
401
                        Version:                                 v.Version,
1,259✔
402
                        ScheduleID:                              v.ScheduleID,
1,259✔
403
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
1,259✔
404
                        ScheduledEvent:                          scheduledEvent,
1,259✔
405
                        ScheduledTime:                           v.ScheduledTime,
1,259✔
406
                        StartedID:                               v.StartedID,
1,259✔
407
                        StartedEvent:                            startedEvent,
1,259✔
408
                        StartedTime:                             v.StartedTime,
1,259✔
409
                        ActivityID:                              v.ActivityID,
1,259✔
410
                        RequestID:                               v.RequestID,
1,259✔
411
                        Details:                                 v.Details,
1,259✔
412
                        ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
1,259✔
413
                        ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
1,259✔
414
                        StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
1,259✔
415
                        HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
1,259✔
416
                        CancelRequested:                         v.CancelRequested,
1,259✔
417
                        CancelRequestID:                         v.CancelRequestID,
1,259✔
418
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
1,259✔
419
                        TimerTaskStatus:                         v.TimerTaskStatus,
1,259✔
420
                        Attempt:                                 v.Attempt,
1,259✔
421
                        DomainID:                                v.DomainID,
1,259✔
422
                        StartedIdentity:                         v.StartedIdentity,
1,259✔
423
                        TaskList:                                v.TaskList,
1,259✔
424
                        HasRetryPolicy:                          v.HasRetryPolicy,
1,259✔
425
                        InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
1,259✔
426
                        BackoffCoefficient:                      v.BackoffCoefficient,
1,259✔
427
                        MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
1,259✔
428
                        ExpirationTime:                          v.ExpirationTime,
1,259✔
429
                        MaximumAttempts:                         v.MaximumAttempts,
1,259✔
430
                        NonRetriableErrors:                      v.NonRetriableErrors,
1,259✔
431
                        LastFailureReason:                       v.LastFailureReason,
1,259✔
432
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
1,259✔
433
                        LastFailureDetails:                      v.LastFailureDetails,
1,259✔
434
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
1,259✔
435
                }
1,259✔
436
                newInfos = append(newInfos, i)
1,259✔
437
        }
438
        return newInfos, nil
5,086✔
439
}
440

441
func (m *executionManagerImpl) SerializeExecutionInfo(
442
        info *WorkflowExecutionInfo,
443
        stats *ExecutionStats,
444
        encoding common.EncodingType,
445
) (*InternalWorkflowExecutionInfo, error) {
5,086✔
446

5,086✔
447
        if info == nil {
5,086✔
448
                return &InternalWorkflowExecutionInfo{}, nil
×
449
        }
×
450
        completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
5,086✔
451
        if err != nil {
5,086✔
452
                return nil, err
×
453
        }
×
454

455
        resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
5,086✔
456
        if err != nil {
5,086✔
457
                return nil, err
×
458
        }
×
459

460
        return &InternalWorkflowExecutionInfo{
5,086✔
461
                DomainID:                           info.DomainID,
5,086✔
462
                WorkflowID:                         info.WorkflowID,
5,086✔
463
                RunID:                              info.RunID,
5,086✔
464
                FirstExecutionRunID:                info.FirstExecutionRunID,
5,086✔
465
                ParentDomainID:                     info.ParentDomainID,
5,086✔
466
                ParentWorkflowID:                   info.ParentWorkflowID,
5,086✔
467
                ParentRunID:                        info.ParentRunID,
5,086✔
468
                InitiatedID:                        info.InitiatedID,
5,086✔
469
                CompletionEventBatchID:             info.CompletionEventBatchID,
5,086✔
470
                CompletionEvent:                    completionEvent,
5,086✔
471
                TaskList:                           info.TaskList,
5,086✔
472
                WorkflowTypeName:                   info.WorkflowTypeName,
5,086✔
473
                WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
5,086✔
474
                DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
5,086✔
475
                ExecutionContext:                   info.ExecutionContext,
5,086✔
476
                State:                              info.State,
5,086✔
477
                CloseStatus:                        info.CloseStatus,
5,086✔
478
                LastFirstEventID:                   info.LastFirstEventID,
5,086✔
479
                LastEventTaskID:                    info.LastEventTaskID,
5,086✔
480
                NextEventID:                        info.NextEventID,
5,086✔
481
                LastProcessedEvent:                 info.LastProcessedEvent,
5,086✔
482
                StartTimestamp:                     info.StartTimestamp,
5,086✔
483
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
5,086✔
484
                CreateRequestID:                    info.CreateRequestID,
5,086✔
485
                SignalCount:                        info.SignalCount,
5,086✔
486
                DecisionVersion:                    info.DecisionVersion,
5,086✔
487
                DecisionScheduleID:                 info.DecisionScheduleID,
5,086✔
488
                DecisionStartedID:                  info.DecisionStartedID,
5,086✔
489
                DecisionRequestID:                  info.DecisionRequestID,
5,086✔
490
                DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
5,086✔
491
                DecisionAttempt:                    info.DecisionAttempt,
5,086✔
492
                DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp),
5,086✔
493
                DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp),
5,086✔
494
                DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp),
5,086✔
495
                CancelRequested:                    info.CancelRequested,
5,086✔
496
                CancelRequestID:                    info.CancelRequestID,
5,086✔
497
                StickyTaskList:                     info.StickyTaskList,
5,086✔
498
                StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
5,086✔
499
                ClientLibraryVersion:               info.ClientLibraryVersion,
5,086✔
500
                ClientFeatureVersion:               info.ClientFeatureVersion,
5,086✔
501
                ClientImpl:                         info.ClientImpl,
5,086✔
502
                AutoResetPoints:                    resetPoints,
5,086✔
503
                Attempt:                            info.Attempt,
5,086✔
504
                HasRetryPolicy:                     info.HasRetryPolicy,
5,086✔
505
                InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
5,086✔
506
                BackoffCoefficient:                 info.BackoffCoefficient,
5,086✔
507
                MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
5,086✔
508
                ExpirationTime:                     info.ExpirationTime,
5,086✔
509
                MaximumAttempts:                    info.MaximumAttempts,
5,086✔
510
                NonRetriableErrors:                 info.NonRetriableErrors,
5,086✔
511
                BranchToken:                        info.BranchToken,
5,086✔
512
                CronSchedule:                       info.CronSchedule,
5,086✔
513
                ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
5,086✔
514
                Memo:                               info.Memo,
5,086✔
515
                SearchAttributes:                   info.SearchAttributes,
5,086✔
516
                PartitionConfig:                    info.PartitionConfig,
5,086✔
517

5,086✔
518
                // attributes which are not related to mutable state
5,086✔
519
                HistorySize: stats.HistorySize,
5,086✔
520
        }, nil
5,086✔
521
}
522

523
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
524
        ctx context.Context,
525
        request *ConflictResolveWorkflowExecutionRequest,
526
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
527

3✔
528
        serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
3✔
529
        if err != nil {
3✔
530
                return nil, err
×
531
        }
×
532
        var serializedCurrentWorkflowMutation *InternalWorkflowMutation
3✔
533
        if request.CurrentWorkflowMutation != nil {
3✔
534
                serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
×
535
                if err != nil {
×
536
                        return nil, err
×
537
                }
×
538
        }
539
        var serializedNewWorkflowMutation *InternalWorkflowSnapshot
3✔
540
        if request.NewWorkflowSnapshot != nil {
3✔
541
                serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
×
542
                if err != nil {
×
543
                        return nil, err
×
544
                }
×
545
        }
546

547
        newRequest := &InternalConflictResolveWorkflowExecutionRequest{
3✔
548
                RangeID: request.RangeID,
3✔
549

3✔
550
                Mode: request.Mode,
3✔
551

3✔
552
                ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,
3✔
553

3✔
554
                NewWorkflowSnapshot: serializedNewWorkflowMutation,
3✔
555

3✔
556
                CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
3✔
557
        }
3✔
558
        msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
3✔
559
        err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
3✔
560
        if err != nil {
3✔
561
                return nil, err
×
562
        }
×
563
        return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
3✔
564
}
565

566
func (m *executionManagerImpl) CreateWorkflowExecution(
567
        ctx context.Context,
568
        request *CreateWorkflowExecutionRequest,
569
) (*CreateWorkflowExecutionResponse, error) {
504✔
570

504✔
571
        encoding := common.EncodingTypeThriftRW
504✔
572

504✔
573
        serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
504✔
574
        if err != nil {
504✔
575
                return nil, err
×
576
        }
×
577

578
        newRequest := &InternalCreateWorkflowExecutionRequest{
504✔
579
                RangeID: request.RangeID,
504✔
580

504✔
581
                Mode: request.Mode,
504✔
582

504✔
583
                PreviousRunID:            request.PreviousRunID,
504✔
584
                PreviousLastWriteVersion: request.PreviousLastWriteVersion,
504✔
585

504✔
586
                NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
504✔
587
        }
504✔
588

504✔
589
        msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
504✔
590
        _, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
504✔
591
        if err != nil {
549✔
592
                return nil, err
45✔
593
        }
45✔
594
        return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
462✔
595
}
596

597
func (m *executionManagerImpl) SerializeWorkflowMutation(
598
        input *WorkflowMutation,
599
        encoding common.EncodingType,
600
) (*InternalWorkflowMutation, error) {
4,411✔
601

4,411✔
602
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
4,411✔
603
                input.ExecutionInfo,
4,411✔
604
                input.ExecutionStats,
4,411✔
605
                encoding,
4,411✔
606
        )
4,411✔
607
        if err != nil {
4,411✔
608
                return nil, err
×
609
        }
×
610
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
4,411✔
611
        if err != nil {
4,411✔
612
                return nil, err
×
613
        }
×
614
        serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
4,411✔
615
        if err != nil {
4,411✔
616
                return nil, err
×
617
        }
×
618
        serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
4,411✔
619
        if err != nil {
4,411✔
620
                return nil, err
×
621
        }
×
622
        var serializedNewBufferedEvents *DataBlob
4,411✔
623
        if input.NewBufferedEvents != nil {
5,054✔
624
                serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
643✔
625
                if err != nil {
643✔
626
                        return nil, err
×
627
                }
×
628
        }
629

630
        startVersion, err := getStartVersion(input.VersionHistories)
4,411✔
631
        if err != nil {
4,411✔
632
                return nil, err
×
633
        }
×
634
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
4,411✔
635
        if err != nil {
4,411✔
636
                return nil, err
×
637
        }
×
638

639
        return &InternalWorkflowMutation{
4,411✔
640
                ExecutionInfo:    serializedExecutionInfo,
4,411✔
641
                VersionHistories: serializedVersionHistories,
4,411✔
642
                StartVersion:     startVersion,
4,411✔
643
                LastWriteVersion: lastWriteVersion,
4,411✔
644

4,411✔
645
                UpsertActivityInfos:       serializedUpsertActivityInfos,
4,411✔
646
                DeleteActivityInfos:       input.DeleteActivityInfos,
4,411✔
647
                UpsertTimerInfos:          input.UpsertTimerInfos,
4,411✔
648
                DeleteTimerInfos:          input.DeleteTimerInfos,
4,411✔
649
                UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
4,411✔
650
                DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
4,411✔
651
                UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
4,411✔
652
                DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
4,411✔
653
                UpsertSignalInfos:         input.UpsertSignalInfos,
4,411✔
654
                DeleteSignalInfos:         input.DeleteSignalInfos,
4,411✔
655
                UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
4,411✔
656
                DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
4,411✔
657
                NewBufferedEvents:         serializedNewBufferedEvents,
4,411✔
658
                ClearBufferedEvents:       input.ClearBufferedEvents,
4,411✔
659

4,411✔
660
                TransferTasks:     input.TransferTasks,
4,411✔
661
                CrossClusterTasks: input.CrossClusterTasks,
4,411✔
662
                ReplicationTasks:  input.ReplicationTasks,
4,411✔
663
                TimerTasks:        input.TimerTasks,
4,411✔
664

4,411✔
665
                Condition: input.Condition,
4,411✔
666
                Checksum:  input.Checksum,
4,411✔
667
        }, nil
4,411✔
668
}
669

670
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
671
        input *WorkflowSnapshot,
672
        encoding common.EncodingType,
673
) (*InternalWorkflowSnapshot, error) {
678✔
674

678✔
675
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
678✔
676
                input.ExecutionInfo,
678✔
677
                input.ExecutionStats,
678✔
678
                encoding,
678✔
679
        )
678✔
680
        if err != nil {
678✔
681
                return nil, err
×
682
        }
×
683
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
678✔
684
        if err != nil {
678✔
685
                return nil, err
×
686
        }
×
687
        serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
678✔
688
        if err != nil {
678✔
689
                return nil, err
×
690
        }
×
691
        serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
678✔
692
        if err != nil {
678✔
693
                return nil, err
×
694
        }
×
695

696
        startVersion, err := getStartVersion(input.VersionHistories)
678✔
697
        if err != nil {
678✔
698
                return nil, err
×
699
        }
×
700
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
678✔
701
        if err != nil {
678✔
702
                return nil, err
×
703
        }
×
704

705
        return &InternalWorkflowSnapshot{
678✔
706
                ExecutionInfo:    serializedExecutionInfo,
678✔
707
                VersionHistories: serializedVersionHistories,
678✔
708
                StartVersion:     startVersion,
678✔
709
                LastWriteVersion: lastWriteVersion,
678✔
710

678✔
711
                ActivityInfos:       serializedActivityInfos,
678✔
712
                TimerInfos:          input.TimerInfos,
678✔
713
                ChildExecutionInfos: serializedChildExecutionInfos,
678✔
714
                RequestCancelInfos:  input.RequestCancelInfos,
678✔
715
                SignalInfos:         input.SignalInfos,
678✔
716
                SignalRequestedIDs:  input.SignalRequestedIDs,
678✔
717

678✔
718
                TransferTasks:     input.TransferTasks,
678✔
719
                CrossClusterTasks: input.CrossClusterTasks,
678✔
720
                ReplicationTasks:  input.ReplicationTasks,
678✔
721
                TimerTasks:        input.TimerTasks,
678✔
722

678✔
723
                Condition: input.Condition,
678✔
724
                Checksum:  input.Checksum,
678✔
725
        }, nil
678✔
726
}
727

728
func (m *executionManagerImpl) SerializeVersionHistories(
729
        versionHistories *VersionHistories,
730
        encoding common.EncodingType,
731
) (*DataBlob, error) {
5,086✔
732

5,086✔
733
        if versionHistories == nil {
5,086✔
734
                return nil, nil
×
735
        }
×
736
        return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
5,086✔
737
}
738

739
func (m *executionManagerImpl) DeserializeVersionHistories(
740
        blob *DataBlob,
741
) (*VersionHistories, error) {
742✔
742

742✔
743
        if blob == nil {
742✔
744
                return nil, nil
×
745
        }
×
746
        versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
742✔
747
        if err != nil {
742✔
748
                return nil, err
×
749
        }
×
750
        return NewVersionHistoriesFromInternalType(versionHistories), nil
742✔
751
}
752

753
func (m *executionManagerImpl) DeleteWorkflowExecution(
754
        ctx context.Context,
755
        request *DeleteWorkflowExecutionRequest,
756
) error {
54✔
757
        return m.persistence.DeleteWorkflowExecution(ctx, request)
54✔
758
}
54✔
759

760
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
761
        ctx context.Context,
762
        request *DeleteCurrentWorkflowExecutionRequest,
763
) error {
54✔
764
        return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
765
}
54✔
766

767
func (m *executionManagerImpl) GetCurrentExecution(
768
        ctx context.Context,
769
        request *GetCurrentExecutionRequest,
770
) (*GetCurrentExecutionResponse, error) {
183✔
771
        return m.persistence.GetCurrentExecution(ctx, request)
183✔
772
}
183✔
773

774
func (m *executionManagerImpl) ListCurrentExecutions(
775
        ctx context.Context,
776
        request *ListCurrentExecutionsRequest,
777
) (*ListCurrentExecutionsResponse, error) {
×
778
        return m.persistence.ListCurrentExecutions(ctx, request)
×
779
}
×
780

781
func (m *executionManagerImpl) IsWorkflowExecutionExists(
782
        ctx context.Context,
783
        request *IsWorkflowExecutionExistsRequest,
784
) (*IsWorkflowExecutionExistsResponse, error) {
×
785
        return m.persistence.IsWorkflowExecutionExists(ctx, request)
×
786
}
×
787

788
func (m *executionManagerImpl) ListConcreteExecutions(
789
        ctx context.Context,
790
        request *ListConcreteExecutionsRequest,
791
) (*ListConcreteExecutionsResponse, error) {
×
792
        response, err := m.persistence.ListConcreteExecutions(ctx, request)
×
793
        if err != nil {
×
794
                return nil, err
×
795
        }
×
796
        newResponse := &ListConcreteExecutionsResponse{
×
797
                Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
×
798
                PageToken:  response.NextPageToken,
×
799
        }
×
800
        for i, e := range response.Executions {
×
801
                info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
×
802
                if err != nil {
×
803
                        return nil, err
×
804
                }
×
805
                vh, err := m.DeserializeVersionHistories(e.VersionHistories)
×
806
                if err != nil {
×
807
                        return nil, err
×
808
                }
×
809
                newResponse.Executions[i] = &ListConcreteExecutionsEntity{
×
810
                        ExecutionInfo:    info,
×
811
                        VersionHistories: vh,
×
812
                }
×
813
        }
814
        return newResponse, nil
×
815
}
816

817
// Transfer task related methods
818
func (m *executionManagerImpl) GetTransferTasks(
819
        ctx context.Context,
820
        request *GetTransferTasksRequest,
821
) (*GetTransferTasksResponse, error) {
2,316✔
822
        return m.persistence.GetTransferTasks(ctx, request)
2,316✔
823
}
2,316✔
824

825
func (m *executionManagerImpl) CompleteTransferTask(
826
        ctx context.Context,
827
        request *CompleteTransferTaskRequest,
828
) error {
×
829
        return m.persistence.CompleteTransferTask(ctx, request)
×
830
}
×
831

832
func (m *executionManagerImpl) RangeCompleteTransferTask(
833
        ctx context.Context,
834
        request *RangeCompleteTransferTaskRequest,
835
) (*RangeCompleteTransferTaskResponse, error) {
111✔
836
        return m.persistence.RangeCompleteTransferTask(ctx, request)
111✔
837
}
111✔
838

839
// Cross-cluster task related methods
840
func (m *executionManagerImpl) GetCrossClusterTasks(
841
        ctx context.Context,
842
        request *GetCrossClusterTasksRequest,
843
) (*GetCrossClusterTasksResponse, error) {
165✔
844
        return m.persistence.GetCrossClusterTasks(ctx, request)
165✔
845
}
165✔
846

847
func (m *executionManagerImpl) CompleteCrossClusterTask(
848
        ctx context.Context,
849
        request *CompleteCrossClusterTaskRequest,
850
) error {
×
851
        return m.persistence.CompleteCrossClusterTask(ctx, request)
×
852
}
×
853

854
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
855
        ctx context.Context,
856
        request *RangeCompleteCrossClusterTaskRequest,
857
) (*RangeCompleteCrossClusterTaskResponse, error) {
141✔
858
        return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
141✔
859
}
141✔
860

861
// Replication task related methods
862
func (m *executionManagerImpl) GetReplicationTasks(
863
        ctx context.Context,
864
        request *GetReplicationTasksRequest,
865
) (*GetReplicationTasksResponse, error) {
120✔
866
        resp, err := m.persistence.GetReplicationTasks(ctx, request)
120✔
867
        if err != nil {
120✔
868
                return nil, err
×
869
        }
×
870

871
        return &GetReplicationTasksResponse{
120✔
872
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
120✔
873
                NextPageToken: resp.NextPageToken,
120✔
874
        }, nil
120✔
875
}
876

877
func (m *executionManagerImpl) CompleteReplicationTask(
878
        ctx context.Context,
879
        request *CompleteReplicationTaskRequest,
880
) error {
×
881
        return m.persistence.CompleteReplicationTask(ctx, request)
×
882
}
×
883

884
func (m *executionManagerImpl) RangeCompleteReplicationTask(
885
        ctx context.Context,
886
        request *RangeCompleteReplicationTaskRequest,
887
) (*RangeCompleteReplicationTaskResponse, error) {
119✔
888
        return m.persistence.RangeCompleteReplicationTask(ctx, request)
119✔
889
}
119✔
890

891
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
892
        ctx context.Context,
893
        request *PutReplicationTaskToDLQRequest,
894
) error {
3✔
895
        internalRequest := &InternalPutReplicationTaskToDLQRequest{
3✔
896
                SourceClusterName: request.SourceClusterName,
3✔
897
                TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
3✔
898
        }
3✔
899
        return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
3✔
900
}
3✔
901

902
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
903
        ctx context.Context,
904
        request *GetReplicationTasksFromDLQRequest,
905
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
906
        resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
907
        if err != nil {
3✔
908
                return nil, err
×
909
        }
×
910
        return &GetReplicationTasksFromDLQResponse{
3✔
911
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
3✔
912
                NextPageToken: resp.NextPageToken,
3✔
913
        }, nil
3✔
914
}
915

916
func (m *executionManagerImpl) GetReplicationDLQSize(
917
        ctx context.Context,
918
        request *GetReplicationDLQSizeRequest,
919
) (*GetReplicationDLQSizeResponse, error) {
12✔
920
        return m.persistence.GetReplicationDLQSize(ctx, request)
12✔
921
}
12✔
922

923
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
924
        ctx context.Context,
925
        request *DeleteReplicationTaskFromDLQRequest,
926
) error {
×
927
        return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
928
}
×
929

930
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
931
        ctx context.Context,
932
        request *RangeDeleteReplicationTaskFromDLQRequest,
933
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
934
        return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
935
}
×
936

937
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
938
        ctx context.Context,
939
        request *CreateFailoverMarkersRequest,
940
) error {
×
941
        return m.persistence.CreateFailoverMarkerTasks(ctx, request)
×
942
}
×
943

944
// Timer related methods.
945
func (m *executionManagerImpl) GetTimerIndexTasks(
946
        ctx context.Context,
947
        request *GetTimerIndexTasksRequest,
948
) (*GetTimerIndexTasksResponse, error) {
3,238✔
949
        return m.persistence.GetTimerIndexTasks(ctx, request)
3,238✔
950
}
3,238✔
951

952
func (m *executionManagerImpl) CompleteTimerTask(
953
        ctx context.Context,
954
        request *CompleteTimerTaskRequest,
955
) error {
×
956
        return m.persistence.CompleteTimerTask(ctx, request)
×
957
}
×
958

959
func (m *executionManagerImpl) RangeCompleteTimerTask(
960
        ctx context.Context,
961
        request *RangeCompleteTimerTaskRequest,
962
) (*RangeCompleteTimerTaskResponse, error) {
36✔
963
        return m.persistence.RangeCompleteTimerTask(ctx, request)
36✔
964
}
36✔
965

966
func (m *executionManagerImpl) Close() {
51✔
967
        m.persistence.Close()
51✔
968
}
51✔
969

970
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
123✔
971
        if internalInfos == nil {
246✔
972
                return nil
123✔
973
        }
123✔
974
        infos := make([]*ReplicationTaskInfo, len(internalInfos))
3✔
975
        for i := 0; i < len(internalInfos); i++ {
6✔
976
                infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
3✔
977
        }
3✔
978
        return infos
3✔
979
}
980

981
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
3✔
982
        if internalInfo == nil {
3✔
983
                return nil
×
984
        }
×
985
        return &ReplicationTaskInfo{
3✔
986
                DomainID:          internalInfo.DomainID,
3✔
987
                WorkflowID:        internalInfo.WorkflowID,
3✔
988
                RunID:             internalInfo.RunID,
3✔
989
                TaskID:            internalInfo.TaskID,
3✔
990
                TaskType:          internalInfo.TaskType,
3✔
991
                FirstEventID:      internalInfo.FirstEventID,
3✔
992
                NextEventID:       internalInfo.NextEventID,
3✔
993
                Version:           internalInfo.Version,
3✔
994
                ScheduledID:       internalInfo.ScheduledID,
3✔
995
                BranchToken:       internalInfo.BranchToken,
3✔
996
                NewRunBranchToken: internalInfo.NewRunBranchToken,
3✔
997
                CreationTime:      internalInfo.CreationTime.UnixNano(),
3✔
998
        }
3✔
999
}
1000

1001
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
3✔
1002
        if info == nil {
3✔
1003
                return nil
×
1004
        }
×
1005
        return &InternalReplicationTaskInfo{
3✔
1006
                DomainID:          info.DomainID,
3✔
1007
                WorkflowID:        info.WorkflowID,
3✔
1008
                RunID:             info.RunID,
3✔
1009
                TaskID:            info.TaskID,
3✔
1010
                TaskType:          info.TaskType,
3✔
1011
                FirstEventID:      info.FirstEventID,
3✔
1012
                NextEventID:       info.NextEventID,
3✔
1013
                Version:           info.Version,
3✔
1014
                ScheduledID:       info.ScheduledID,
3✔
1015
                BranchToken:       info.BranchToken,
3✔
1016
                NewRunBranchToken: info.NewRunBranchToken,
3✔
1017
                CreationTime:      time.Unix(0, info.CreationTime),
3✔
1018
        }
3✔
1019
}
1020

1021
func getStartVersion(
1022
        versionHistories *VersionHistories,
1023
) (int64, error) {
5,086✔
1024

5,086✔
1025
        if versionHistories == nil {
5,086✔
1026
                return common.EmptyVersion, nil
×
1027
        }
×
1028

1029
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,086✔
1030
        if err != nil {
5,086✔
1031
                return 0, err
×
1032
        }
×
1033
        versionHistoryItem, err := versionHistory.GetFirstItem()
5,086✔
1034
        if err != nil {
5,086✔
1035
                return 0, err
×
1036
        }
×
1037
        return versionHistoryItem.Version, nil
5,086✔
1038
}
1039

1040
func getLastWriteVersion(
1041
        versionHistories *VersionHistories,
1042
) (int64, error) {
5,086✔
1043

5,086✔
1044
        if versionHistories == nil {
5,086✔
1045
                return common.EmptyVersion, nil
×
1046
        }
×
1047

1048
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,086✔
1049
        if err != nil {
5,086✔
1050
                return 0, err
×
1051
        }
×
1052
        versionHistoryItem, err := versionHistory.GetLastItem()
5,086✔
1053
        if err != nil {
5,086✔
1054
                return 0, err
×
1055
        }
×
1056
        return versionHistoryItem.Version, nil
5,086✔
1057
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc