• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018965f0-fdaa-4535-a39b-924d1b4fe28b

17 Jul 2023 10:20PM UTC coverage: 57.058% (-0.09%) from 57.146%
018965f0-fdaa-4535-a39b-924d1b4fe28b

push

buildkite

web-flow
[dynamic config] add Filters method to dynamic config Key (#5346)

What changed?

Add Filters method to Key interface
Add implementations on most keys by parsing the comments on keys (assuming they are correct)
Why?

This is needed to know which dynamic config is domain-specific. It could also possibly simplify the collection struct by consolidating all GetPropertyFilterBy** methods.

How did you test it?

Potential risks

no risk since the method will be read only in non-critical path

Release notes

Documentation Changes

21 of 21 new or added lines in 1 file covered. (100.0%)

87154 of 152745 relevant lines covered (57.06%)

2500.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.94
/common/persistence/executionManager.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/types"
31
)
32

33
type (
34
        // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
35
        executionManagerImpl struct {
36
                serializer    PayloadSerializer
37
                persistence   ExecutionStore
38
                statsComputer statsComputer
39
                logger        log.Logger
40
        }
41
)
42

43
// Compile-time check that *executionManagerImpl satisfies the ExecutionManager interface.
var _ ExecutionManager = (*executionManagerImpl)(nil)
44

45
// NewExecutionManagerImpl returns new ExecutionManager
46
func NewExecutionManagerImpl(
47
        persistence ExecutionStore,
48
        logger log.Logger,
49
) ExecutionManager {
63✔
50

63✔
51
        return &executionManagerImpl{
63✔
52
                serializer:    NewPayloadSerializer(),
63✔
53
                persistence:   persistence,
63✔
54
                statsComputer: statsComputer{},
63✔
55
                logger:        logger,
63✔
56
        }
63✔
57
}
63✔
58

59
func (m *executionManagerImpl) GetName() string {
×
60
        return m.persistence.GetName()
×
61
}
×
62

63
func (m *executionManagerImpl) GetShardID() int {
11,539✔
64
        return m.persistence.GetShardID()
11,539✔
65
}
11,539✔
66

67
// The below three APIs are related to serialization/deserialization
68
func (m *executionManagerImpl) GetWorkflowExecution(
69
        ctx context.Context,
70
        request *GetWorkflowExecutionRequest,
71
) (*GetWorkflowExecutionResponse, error) {
1,135✔
72

1,135✔
73
        internalRequest := &InternalGetWorkflowExecutionRequest{
1,135✔
74
                DomainID:  request.DomainID,
1,135✔
75
                Execution: request.Execution,
1,135✔
76
        }
1,135✔
77
        response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
1,135✔
78
        if err != nil {
1,532✔
79
                return nil, err
397✔
80
        }
397✔
81
        newResponse := &GetWorkflowExecutionResponse{
741✔
82
                State: &WorkflowMutableState{
741✔
83
                        TimerInfos:         response.State.TimerInfos,
741✔
84
                        RequestCancelInfos: response.State.RequestCancelInfos,
741✔
85
                        SignalInfos:        response.State.SignalInfos,
741✔
86
                        SignalRequestedIDs: response.State.SignalRequestedIDs,
741✔
87
                        ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
741✔
88
                        Checksum:           response.State.Checksum,
741✔
89
                },
741✔
90
        }
741✔
91

741✔
92
        newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
741✔
93
        if err != nil {
741✔
94
                return nil, err
×
95
        }
×
96
        newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
741✔
97
        if err != nil {
741✔
98
                return nil, err
×
99
        }
×
100
        newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
741✔
101
        if err != nil {
741✔
102
                return nil, err
×
103
        }
×
104
        newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
741✔
105
        if err != nil {
741✔
106
                return nil, err
×
107
        }
×
108
        versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
741✔
109
        if err != nil {
741✔
110
                return nil, err
×
111
        }
×
112
        newResponse.State.VersionHistories = versionHistories
741✔
113
        newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
741✔
114

741✔
115
        return newResponse, nil
741✔
116
}
117

118
func (m *executionManagerImpl) DeserializeExecutionInfo(
119
        info *InternalWorkflowExecutionInfo,
120
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
741✔
121

741✔
122
        completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
741✔
123
        if err != nil {
741✔
124
                return nil, nil, err
×
125
        }
×
126

127
        autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
741✔
128
        if err != nil {
741✔
129
                return nil, nil, err
×
130
        }
×
131

132
        newInfo := &WorkflowExecutionInfo{
741✔
133
                CompletionEvent: completionEvent,
741✔
134

741✔
135
                DomainID:                           info.DomainID,
741✔
136
                WorkflowID:                         info.WorkflowID,
741✔
137
                RunID:                              info.RunID,
741✔
138
                FirstExecutionRunID:                info.FirstExecutionRunID,
741✔
139
                ParentDomainID:                     info.ParentDomainID,
741✔
140
                ParentWorkflowID:                   info.ParentWorkflowID,
741✔
141
                ParentRunID:                        info.ParentRunID,
741✔
142
                InitiatedID:                        info.InitiatedID,
741✔
143
                CompletionEventBatchID:             info.CompletionEventBatchID,
741✔
144
                TaskList:                           info.TaskList,
741✔
145
                IsCron:                             len(info.CronSchedule) > 0,
741✔
146
                WorkflowTypeName:                   info.WorkflowTypeName,
741✔
147
                WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
741✔
148
                DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
741✔
149
                ExecutionContext:                   info.ExecutionContext,
741✔
150
                State:                              info.State,
741✔
151
                CloseStatus:                        info.CloseStatus,
741✔
152
                LastFirstEventID:                   info.LastFirstEventID,
741✔
153
                LastEventTaskID:                    info.LastEventTaskID,
741✔
154
                NextEventID:                        info.NextEventID,
741✔
155
                LastProcessedEvent:                 info.LastProcessedEvent,
741✔
156
                StartTimestamp:                     info.StartTimestamp,
741✔
157
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
741✔
158
                CreateRequestID:                    info.CreateRequestID,
741✔
159
                SignalCount:                        info.SignalCount,
741✔
160
                DecisionVersion:                    info.DecisionVersion,
741✔
161
                DecisionScheduleID:                 info.DecisionScheduleID,
741✔
162
                DecisionStartedID:                  info.DecisionStartedID,
741✔
163
                DecisionRequestID:                  info.DecisionRequestID,
741✔
164
                DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
741✔
165
                DecisionAttempt:                    info.DecisionAttempt,
741✔
166
                DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
741✔
167
                DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
741✔
168
                DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
741✔
169
                CancelRequested:                    info.CancelRequested,
741✔
170
                CancelRequestID:                    info.CancelRequestID,
741✔
171
                StickyTaskList:                     info.StickyTaskList,
741✔
172
                StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
741✔
173
                ClientLibraryVersion:               info.ClientLibraryVersion,
741✔
174
                ClientFeatureVersion:               info.ClientFeatureVersion,
741✔
175
                ClientImpl:                         info.ClientImpl,
741✔
176
                Attempt:                            info.Attempt,
741✔
177
                HasRetryPolicy:                     info.HasRetryPolicy,
741✔
178
                InitialInterval:                    int32(info.InitialInterval.Seconds()),
741✔
179
                BackoffCoefficient:                 info.BackoffCoefficient,
741✔
180
                MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
741✔
181
                ExpirationTime:                     info.ExpirationTime,
741✔
182
                MaximumAttempts:                    info.MaximumAttempts,
741✔
183
                NonRetriableErrors:                 info.NonRetriableErrors,
741✔
184
                BranchToken:                        info.BranchToken,
741✔
185
                CronSchedule:                       info.CronSchedule,
741✔
186
                ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
741✔
187
                AutoResetPoints:                    autoResetPoints,
741✔
188
                SearchAttributes:                   info.SearchAttributes,
741✔
189
                Memo:                               info.Memo,
741✔
190
                PartitionConfig:                    info.PartitionConfig,
741✔
191
        }
741✔
192
        newStats := &ExecutionStats{
741✔
193
                HistorySize: info.HistorySize,
741✔
194
        }
741✔
195
        return newInfo, newStats, nil
741✔
196
}
197

198
func (m *executionManagerImpl) DeserializeBufferedEvents(
199
        blobs []*DataBlob,
200
) ([]*types.HistoryEvent, error) {
741✔
201

741✔
202
        events := make([]*types.HistoryEvent, 0)
741✔
203
        for _, b := range blobs {
744✔
204
                history, err := m.serializer.DeserializeBatchEvents(b)
3✔
205
                if err != nil {
3✔
206
                        return nil, err
×
207
                }
×
208
                events = append(events, history...)
3✔
209
        }
210
        return events, nil
741✔
211
}
212

213
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
214
        infos map[int64]*InternalChildExecutionInfo,
215
) (map[int64]*ChildExecutionInfo, error) {
741✔
216

741✔
217
        newInfos := make(map[int64]*ChildExecutionInfo)
741✔
218
        for k, v := range infos {
745✔
219
                initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
4✔
220
                if err != nil {
4✔
221
                        return nil, err
×
222
                }
×
223
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
4✔
224
                if err != nil {
4✔
225
                        return nil, err
×
226
                }
×
227
                c := &ChildExecutionInfo{
4✔
228
                        InitiatedEvent: initiatedEvent,
4✔
229
                        StartedEvent:   startedEvent,
4✔
230

4✔
231
                        Version:               v.Version,
4✔
232
                        InitiatedID:           v.InitiatedID,
4✔
233
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
4✔
234
                        StartedID:             v.StartedID,
4✔
235
                        StartedWorkflowID:     v.StartedWorkflowID,
4✔
236
                        StartedRunID:          v.StartedRunID,
4✔
237
                        CreateRequestID:       v.CreateRequestID,
4✔
238
                        DomainID:              v.DomainID,
4✔
239
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
4✔
240
                        WorkflowTypeName:      v.WorkflowTypeName,
4✔
241
                        ParentClosePolicy:     v.ParentClosePolicy,
4✔
242
                }
4✔
243

4✔
244
                // Needed for backward compatibility reason.
4✔
245
                // ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
4✔
246
                // Updated the code to instead directly read WorkflowId and RunId from mutable state
4✔
247
                // Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
4✔
248
                // StartedRunID on the mutable state before passing it to application
4✔
249
                if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
4✔
250
                        startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
4✔
251
                        startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
×
252
                        c.StartedWorkflowID = startedExecution.GetWorkflowID()
×
253
                        c.StartedRunID = startedExecution.GetRunID()
×
254
                }
×
255
                newInfos[k] = c
4✔
256
        }
257
        return newInfos, nil
741✔
258
}
259

260
func (m *executionManagerImpl) DeserializeActivityInfos(
261
        infos map[int64]*InternalActivityInfo,
262
) (map[int64]*ActivityInfo, error) {
741✔
263

741✔
264
        newInfos := make(map[int64]*ActivityInfo)
741✔
265
        for k, v := range infos {
848✔
266
                scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
107✔
267
                if err != nil {
107✔
268
                        return nil, err
×
269
                }
×
270
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
107✔
271
                if err != nil {
107✔
272
                        return nil, err
×
273
                }
×
274
                a := &ActivityInfo{
107✔
275
                        ScheduledEvent: scheduledEvent,
107✔
276
                        StartedEvent:   startedEvent,
107✔
277

107✔
278
                        Version:                                 v.Version,
107✔
279
                        ScheduleID:                              v.ScheduleID,
107✔
280
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
107✔
281
                        ScheduledTime:                           v.ScheduledTime,
107✔
282
                        StartedID:                               v.StartedID,
107✔
283
                        StartedTime:                             v.StartedTime,
107✔
284
                        ActivityID:                              v.ActivityID,
107✔
285
                        RequestID:                               v.RequestID,
107✔
286
                        Details:                                 v.Details,
107✔
287
                        ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
107✔
288
                        ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
107✔
289
                        StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
107✔
290
                        HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
107✔
291
                        CancelRequested:                         v.CancelRequested,
107✔
292
                        CancelRequestID:                         v.CancelRequestID,
107✔
293
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
107✔
294
                        TimerTaskStatus:                         v.TimerTaskStatus,
107✔
295
                        Attempt:                                 v.Attempt,
107✔
296
                        DomainID:                                v.DomainID,
107✔
297
                        StartedIdentity:                         v.StartedIdentity,
107✔
298
                        TaskList:                                v.TaskList,
107✔
299
                        HasRetryPolicy:                          v.HasRetryPolicy,
107✔
300
                        InitialInterval:                         int32(v.InitialInterval.Seconds()),
107✔
301
                        BackoffCoefficient:                      v.BackoffCoefficient,
107✔
302
                        MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
107✔
303
                        ExpirationTime:                          v.ExpirationTime,
107✔
304
                        MaximumAttempts:                         v.MaximumAttempts,
107✔
305
                        NonRetriableErrors:                      v.NonRetriableErrors,
107✔
306
                        LastFailureReason:                       v.LastFailureReason,
107✔
307
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
107✔
308
                        LastFailureDetails:                      v.LastFailureDetails,
107✔
309
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
107✔
310
                }
107✔
311
                newInfos[k] = a
107✔
312
        }
313
        return newInfos, nil
741✔
314
}
315

316
func (m *executionManagerImpl) UpdateWorkflowExecution(
317
        ctx context.Context,
318
        request *UpdateWorkflowExecutionRequest,
319
) (*UpdateWorkflowExecutionResponse, error) {
4,392✔
320

4,392✔
321
        serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
4,392✔
322
        if err != nil {
4,392✔
323
                return nil, err
×
324
        }
×
325
        var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
4,392✔
326
        if request.NewWorkflowSnapshot != nil {
4,569✔
327
                serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
177✔
328
                if err != nil {
177✔
329
                        return nil, err
×
330
                }
×
331
        }
332
        newRequest := &InternalUpdateWorkflowExecutionRequest{
4,392✔
333
                RangeID: request.RangeID,
4,392✔
334

4,392✔
335
                Mode: request.Mode,
4,392✔
336

4,392✔
337
                UpdateWorkflowMutation: *serializedWorkflowMutation,
4,392✔
338
                NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
4,392✔
339
        }
4,392✔
340
        msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
4,392✔
341
        err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
4,392✔
342
        if err != nil {
4,392✔
343
                return nil, err
×
344
        }
×
345
        return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
4,392✔
346
}
347

348
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
349
        infos []*ChildExecutionInfo,
350
        encoding common.EncodingType,
351
) ([]*InternalChildExecutionInfo, error) {
5,067✔
352

5,067✔
353
        newInfos := make([]*InternalChildExecutionInfo, 0)
5,067✔
354
        for _, v := range infos {
5,106✔
355
                initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
39✔
356
                if err != nil {
39✔
357
                        return nil, err
×
358
                }
×
359
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
39✔
360
                if err != nil {
39✔
361
                        return nil, err
×
362
                }
×
363
                i := &InternalChildExecutionInfo{
39✔
364
                        InitiatedEvent: initiatedEvent,
39✔
365
                        StartedEvent:   startedEvent,
39✔
366

39✔
367
                        Version:               v.Version,
39✔
368
                        InitiatedID:           v.InitiatedID,
39✔
369
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
39✔
370
                        CreateRequestID:       v.CreateRequestID,
39✔
371
                        StartedID:             v.StartedID,
39✔
372
                        StartedWorkflowID:     v.StartedWorkflowID,
39✔
373
                        StartedRunID:          v.StartedRunID,
39✔
374
                        DomainID:              v.DomainID,
39✔
375
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
39✔
376
                        WorkflowTypeName:      v.WorkflowTypeName,
39✔
377
                        ParentClosePolicy:     v.ParentClosePolicy,
39✔
378
                }
39✔
379
                newInfos = append(newInfos, i)
39✔
380
        }
381
        return newInfos, nil
5,067✔
382
}
383

384
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
385
        infos []*ActivityInfo,
386
        encoding common.EncodingType,
387
) ([]*InternalActivityInfo, error) {
5,067✔
388

5,067✔
389
        newInfos := make([]*InternalActivityInfo, 0)
5,067✔
390
        for _, v := range infos {
6,310✔
391
                scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
1,243✔
392
                if err != nil {
1,243✔
393
                        return nil, err
×
394
                }
×
395
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
1,243✔
396
                if err != nil {
1,243✔
397
                        return nil, err
×
398
                }
×
399
                i := &InternalActivityInfo{
1,243✔
400
                        Version:                                 v.Version,
1,243✔
401
                        ScheduleID:                              v.ScheduleID,
1,243✔
402
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
1,243✔
403
                        ScheduledEvent:                          scheduledEvent,
1,243✔
404
                        ScheduledTime:                           v.ScheduledTime,
1,243✔
405
                        StartedID:                               v.StartedID,
1,243✔
406
                        StartedEvent:                            startedEvent,
1,243✔
407
                        StartedTime:                             v.StartedTime,
1,243✔
408
                        ActivityID:                              v.ActivityID,
1,243✔
409
                        RequestID:                               v.RequestID,
1,243✔
410
                        Details:                                 v.Details,
1,243✔
411
                        ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
1,243✔
412
                        ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
1,243✔
413
                        StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
1,243✔
414
                        HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
1,243✔
415
                        CancelRequested:                         v.CancelRequested,
1,243✔
416
                        CancelRequestID:                         v.CancelRequestID,
1,243✔
417
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
1,243✔
418
                        TimerTaskStatus:                         v.TimerTaskStatus,
1,243✔
419
                        Attempt:                                 v.Attempt,
1,243✔
420
                        DomainID:                                v.DomainID,
1,243✔
421
                        StartedIdentity:                         v.StartedIdentity,
1,243✔
422
                        TaskList:                                v.TaskList,
1,243✔
423
                        HasRetryPolicy:                          v.HasRetryPolicy,
1,243✔
424
                        InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
1,243✔
425
                        BackoffCoefficient:                      v.BackoffCoefficient,
1,243✔
426
                        MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
1,243✔
427
                        ExpirationTime:                          v.ExpirationTime,
1,243✔
428
                        MaximumAttempts:                         v.MaximumAttempts,
1,243✔
429
                        NonRetriableErrors:                      v.NonRetriableErrors,
1,243✔
430
                        LastFailureReason:                       v.LastFailureReason,
1,243✔
431
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
1,243✔
432
                        LastFailureDetails:                      v.LastFailureDetails,
1,243✔
433
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
1,243✔
434
                }
1,243✔
435
                newInfos = append(newInfos, i)
1,243✔
436
        }
437
        return newInfos, nil
5,067✔
438
}
439

440
func (m *executionManagerImpl) SerializeExecutionInfo(
441
        info *WorkflowExecutionInfo,
442
        stats *ExecutionStats,
443
        encoding common.EncodingType,
444
) (*InternalWorkflowExecutionInfo, error) {
5,067✔
445

5,067✔
446
        if info == nil {
5,067✔
447
                return &InternalWorkflowExecutionInfo{}, nil
×
448
        }
×
449
        completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
5,067✔
450
        if err != nil {
5,067✔
451
                return nil, err
×
452
        }
×
453

454
        resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
5,067✔
455
        if err != nil {
5,067✔
456
                return nil, err
×
457
        }
×
458

459
        return &InternalWorkflowExecutionInfo{
5,067✔
460
                DomainID:                           info.DomainID,
5,067✔
461
                WorkflowID:                         info.WorkflowID,
5,067✔
462
                RunID:                              info.RunID,
5,067✔
463
                FirstExecutionRunID:                info.FirstExecutionRunID,
5,067✔
464
                ParentDomainID:                     info.ParentDomainID,
5,067✔
465
                ParentWorkflowID:                   info.ParentWorkflowID,
5,067✔
466
                ParentRunID:                        info.ParentRunID,
5,067✔
467
                InitiatedID:                        info.InitiatedID,
5,067✔
468
                CompletionEventBatchID:             info.CompletionEventBatchID,
5,067✔
469
                CompletionEvent:                    completionEvent,
5,067✔
470
                TaskList:                           info.TaskList,
5,067✔
471
                WorkflowTypeName:                   info.WorkflowTypeName,
5,067✔
472
                WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
5,067✔
473
                DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
5,067✔
474
                ExecutionContext:                   info.ExecutionContext,
5,067✔
475
                State:                              info.State,
5,067✔
476
                CloseStatus:                        info.CloseStatus,
5,067✔
477
                LastFirstEventID:                   info.LastFirstEventID,
5,067✔
478
                LastEventTaskID:                    info.LastEventTaskID,
5,067✔
479
                NextEventID:                        info.NextEventID,
5,067✔
480
                LastProcessedEvent:                 info.LastProcessedEvent,
5,067✔
481
                StartTimestamp:                     info.StartTimestamp,
5,067✔
482
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
5,067✔
483
                CreateRequestID:                    info.CreateRequestID,
5,067✔
484
                SignalCount:                        info.SignalCount,
5,067✔
485
                DecisionVersion:                    info.DecisionVersion,
5,067✔
486
                DecisionScheduleID:                 info.DecisionScheduleID,
5,067✔
487
                DecisionStartedID:                  info.DecisionStartedID,
5,067✔
488
                DecisionRequestID:                  info.DecisionRequestID,
5,067✔
489
                DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
5,067✔
490
                DecisionAttempt:                    info.DecisionAttempt,
5,067✔
491
                DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp),
5,067✔
492
                DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp),
5,067✔
493
                DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp),
5,067✔
494
                CancelRequested:                    info.CancelRequested,
5,067✔
495
                CancelRequestID:                    info.CancelRequestID,
5,067✔
496
                StickyTaskList:                     info.StickyTaskList,
5,067✔
497
                StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
5,067✔
498
                ClientLibraryVersion:               info.ClientLibraryVersion,
5,067✔
499
                ClientFeatureVersion:               info.ClientFeatureVersion,
5,067✔
500
                ClientImpl:                         info.ClientImpl,
5,067✔
501
                AutoResetPoints:                    resetPoints,
5,067✔
502
                Attempt:                            info.Attempt,
5,067✔
503
                HasRetryPolicy:                     info.HasRetryPolicy,
5,067✔
504
                InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
5,067✔
505
                BackoffCoefficient:                 info.BackoffCoefficient,
5,067✔
506
                MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
5,067✔
507
                ExpirationTime:                     info.ExpirationTime,
5,067✔
508
                MaximumAttempts:                    info.MaximumAttempts,
5,067✔
509
                NonRetriableErrors:                 info.NonRetriableErrors,
5,067✔
510
                BranchToken:                        info.BranchToken,
5,067✔
511
                CronSchedule:                       info.CronSchedule,
5,067✔
512
                ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
5,067✔
513
                Memo:                               info.Memo,
5,067✔
514
                SearchAttributes:                   info.SearchAttributes,
5,067✔
515
                PartitionConfig:                    info.PartitionConfig,
5,067✔
516

5,067✔
517
                // attributes which are not related to mutable state
5,067✔
518
                HistorySize: stats.HistorySize,
5,067✔
519
        }, nil
5,067✔
520
}
521

522
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
523
        ctx context.Context,
524
        request *ConflictResolveWorkflowExecutionRequest,
525
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
526

3✔
527
        serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
3✔
528
        if err != nil {
3✔
529
                return nil, err
×
530
        }
×
531
        var serializedCurrentWorkflowMutation *InternalWorkflowMutation
3✔
532
        if request.CurrentWorkflowMutation != nil {
3✔
533
                serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
×
534
                if err != nil {
×
535
                        return nil, err
×
536
                }
×
537
        }
538
        var serializedNewWorkflowMutation *InternalWorkflowSnapshot
3✔
539
        if request.NewWorkflowSnapshot != nil {
3✔
540
                serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
×
541
                if err != nil {
×
542
                        return nil, err
×
543
                }
×
544
        }
545

546
        newRequest := &InternalConflictResolveWorkflowExecutionRequest{
3✔
547
                RangeID: request.RangeID,
3✔
548

3✔
549
                Mode: request.Mode,
3✔
550

3✔
551
                ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,
3✔
552

3✔
553
                NewWorkflowSnapshot: serializedNewWorkflowMutation,
3✔
554

3✔
555
                CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
3✔
556
        }
3✔
557
        msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
3✔
558
        err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
3✔
559
        if err != nil {
3✔
560
                return nil, err
×
561
        }
×
562
        return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
3✔
563
}
564

565
func (m *executionManagerImpl) CreateWorkflowExecution(
566
        ctx context.Context,
567
        request *CreateWorkflowExecutionRequest,
568
) (*CreateWorkflowExecutionResponse, error) {
504✔
569

504✔
570
        encoding := common.EncodingTypeThriftRW
504✔
571

504✔
572
        serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
504✔
573
        if err != nil {
504✔
574
                return nil, err
×
575
        }
×
576

577
        newRequest := &InternalCreateWorkflowExecutionRequest{
504✔
578
                RangeID: request.RangeID,
504✔
579

504✔
580
                Mode: request.Mode,
504✔
581

504✔
582
                PreviousRunID:            request.PreviousRunID,
504✔
583
                PreviousLastWriteVersion: request.PreviousLastWriteVersion,
504✔
584

504✔
585
                NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
504✔
586
        }
504✔
587

504✔
588
        msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
504✔
589
        _, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
504✔
590
        if err != nil {
549✔
591
                return nil, err
45✔
592
        }
45✔
593
        return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
462✔
594
}
595

596
func (m *executionManagerImpl) SerializeWorkflowMutation(
597
        input *WorkflowMutation,
598
        encoding common.EncodingType,
599
) (*InternalWorkflowMutation, error) {
4,392✔
600

4,392✔
601
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
4,392✔
602
                input.ExecutionInfo,
4,392✔
603
                input.ExecutionStats,
4,392✔
604
                encoding,
4,392✔
605
        )
4,392✔
606
        if err != nil {
4,392✔
607
                return nil, err
×
608
        }
×
609
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
4,392✔
610
        if err != nil {
4,392✔
611
                return nil, err
×
612
        }
×
613
        serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
4,392✔
614
        if err != nil {
4,392✔
615
                return nil, err
×
616
        }
×
617
        serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
4,392✔
618
        if err != nil {
4,392✔
619
                return nil, err
×
620
        }
×
621
        var serializedNewBufferedEvents *DataBlob
4,392✔
622
        if input.NewBufferedEvents != nil {
5,034✔
623
                serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
642✔
624
                if err != nil {
642✔
625
                        return nil, err
×
626
                }
×
627
        }
628

629
        startVersion, err := getStartVersion(input.VersionHistories)
4,392✔
630
        if err != nil {
4,392✔
631
                return nil, err
×
632
        }
×
633
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
4,392✔
634
        if err != nil {
4,392✔
635
                return nil, err
×
636
        }
×
637

638
        return &InternalWorkflowMutation{
4,392✔
639
                ExecutionInfo:    serializedExecutionInfo,
4,392✔
640
                VersionHistories: serializedVersionHistories,
4,392✔
641
                StartVersion:     startVersion,
4,392✔
642
                LastWriteVersion: lastWriteVersion,
4,392✔
643

4,392✔
644
                UpsertActivityInfos:       serializedUpsertActivityInfos,
4,392✔
645
                DeleteActivityInfos:       input.DeleteActivityInfos,
4,392✔
646
                UpsertTimerInfos:          input.UpsertTimerInfos,
4,392✔
647
                DeleteTimerInfos:          input.DeleteTimerInfos,
4,392✔
648
                UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
4,392✔
649
                DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
4,392✔
650
                UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
4,392✔
651
                DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
4,392✔
652
                UpsertSignalInfos:         input.UpsertSignalInfos,
4,392✔
653
                DeleteSignalInfos:         input.DeleteSignalInfos,
4,392✔
654
                UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
4,392✔
655
                DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
4,392✔
656
                NewBufferedEvents:         serializedNewBufferedEvents,
4,392✔
657
                ClearBufferedEvents:       input.ClearBufferedEvents,
4,392✔
658

4,392✔
659
                TransferTasks:     input.TransferTasks,
4,392✔
660
                CrossClusterTasks: input.CrossClusterTasks,
4,392✔
661
                ReplicationTasks:  input.ReplicationTasks,
4,392✔
662
                TimerTasks:        input.TimerTasks,
4,392✔
663
                TTLInSeconds:      input.TTLInSeconds,
4,392✔
664
                Condition:         input.Condition,
4,392✔
665
                Checksum:          input.Checksum,
4,392✔
666
        }, nil
4,392✔
667
}
668

669
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
670
        input *WorkflowSnapshot,
671
        encoding common.EncodingType,
672
) (*InternalWorkflowSnapshot, error) {
678✔
673

678✔
674
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
678✔
675
                input.ExecutionInfo,
678✔
676
                input.ExecutionStats,
678✔
677
                encoding,
678✔
678
        )
678✔
679
        if err != nil {
678✔
680
                return nil, err
×
681
        }
×
682
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
678✔
683
        if err != nil {
678✔
684
                return nil, err
×
685
        }
×
686
        serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
678✔
687
        if err != nil {
678✔
688
                return nil, err
×
689
        }
×
690
        serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
678✔
691
        if err != nil {
678✔
692
                return nil, err
×
693
        }
×
694

695
        startVersion, err := getStartVersion(input.VersionHistories)
678✔
696
        if err != nil {
678✔
697
                return nil, err
×
698
        }
×
699
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
678✔
700
        if err != nil {
678✔
701
                return nil, err
×
702
        }
×
703

704
        return &InternalWorkflowSnapshot{
678✔
705
                ExecutionInfo:    serializedExecutionInfo,
678✔
706
                VersionHistories: serializedVersionHistories,
678✔
707
                StartVersion:     startVersion,
678✔
708
                LastWriteVersion: lastWriteVersion,
678✔
709

678✔
710
                ActivityInfos:       serializedActivityInfos,
678✔
711
                TimerInfos:          input.TimerInfos,
678✔
712
                ChildExecutionInfos: serializedChildExecutionInfos,
678✔
713
                RequestCancelInfos:  input.RequestCancelInfos,
678✔
714
                SignalInfos:         input.SignalInfos,
678✔
715
                SignalRequestedIDs:  input.SignalRequestedIDs,
678✔
716

678✔
717
                TransferTasks:     input.TransferTasks,
678✔
718
                CrossClusterTasks: input.CrossClusterTasks,
678✔
719
                ReplicationTasks:  input.ReplicationTasks,
678✔
720
                TimerTasks:        input.TimerTasks,
678✔
721

678✔
722
                Condition: input.Condition,
678✔
723
                Checksum:  input.Checksum,
678✔
724
        }, nil
678✔
725
}
726

727
func (m *executionManagerImpl) SerializeVersionHistories(
728
        versionHistories *VersionHistories,
729
        encoding common.EncodingType,
730
) (*DataBlob, error) {
5,067✔
731

5,067✔
732
        if versionHistories == nil {
5,067✔
733
                return nil, nil
×
734
        }
×
735
        return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
5,067✔
736
}
737

738
func (m *executionManagerImpl) DeserializeVersionHistories(
739
        blob *DataBlob,
740
) (*VersionHistories, error) {
741✔
741

741✔
742
        if blob == nil {
741✔
743
                return nil, nil
×
744
        }
×
745
        versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
741✔
746
        if err != nil {
741✔
747
                return nil, err
×
748
        }
×
749
        return NewVersionHistoriesFromInternalType(versionHistories), nil
741✔
750
}
751

752
func (m *executionManagerImpl) DeleteWorkflowExecution(
753
        ctx context.Context,
754
        request *DeleteWorkflowExecutionRequest,
755
) error {
54✔
756
        return m.persistence.DeleteWorkflowExecution(ctx, request)
54✔
757
}
54✔
758

759
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
760
        ctx context.Context,
761
        request *DeleteCurrentWorkflowExecutionRequest,
762
) error {
54✔
763
        return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
764
}
54✔
765

766
func (m *executionManagerImpl) GetCurrentExecution(
767
        ctx context.Context,
768
        request *GetCurrentExecutionRequest,
769
) (*GetCurrentExecutionResponse, error) {
183✔
770
        return m.persistence.GetCurrentExecution(ctx, request)
183✔
771
}
183✔
772

773
func (m *executionManagerImpl) ListCurrentExecutions(
774
        ctx context.Context,
775
        request *ListCurrentExecutionsRequest,
776
) (*ListCurrentExecutionsResponse, error) {
×
777
        return m.persistence.ListCurrentExecutions(ctx, request)
×
778
}
×
779

780
func (m *executionManagerImpl) IsWorkflowExecutionExists(
781
        ctx context.Context,
782
        request *IsWorkflowExecutionExistsRequest,
783
) (*IsWorkflowExecutionExistsResponse, error) {
×
784
        return m.persistence.IsWorkflowExecutionExists(ctx, request)
×
785
}
×
786

787
func (m *executionManagerImpl) ListConcreteExecutions(
788
        ctx context.Context,
789
        request *ListConcreteExecutionsRequest,
790
) (*ListConcreteExecutionsResponse, error) {
×
791
        response, err := m.persistence.ListConcreteExecutions(ctx, request)
×
792
        if err != nil {
×
793
                return nil, err
×
794
        }
×
795
        newResponse := &ListConcreteExecutionsResponse{
×
796
                Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
×
797
                PageToken:  response.NextPageToken,
×
798
        }
×
799
        for i, e := range response.Executions {
×
800
                info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
×
801
                if err != nil {
×
802
                        return nil, err
×
803
                }
×
804
                vh, err := m.DeserializeVersionHistories(e.VersionHistories)
×
805
                if err != nil {
×
806
                        return nil, err
×
807
                }
×
808
                newResponse.Executions[i] = &ListConcreteExecutionsEntity{
×
809
                        ExecutionInfo:    info,
×
810
                        VersionHistories: vh,
×
811
                }
×
812
        }
813
        return newResponse, nil
×
814
}
815

816
// Transfer task related methods
817
func (m *executionManagerImpl) GetTransferTasks(
818
        ctx context.Context,
819
        request *GetTransferTasksRequest,
820
) (*GetTransferTasksResponse, error) {
2,317✔
821
        return m.persistence.GetTransferTasks(ctx, request)
2,317✔
822
}
2,317✔
823

824
func (m *executionManagerImpl) CompleteTransferTask(
825
        ctx context.Context,
826
        request *CompleteTransferTaskRequest,
827
) error {
×
828
        return m.persistence.CompleteTransferTask(ctx, request)
×
829
}
×
830

831
func (m *executionManagerImpl) RangeCompleteTransferTask(
832
        ctx context.Context,
833
        request *RangeCompleteTransferTaskRequest,
834
) (*RangeCompleteTransferTaskResponse, error) {
110✔
835
        return m.persistence.RangeCompleteTransferTask(ctx, request)
110✔
836
}
110✔
837

838
// Cross-cluster task related methods
839
func (m *executionManagerImpl) GetCrossClusterTasks(
840
        ctx context.Context,
841
        request *GetCrossClusterTasksRequest,
842
) (*GetCrossClusterTasksResponse, error) {
159✔
843
        return m.persistence.GetCrossClusterTasks(ctx, request)
159✔
844
}
159✔
845

846
func (m *executionManagerImpl) CompleteCrossClusterTask(
847
        ctx context.Context,
848
        request *CompleteCrossClusterTaskRequest,
849
) error {
×
850
        return m.persistence.CompleteCrossClusterTask(ctx, request)
×
851
}
×
852

853
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
854
        ctx context.Context,
855
        request *RangeCompleteCrossClusterTaskRequest,
856
) (*RangeCompleteCrossClusterTaskResponse, error) {
136✔
857
        return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
136✔
858
}
136✔
859

860
// Replication task related methods
861
func (m *executionManagerImpl) GetReplicationTasks(
862
        ctx context.Context,
863
        request *GetReplicationTasksRequest,
864
) (*GetReplicationTasksResponse, error) {
120✔
865
        resp, err := m.persistence.GetReplicationTasks(ctx, request)
120✔
866
        if err != nil {
120✔
867
                return nil, err
×
868
        }
×
869

870
        return &GetReplicationTasksResponse{
120✔
871
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
120✔
872
                NextPageToken: resp.NextPageToken,
120✔
873
        }, nil
120✔
874
}
875

876
func (m *executionManagerImpl) CompleteReplicationTask(
877
        ctx context.Context,
878
        request *CompleteReplicationTaskRequest,
879
) error {
×
880
        return m.persistence.CompleteReplicationTask(ctx, request)
×
881
}
×
882

883
func (m *executionManagerImpl) RangeCompleteReplicationTask(
884
        ctx context.Context,
885
        request *RangeCompleteReplicationTaskRequest,
886
) (*RangeCompleteReplicationTaskResponse, error) {
122✔
887
        return m.persistence.RangeCompleteReplicationTask(ctx, request)
122✔
888
}
122✔
889

890
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
891
        ctx context.Context,
892
        request *PutReplicationTaskToDLQRequest,
893
) error {
3✔
894
        internalRequest := &InternalPutReplicationTaskToDLQRequest{
3✔
895
                SourceClusterName: request.SourceClusterName,
3✔
896
                TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
3✔
897
        }
3✔
898
        return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
3✔
899
}
3✔
900

901
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
902
        ctx context.Context,
903
        request *GetReplicationTasksFromDLQRequest,
904
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
905
        resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
906
        if err != nil {
3✔
907
                return nil, err
×
908
        }
×
909
        return &GetReplicationTasksFromDLQResponse{
3✔
910
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
3✔
911
                NextPageToken: resp.NextPageToken,
3✔
912
        }, nil
3✔
913
}
914

915
func (m *executionManagerImpl) GetReplicationDLQSize(
916
        ctx context.Context,
917
        request *GetReplicationDLQSizeRequest,
918
) (*GetReplicationDLQSizeResponse, error) {
12✔
919
        return m.persistence.GetReplicationDLQSize(ctx, request)
12✔
920
}
12✔
921

922
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
923
        ctx context.Context,
924
        request *DeleteReplicationTaskFromDLQRequest,
925
) error {
×
926
        return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
927
}
×
928

929
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
930
        ctx context.Context,
931
        request *RangeDeleteReplicationTaskFromDLQRequest,
932
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
933
        return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
934
}
×
935

936
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
937
        ctx context.Context,
938
        request *CreateFailoverMarkersRequest,
939
) error {
×
940
        return m.persistence.CreateFailoverMarkerTasks(ctx, request)
×
941
}
×
942

943
// Timer related methods.
944
func (m *executionManagerImpl) GetTimerIndexTasks(
945
        ctx context.Context,
946
        request *GetTimerIndexTasksRequest,
947
) (*GetTimerIndexTasksResponse, error) {
3,232✔
948
        return m.persistence.GetTimerIndexTasks(ctx, request)
3,232✔
949
}
3,232✔
950

951
func (m *executionManagerImpl) CompleteTimerTask(
952
        ctx context.Context,
953
        request *CompleteTimerTaskRequest,
954
) error {
×
955
        return m.persistence.CompleteTimerTask(ctx, request)
×
956
}
×
957

958
func (m *executionManagerImpl) RangeCompleteTimerTask(
959
        ctx context.Context,
960
        request *RangeCompleteTimerTaskRequest,
961
) (*RangeCompleteTimerTaskResponse, error) {
36✔
962
        return m.persistence.RangeCompleteTimerTask(ctx, request)
36✔
963
}
36✔
964

965
func (m *executionManagerImpl) Close() {
51✔
966
        m.persistence.Close()
51✔
967
}
51✔
968

969
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
123✔
970
        if internalInfos == nil {
246✔
971
                return nil
123✔
972
        }
123✔
973
        infos := make([]*ReplicationTaskInfo, len(internalInfos))
3✔
974
        for i := 0; i < len(internalInfos); i++ {
6✔
975
                infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
3✔
976
        }
3✔
977
        return infos
3✔
978
}
979

980
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
3✔
981
        if internalInfo == nil {
3✔
982
                return nil
×
983
        }
×
984
        return &ReplicationTaskInfo{
3✔
985
                DomainID:          internalInfo.DomainID,
3✔
986
                WorkflowID:        internalInfo.WorkflowID,
3✔
987
                RunID:             internalInfo.RunID,
3✔
988
                TaskID:            internalInfo.TaskID,
3✔
989
                TaskType:          internalInfo.TaskType,
3✔
990
                FirstEventID:      internalInfo.FirstEventID,
3✔
991
                NextEventID:       internalInfo.NextEventID,
3✔
992
                Version:           internalInfo.Version,
3✔
993
                ScheduledID:       internalInfo.ScheduledID,
3✔
994
                BranchToken:       internalInfo.BranchToken,
3✔
995
                NewRunBranchToken: internalInfo.NewRunBranchToken,
3✔
996
                CreationTime:      internalInfo.CreationTime.UnixNano(),
3✔
997
        }
3✔
998
}
999

1000
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
3✔
1001
        if info == nil {
3✔
1002
                return nil
×
1003
        }
×
1004
        return &InternalReplicationTaskInfo{
3✔
1005
                DomainID:          info.DomainID,
3✔
1006
                WorkflowID:        info.WorkflowID,
3✔
1007
                RunID:             info.RunID,
3✔
1008
                TaskID:            info.TaskID,
3✔
1009
                TaskType:          info.TaskType,
3✔
1010
                FirstEventID:      info.FirstEventID,
3✔
1011
                NextEventID:       info.NextEventID,
3✔
1012
                Version:           info.Version,
3✔
1013
                ScheduledID:       info.ScheduledID,
3✔
1014
                BranchToken:       info.BranchToken,
3✔
1015
                NewRunBranchToken: info.NewRunBranchToken,
3✔
1016
                CreationTime:      time.Unix(0, info.CreationTime),
3✔
1017
        }
3✔
1018
}
1019

1020
func getStartVersion(
1021
        versionHistories *VersionHistories,
1022
) (int64, error) {
5,067✔
1023

5,067✔
1024
        if versionHistories == nil {
5,067✔
1025
                return common.EmptyVersion, nil
×
1026
        }
×
1027

1028
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,067✔
1029
        if err != nil {
5,067✔
1030
                return 0, err
×
1031
        }
×
1032
        versionHistoryItem, err := versionHistory.GetFirstItem()
5,067✔
1033
        if err != nil {
5,067✔
1034
                return 0, err
×
1035
        }
×
1036
        return versionHistoryItem.Version, nil
5,067✔
1037
}
1038

1039
func getLastWriteVersion(
1040
        versionHistories *VersionHistories,
1041
) (int64, error) {
5,067✔
1042

5,067✔
1043
        if versionHistories == nil {
5,067✔
1044
                return common.EmptyVersion, nil
×
1045
        }
×
1046

1047
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,067✔
1048
        if err != nil {
5,067✔
1049
                return 0, err
×
1050
        }
×
1051
        versionHistoryItem, err := versionHistory.GetLastItem()
5,067✔
1052
        if err != nil {
5,067✔
1053
                return 0, err
×
1054
        }
×
1055
        return versionHistoryItem.Version, nil
5,067✔
1056
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc