• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187e7f4-61a9-46ed-8103-17edf71a5b77

04 May 2023 06:36PM UTC coverage: 57.183% (-0.04%) from 57.223%
0187e7f4-61a9-46ed-8103-17edf71a5b77

Pull #5249

buildkite

Ketsia
add logger in wf handler
Pull Request #5249: [WIP] Validate domain name

11 of 11 new or added lines in 2 files covered. (100.0%)

85694 of 149859 relevant lines covered (57.18%)

2441.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.89
/common/persistence/executionManager.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/types"
31
)
32

33
type (
34
        // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
35
        executionManagerImpl struct {
36
                serializer    PayloadSerializer
37
                persistence   ExecutionStore
38
                statsComputer statsComputer
39
                logger        log.Logger
40
        }
41
)
42

43
var _ ExecutionManager = (*executionManagerImpl)(nil)
44

45
// NewExecutionManagerImpl returns new ExecutionManager
46
func NewExecutionManagerImpl(
47
        persistence ExecutionStore,
48
        logger log.Logger,
49
) ExecutionManager {
63✔
50

63✔
51
        return &executionManagerImpl{
63✔
52
                serializer:    NewPayloadSerializer(),
63✔
53
                persistence:   persistence,
63✔
54
                statsComputer: statsComputer{},
63✔
55
                logger:        logger,
63✔
56
        }
63✔
57
}
63✔
58

59
func (m *executionManagerImpl) GetName() string {
×
60
        return m.persistence.GetName()
×
61
}
×
62

63
func (m *executionManagerImpl) GetShardID() int {
11,554✔
64
        return m.persistence.GetShardID()
11,554✔
65
}
11,554✔
66

67
// The below three APIs are related to serialization/deserialization
68
func (m *executionManagerImpl) GetWorkflowExecution(
69
        ctx context.Context,
70
        request *GetWorkflowExecutionRequest,
71
) (*GetWorkflowExecutionResponse, error) {
1,138✔
72

1,138✔
73
        internalRequest := &InternalGetWorkflowExecutionRequest{
1,138✔
74
                DomainID:  request.DomainID,
1,138✔
75
                Execution: request.Execution,
1,138✔
76
        }
1,138✔
77
        response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
1,138✔
78
        if err != nil {
1,539✔
79
                return nil, err
401✔
80
        }
401✔
81
        newResponse := &GetWorkflowExecutionResponse{
740✔
82
                State: &WorkflowMutableState{
740✔
83
                        TimerInfos:         response.State.TimerInfos,
740✔
84
                        RequestCancelInfos: response.State.RequestCancelInfos,
740✔
85
                        SignalInfos:        response.State.SignalInfos,
740✔
86
                        SignalRequestedIDs: response.State.SignalRequestedIDs,
740✔
87
                        ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
740✔
88
                        Checksum:           response.State.Checksum,
740✔
89
                },
740✔
90
        }
740✔
91

740✔
92
        newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
740✔
93
        if err != nil {
740✔
94
                return nil, err
×
95
        }
×
96
        newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
740✔
97
        if err != nil {
740✔
98
                return nil, err
×
99
        }
×
100
        newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
740✔
101
        if err != nil {
740✔
102
                return nil, err
×
103
        }
×
104
        newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
740✔
105
        if err != nil {
740✔
106
                return nil, err
×
107
        }
×
108
        versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
740✔
109
        if err != nil {
740✔
110
                return nil, err
×
111
        }
×
112
        newResponse.State.VersionHistories = versionHistories
740✔
113
        newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
740✔
114

740✔
115
        return newResponse, nil
740✔
116
}
117

118
func (m *executionManagerImpl) DeserializeExecutionInfo(
119
        info *InternalWorkflowExecutionInfo,
120
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
740✔
121

740✔
122
        completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
740✔
123
        if err != nil {
740✔
124
                return nil, nil, err
×
125
        }
×
126

127
        autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
740✔
128
        if err != nil {
740✔
129
                return nil, nil, err
×
130
        }
×
131

132
        newInfo := &WorkflowExecutionInfo{
740✔
133
                CompletionEvent: completionEvent,
740✔
134

740✔
135
                DomainID:                           info.DomainID,
740✔
136
                WorkflowID:                         info.WorkflowID,
740✔
137
                RunID:                              info.RunID,
740✔
138
                FirstExecutionRunID:                info.FirstExecutionRunID,
740✔
139
                ParentDomainID:                     info.ParentDomainID,
740✔
140
                ParentWorkflowID:                   info.ParentWorkflowID,
740✔
141
                ParentRunID:                        info.ParentRunID,
740✔
142
                InitiatedID:                        info.InitiatedID,
740✔
143
                CompletionEventBatchID:             info.CompletionEventBatchID,
740✔
144
                TaskList:                           info.TaskList,
740✔
145
                IsCron:                             len(info.CronSchedule) > 0,
740✔
146
                WorkflowTypeName:                   info.WorkflowTypeName,
740✔
147
                WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
740✔
148
                DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
740✔
149
                ExecutionContext:                   info.ExecutionContext,
740✔
150
                State:                              info.State,
740✔
151
                CloseStatus:                        info.CloseStatus,
740✔
152
                LastFirstEventID:                   info.LastFirstEventID,
740✔
153
                LastEventTaskID:                    info.LastEventTaskID,
740✔
154
                NextEventID:                        info.NextEventID,
740✔
155
                LastProcessedEvent:                 info.LastProcessedEvent,
740✔
156
                StartTimestamp:                     info.StartTimestamp,
740✔
157
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
740✔
158
                CreateRequestID:                    info.CreateRequestID,
740✔
159
                SignalCount:                        info.SignalCount,
740✔
160
                DecisionVersion:                    info.DecisionVersion,
740✔
161
                DecisionScheduleID:                 info.DecisionScheduleID,
740✔
162
                DecisionStartedID:                  info.DecisionStartedID,
740✔
163
                DecisionRequestID:                  info.DecisionRequestID,
740✔
164
                DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
740✔
165
                DecisionAttempt:                    info.DecisionAttempt,
740✔
166
                DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
740✔
167
                DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
740✔
168
                DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
740✔
169
                CancelRequested:                    info.CancelRequested,
740✔
170
                CancelRequestID:                    info.CancelRequestID,
740✔
171
                StickyTaskList:                     info.StickyTaskList,
740✔
172
                StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
740✔
173
                ClientLibraryVersion:               info.ClientLibraryVersion,
740✔
174
                ClientFeatureVersion:               info.ClientFeatureVersion,
740✔
175
                ClientImpl:                         info.ClientImpl,
740✔
176
                Attempt:                            info.Attempt,
740✔
177
                HasRetryPolicy:                     info.HasRetryPolicy,
740✔
178
                InitialInterval:                    int32(info.InitialInterval.Seconds()),
740✔
179
                BackoffCoefficient:                 info.BackoffCoefficient,
740✔
180
                MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
740✔
181
                ExpirationTime:                     info.ExpirationTime,
740✔
182
                MaximumAttempts:                    info.MaximumAttempts,
740✔
183
                NonRetriableErrors:                 info.NonRetriableErrors,
740✔
184
                BranchToken:                        info.BranchToken,
740✔
185
                CronSchedule:                       info.CronSchedule,
740✔
186
                ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
740✔
187
                AutoResetPoints:                    autoResetPoints,
740✔
188
                SearchAttributes:                   info.SearchAttributes,
740✔
189
                Memo:                               info.Memo,
740✔
190
        }
740✔
191
        newStats := &ExecutionStats{
740✔
192
                HistorySize: info.HistorySize,
740✔
193
        }
740✔
194
        return newInfo, newStats, nil
740✔
195
}
196

197
func (m *executionManagerImpl) DeserializeBufferedEvents(
198
        blobs []*DataBlob,
199
) ([]*types.HistoryEvent, error) {
740✔
200

740✔
201
        events := make([]*types.HistoryEvent, 0)
740✔
202
        for _, b := range blobs {
743✔
203
                history, err := m.serializer.DeserializeBatchEvents(b)
3✔
204
                if err != nil {
3✔
205
                        return nil, err
×
206
                }
×
207
                events = append(events, history...)
3✔
208
        }
209
        return events, nil
740✔
210
}
211

212
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
213
        infos map[int64]*InternalChildExecutionInfo,
214
) (map[int64]*ChildExecutionInfo, error) {
740✔
215

740✔
216
        newInfos := make(map[int64]*ChildExecutionInfo)
740✔
217
        for k, v := range infos {
743✔
218
                initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
3✔
219
                if err != nil {
3✔
220
                        return nil, err
×
221
                }
×
222
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
3✔
223
                if err != nil {
3✔
224
                        return nil, err
×
225
                }
×
226
                c := &ChildExecutionInfo{
3✔
227
                        InitiatedEvent: initiatedEvent,
3✔
228
                        StartedEvent:   startedEvent,
3✔
229

3✔
230
                        Version:               v.Version,
3✔
231
                        InitiatedID:           v.InitiatedID,
3✔
232
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
3✔
233
                        StartedID:             v.StartedID,
3✔
234
                        StartedWorkflowID:     v.StartedWorkflowID,
3✔
235
                        StartedRunID:          v.StartedRunID,
3✔
236
                        CreateRequestID:       v.CreateRequestID,
3✔
237
                        DomainID:              v.DomainID,
3✔
238
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
3✔
239
                        WorkflowTypeName:      v.WorkflowTypeName,
3✔
240
                        ParentClosePolicy:     v.ParentClosePolicy,
3✔
241
                }
3✔
242

3✔
243
                // Needed for backward compatibility reason.
3✔
244
                // ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
3✔
245
                // Updated the code to instead directly read WorkflowId and RunId from mutable state
3✔
246
                // Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
3✔
247
                // StartedRunID on the mutable state before passing it to application
3✔
248
                if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
3✔
249
                        startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
3✔
250
                        startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
×
251
                        c.StartedWorkflowID = startedExecution.GetWorkflowID()
×
252
                        c.StartedRunID = startedExecution.GetRunID()
×
253
                }
×
254
                newInfos[k] = c
3✔
255
        }
256
        return newInfos, nil
740✔
257
}
258

259
func (m *executionManagerImpl) DeserializeActivityInfos(
260
        infos map[int64]*InternalActivityInfo,
261
) (map[int64]*ActivityInfo, error) {
740✔
262

740✔
263
        newInfos := make(map[int64]*ActivityInfo)
740✔
264
        for k, v := range infos {
845✔
265
                scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
105✔
266
                if err != nil {
105✔
267
                        return nil, err
×
268
                }
×
269
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
105✔
270
                if err != nil {
105✔
271
                        return nil, err
×
272
                }
×
273
                a := &ActivityInfo{
105✔
274
                        ScheduledEvent: scheduledEvent,
105✔
275
                        StartedEvent:   startedEvent,
105✔
276

105✔
277
                        Version:                                 v.Version,
105✔
278
                        ScheduleID:                              v.ScheduleID,
105✔
279
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
105✔
280
                        ScheduledTime:                           v.ScheduledTime,
105✔
281
                        StartedID:                               v.StartedID,
105✔
282
                        StartedTime:                             v.StartedTime,
105✔
283
                        ActivityID:                              v.ActivityID,
105✔
284
                        RequestID:                               v.RequestID,
105✔
285
                        Details:                                 v.Details,
105✔
286
                        ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
105✔
287
                        ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
105✔
288
                        StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
105✔
289
                        HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
105✔
290
                        CancelRequested:                         v.CancelRequested,
105✔
291
                        CancelRequestID:                         v.CancelRequestID,
105✔
292
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
105✔
293
                        TimerTaskStatus:                         v.TimerTaskStatus,
105✔
294
                        Attempt:                                 v.Attempt,
105✔
295
                        DomainID:                                v.DomainID,
105✔
296
                        StartedIdentity:                         v.StartedIdentity,
105✔
297
                        TaskList:                                v.TaskList,
105✔
298
                        HasRetryPolicy:                          v.HasRetryPolicy,
105✔
299
                        InitialInterval:                         int32(v.InitialInterval.Seconds()),
105✔
300
                        BackoffCoefficient:                      v.BackoffCoefficient,
105✔
301
                        MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
105✔
302
                        ExpirationTime:                          v.ExpirationTime,
105✔
303
                        MaximumAttempts:                         v.MaximumAttempts,
105✔
304
                        NonRetriableErrors:                      v.NonRetriableErrors,
105✔
305
                        LastFailureReason:                       v.LastFailureReason,
105✔
306
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
105✔
307
                        LastFailureDetails:                      v.LastFailureDetails,
105✔
308
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
105✔
309
                }
105✔
310
                newInfos[k] = a
105✔
311
        }
312
        return newInfos, nil
740✔
313
}
314

315
func (m *executionManagerImpl) UpdateWorkflowExecution(
316
        ctx context.Context,
317
        request *UpdateWorkflowExecutionRequest,
318
) (*UpdateWorkflowExecutionResponse, error) {
4,398✔
319

4,398✔
320
        serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
4,398✔
321
        if err != nil {
4,398✔
322
                return nil, err
×
323
        }
×
324
        var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
4,398✔
325
        if request.NewWorkflowSnapshot != nil {
4,575✔
326
                serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
177✔
327
                if err != nil {
177✔
328
                        return nil, err
×
329
                }
×
330
        }
331

332
        newRequest := &InternalUpdateWorkflowExecutionRequest{
4,398✔
333
                RangeID: request.RangeID,
4,398✔
334

4,398✔
335
                Mode: request.Mode,
4,398✔
336

4,398✔
337
                UpdateWorkflowMutation: *serializedWorkflowMutation,
4,398✔
338
                NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
4,398✔
339
        }
4,398✔
340
        msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
4,398✔
341
        err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
4,398✔
342
        if err != nil {
4,398✔
343
                return nil, err
×
344
        }
×
345
        return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
4,398✔
346
}
347

348
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
349
        infos []*ChildExecutionInfo,
350
        encoding common.EncodingType,
351
) ([]*InternalChildExecutionInfo, error) {
5,073✔
352

5,073✔
353
        newInfos := make([]*InternalChildExecutionInfo, 0)
5,073✔
354
        for _, v := range infos {
5,112✔
355
                initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
39✔
356
                if err != nil {
39✔
357
                        return nil, err
×
358
                }
×
359
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
39✔
360
                if err != nil {
39✔
361
                        return nil, err
×
362
                }
×
363
                i := &InternalChildExecutionInfo{
39✔
364
                        InitiatedEvent: initiatedEvent,
39✔
365
                        StartedEvent:   startedEvent,
39✔
366

39✔
367
                        Version:               v.Version,
39✔
368
                        InitiatedID:           v.InitiatedID,
39✔
369
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
39✔
370
                        CreateRequestID:       v.CreateRequestID,
39✔
371
                        StartedID:             v.StartedID,
39✔
372
                        StartedWorkflowID:     v.StartedWorkflowID,
39✔
373
                        StartedRunID:          v.StartedRunID,
39✔
374
                        DomainID:              v.DomainID,
39✔
375
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
39✔
376
                        WorkflowTypeName:      v.WorkflowTypeName,
39✔
377
                        ParentClosePolicy:     v.ParentClosePolicy,
39✔
378
                }
39✔
379
                newInfos = append(newInfos, i)
39✔
380
        }
381
        return newInfos, nil
5,073✔
382
}
383

384
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
385
        infos []*ActivityInfo,
386
        encoding common.EncodingType,
387
) ([]*InternalActivityInfo, error) {
5,073✔
388

5,073✔
389
        newInfos := make([]*InternalActivityInfo, 0)
5,073✔
390
        for _, v := range infos {
6,318✔
391
                scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
1,245✔
392
                if err != nil {
1,245✔
393
                        return nil, err
×
394
                }
×
395
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
1,245✔
396
                if err != nil {
1,245✔
397
                        return nil, err
×
398
                }
×
399
                i := &InternalActivityInfo{
1,245✔
400
                        Version:                                 v.Version,
1,245✔
401
                        ScheduleID:                              v.ScheduleID,
1,245✔
402
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
1,245✔
403
                        ScheduledEvent:                          scheduledEvent,
1,245✔
404
                        ScheduledTime:                           v.ScheduledTime,
1,245✔
405
                        StartedID:                               v.StartedID,
1,245✔
406
                        StartedEvent:                            startedEvent,
1,245✔
407
                        StartedTime:                             v.StartedTime,
1,245✔
408
                        ActivityID:                              v.ActivityID,
1,245✔
409
                        RequestID:                               v.RequestID,
1,245✔
410
                        Details:                                 v.Details,
1,245✔
411
                        ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
1,245✔
412
                        ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
1,245✔
413
                        StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
1,245✔
414
                        HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
1,245✔
415
                        CancelRequested:                         v.CancelRequested,
1,245✔
416
                        CancelRequestID:                         v.CancelRequestID,
1,245✔
417
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
1,245✔
418
                        TimerTaskStatus:                         v.TimerTaskStatus,
1,245✔
419
                        Attempt:                                 v.Attempt,
1,245✔
420
                        DomainID:                                v.DomainID,
1,245✔
421
                        StartedIdentity:                         v.StartedIdentity,
1,245✔
422
                        TaskList:                                v.TaskList,
1,245✔
423
                        HasRetryPolicy:                          v.HasRetryPolicy,
1,245✔
424
                        InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
1,245✔
425
                        BackoffCoefficient:                      v.BackoffCoefficient,
1,245✔
426
                        MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
1,245✔
427
                        ExpirationTime:                          v.ExpirationTime,
1,245✔
428
                        MaximumAttempts:                         v.MaximumAttempts,
1,245✔
429
                        NonRetriableErrors:                      v.NonRetriableErrors,
1,245✔
430
                        LastFailureReason:                       v.LastFailureReason,
1,245✔
431
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
1,245✔
432
                        LastFailureDetails:                      v.LastFailureDetails,
1,245✔
433
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
1,245✔
434
                }
1,245✔
435
                newInfos = append(newInfos, i)
1,245✔
436
        }
437
        return newInfos, nil
5,073✔
438
}
439

440
func (m *executionManagerImpl) SerializeExecutionInfo(
441
        info *WorkflowExecutionInfo,
442
        stats *ExecutionStats,
443
        encoding common.EncodingType,
444
) (*InternalWorkflowExecutionInfo, error) {
5,073✔
445

5,073✔
446
        if info == nil {
5,073✔
447
                return &InternalWorkflowExecutionInfo{}, nil
×
448
        }
×
449
        completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
5,073✔
450
        if err != nil {
5,073✔
451
                return nil, err
×
452
        }
×
453

454
        resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
5,073✔
455
        if err != nil {
5,073✔
456
                return nil, err
×
457
        }
×
458

459
        return &InternalWorkflowExecutionInfo{
5,073✔
460
                DomainID:                           info.DomainID,
5,073✔
461
                WorkflowID:                         info.WorkflowID,
5,073✔
462
                RunID:                              info.RunID,
5,073✔
463
                FirstExecutionRunID:                info.FirstExecutionRunID,
5,073✔
464
                ParentDomainID:                     info.ParentDomainID,
5,073✔
465
                ParentWorkflowID:                   info.ParentWorkflowID,
5,073✔
466
                ParentRunID:                        info.ParentRunID,
5,073✔
467
                InitiatedID:                        info.InitiatedID,
5,073✔
468
                CompletionEventBatchID:             info.CompletionEventBatchID,
5,073✔
469
                CompletionEvent:                    completionEvent,
5,073✔
470
                TaskList:                           info.TaskList,
5,073✔
471
                WorkflowTypeName:                   info.WorkflowTypeName,
5,073✔
472
                WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
5,073✔
473
                DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
5,073✔
474
                ExecutionContext:                   info.ExecutionContext,
5,073✔
475
                State:                              info.State,
5,073✔
476
                CloseStatus:                        info.CloseStatus,
5,073✔
477
                LastFirstEventID:                   info.LastFirstEventID,
5,073✔
478
                LastEventTaskID:                    info.LastEventTaskID,
5,073✔
479
                NextEventID:                        info.NextEventID,
5,073✔
480
                LastProcessedEvent:                 info.LastProcessedEvent,
5,073✔
481
                StartTimestamp:                     info.StartTimestamp,
5,073✔
482
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
5,073✔
483
                CreateRequestID:                    info.CreateRequestID,
5,073✔
484
                SignalCount:                        info.SignalCount,
5,073✔
485
                DecisionVersion:                    info.DecisionVersion,
5,073✔
486
                DecisionScheduleID:                 info.DecisionScheduleID,
5,073✔
487
                DecisionStartedID:                  info.DecisionStartedID,
5,073✔
488
                DecisionRequestID:                  info.DecisionRequestID,
5,073✔
489
                DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
5,073✔
490
                DecisionAttempt:                    info.DecisionAttempt,
5,073✔
491
                DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp),
5,073✔
492
                DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp),
5,073✔
493
                DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp),
5,073✔
494
                CancelRequested:                    info.CancelRequested,
5,073✔
495
                CancelRequestID:                    info.CancelRequestID,
5,073✔
496
                StickyTaskList:                     info.StickyTaskList,
5,073✔
497
                StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
5,073✔
498
                ClientLibraryVersion:               info.ClientLibraryVersion,
5,073✔
499
                ClientFeatureVersion:               info.ClientFeatureVersion,
5,073✔
500
                ClientImpl:                         info.ClientImpl,
5,073✔
501
                AutoResetPoints:                    resetPoints,
5,073✔
502
                Attempt:                            info.Attempt,
5,073✔
503
                HasRetryPolicy:                     info.HasRetryPolicy,
5,073✔
504
                InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
5,073✔
505
                BackoffCoefficient:                 info.BackoffCoefficient,
5,073✔
506
                MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
5,073✔
507
                ExpirationTime:                     info.ExpirationTime,
5,073✔
508
                MaximumAttempts:                    info.MaximumAttempts,
5,073✔
509
                NonRetriableErrors:                 info.NonRetriableErrors,
5,073✔
510
                BranchToken:                        info.BranchToken,
5,073✔
511
                CronSchedule:                       info.CronSchedule,
5,073✔
512
                ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
5,073✔
513
                Memo:                               info.Memo,
5,073✔
514
                SearchAttributes:                   info.SearchAttributes,
5,073✔
515

5,073✔
516
                // attributes which are not related to mutable state
5,073✔
517
                HistorySize: stats.HistorySize,
5,073✔
518
        }, nil
5,073✔
519
}
520

521
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
522
        ctx context.Context,
523
        request *ConflictResolveWorkflowExecutionRequest,
524
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
525

3✔
526
        serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
3✔
527
        if err != nil {
3✔
528
                return nil, err
×
529
        }
×
530
        var serializedCurrentWorkflowMutation *InternalWorkflowMutation
3✔
531
        if request.CurrentWorkflowMutation != nil {
3✔
532
                serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
×
533
                if err != nil {
×
534
                        return nil, err
×
535
                }
×
536
        }
537
        var serializedNewWorkflowMutation *InternalWorkflowSnapshot
3✔
538
        if request.NewWorkflowSnapshot != nil {
3✔
539
                serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
×
540
                if err != nil {
×
541
                        return nil, err
×
542
                }
×
543
        }
544

545
        newRequest := &InternalConflictResolveWorkflowExecutionRequest{
3✔
546
                RangeID: request.RangeID,
3✔
547

3✔
548
                Mode: request.Mode,
3✔
549

3✔
550
                ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,
3✔
551

3✔
552
                NewWorkflowSnapshot: serializedNewWorkflowMutation,
3✔
553

3✔
554
                CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
3✔
555
        }
3✔
556
        msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
3✔
557
        err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
3✔
558
        if err != nil {
3✔
559
                return nil, err
×
560
        }
×
561
        return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
3✔
562
}
563

564
func (m *executionManagerImpl) CreateWorkflowExecution(
565
        ctx context.Context,
566
        request *CreateWorkflowExecutionRequest,
567
) (*CreateWorkflowExecutionResponse, error) {
504✔
568

504✔
569
        encoding := common.EncodingTypeThriftRW
504✔
570

504✔
571
        serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
504✔
572
        if err != nil {
504✔
573
                return nil, err
×
574
        }
×
575

576
        newRequest := &InternalCreateWorkflowExecutionRequest{
504✔
577
                RangeID: request.RangeID,
504✔
578

504✔
579
                Mode: request.Mode,
504✔
580

504✔
581
                PreviousRunID:            request.PreviousRunID,
504✔
582
                PreviousLastWriteVersion: request.PreviousLastWriteVersion,
504✔
583

504✔
584
                NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
504✔
585
        }
504✔
586

504✔
587
        msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
504✔
588
        _, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
504✔
589
        if err != nil {
549✔
590
                return nil, err
45✔
591
        }
45✔
592
        return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
462✔
593
}
594

595
func (m *executionManagerImpl) SerializeWorkflowMutation(
596
        input *WorkflowMutation,
597
        encoding common.EncodingType,
598
) (*InternalWorkflowMutation, error) {
4,398✔
599

4,398✔
600
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
4,398✔
601
                input.ExecutionInfo,
4,398✔
602
                input.ExecutionStats,
4,398✔
603
                encoding,
4,398✔
604
        )
4,398✔
605
        if err != nil {
4,398✔
606
                return nil, err
×
607
        }
×
608
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
4,398✔
609
        if err != nil {
4,398✔
610
                return nil, err
×
611
        }
×
612
        serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
4,398✔
613
        if err != nil {
4,398✔
614
                return nil, err
×
615
        }
×
616
        serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
4,398✔
617
        if err != nil {
4,398✔
618
                return nil, err
×
619
        }
×
620
        var serializedNewBufferedEvents *DataBlob
4,398✔
621
        if input.NewBufferedEvents != nil {
5,040✔
622
                serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
642✔
623
                if err != nil {
642✔
624
                        return nil, err
×
625
                }
×
626
        }
627

628
        startVersion, err := getStartVersion(input.VersionHistories)
4,398✔
629
        if err != nil {
4,398✔
630
                return nil, err
×
631
        }
×
632
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
4,398✔
633
        if err != nil {
4,398✔
634
                return nil, err
×
635
        }
×
636

637
        return &InternalWorkflowMutation{
4,398✔
638
                ExecutionInfo:    serializedExecutionInfo,
4,398✔
639
                VersionHistories: serializedVersionHistories,
4,398✔
640
                StartVersion:     startVersion,
4,398✔
641
                LastWriteVersion: lastWriteVersion,
4,398✔
642

4,398✔
643
                UpsertActivityInfos:       serializedUpsertActivityInfos,
4,398✔
644
                DeleteActivityInfos:       input.DeleteActivityInfos,
4,398✔
645
                UpsertTimerInfos:          input.UpsertTimerInfos,
4,398✔
646
                DeleteTimerInfos:          input.DeleteTimerInfos,
4,398✔
647
                UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
4,398✔
648
                DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
4,398✔
649
                UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
4,398✔
650
                DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
4,398✔
651
                UpsertSignalInfos:         input.UpsertSignalInfos,
4,398✔
652
                DeleteSignalInfos:         input.DeleteSignalInfos,
4,398✔
653
                UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
4,398✔
654
                DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
4,398✔
655
                NewBufferedEvents:         serializedNewBufferedEvents,
4,398✔
656
                ClearBufferedEvents:       input.ClearBufferedEvents,
4,398✔
657

4,398✔
658
                TransferTasks:     input.TransferTasks,
4,398✔
659
                CrossClusterTasks: input.CrossClusterTasks,
4,398✔
660
                ReplicationTasks:  input.ReplicationTasks,
4,398✔
661
                TimerTasks:        input.TimerTasks,
4,398✔
662

4,398✔
663
                Condition: input.Condition,
4,398✔
664
                Checksum:  input.Checksum,
4,398✔
665
        }, nil
4,398✔
666
}
667

668
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
669
        input *WorkflowSnapshot,
670
        encoding common.EncodingType,
671
) (*InternalWorkflowSnapshot, error) {
678✔
672

678✔
673
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
678✔
674
                input.ExecutionInfo,
678✔
675
                input.ExecutionStats,
678✔
676
                encoding,
678✔
677
        )
678✔
678
        if err != nil {
678✔
679
                return nil, err
×
680
        }
×
681
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
678✔
682
        if err != nil {
678✔
683
                return nil, err
×
684
        }
×
685
        serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
678✔
686
        if err != nil {
678✔
687
                return nil, err
×
688
        }
×
689
        serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
678✔
690
        if err != nil {
678✔
691
                return nil, err
×
692
        }
×
693

694
        startVersion, err := getStartVersion(input.VersionHistories)
678✔
695
        if err != nil {
678✔
696
                return nil, err
×
697
        }
×
698
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
678✔
699
        if err != nil {
678✔
700
                return nil, err
×
701
        }
×
702

703
        return &InternalWorkflowSnapshot{
678✔
704
                ExecutionInfo:    serializedExecutionInfo,
678✔
705
                VersionHistories: serializedVersionHistories,
678✔
706
                StartVersion:     startVersion,
678✔
707
                LastWriteVersion: lastWriteVersion,
678✔
708

678✔
709
                ActivityInfos:       serializedActivityInfos,
678✔
710
                TimerInfos:          input.TimerInfos,
678✔
711
                ChildExecutionInfos: serializedChildExecutionInfos,
678✔
712
                RequestCancelInfos:  input.RequestCancelInfos,
678✔
713
                SignalInfos:         input.SignalInfos,
678✔
714
                SignalRequestedIDs:  input.SignalRequestedIDs,
678✔
715

678✔
716
                TransferTasks:     input.TransferTasks,
678✔
717
                CrossClusterTasks: input.CrossClusterTasks,
678✔
718
                ReplicationTasks:  input.ReplicationTasks,
678✔
719
                TimerTasks:        input.TimerTasks,
678✔
720

678✔
721
                Condition: input.Condition,
678✔
722
                Checksum:  input.Checksum,
678✔
723
        }, nil
678✔
724
}
725

726
func (m *executionManagerImpl) SerializeVersionHistories(
727
        versionHistories *VersionHistories,
728
        encoding common.EncodingType,
729
) (*DataBlob, error) {
5,073✔
730

5,073✔
731
        if versionHistories == nil {
5,073✔
732
                return nil, nil
×
733
        }
×
734
        return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
5,073✔
735
}
736

737
func (m *executionManagerImpl) DeserializeVersionHistories(
738
        blob *DataBlob,
739
) (*VersionHistories, error) {
740✔
740

740✔
741
        if blob == nil {
740✔
742
                return nil, nil
×
743
        }
×
744
        versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
740✔
745
        if err != nil {
740✔
746
                return nil, err
×
747
        }
×
748
        return NewVersionHistoriesFromInternalType(versionHistories), nil
740✔
749
}
750

751
func (m *executionManagerImpl) DeleteWorkflowExecution(
752
        ctx context.Context,
753
        request *DeleteWorkflowExecutionRequest,
754
) error {
54✔
755
        return m.persistence.DeleteWorkflowExecution(ctx, request)
54✔
756
}
54✔
757

758
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
759
        ctx context.Context,
760
        request *DeleteCurrentWorkflowExecutionRequest,
761
) error {
54✔
762
        return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
763
}
54✔
764

765
func (m *executionManagerImpl) GetCurrentExecution(
766
        ctx context.Context,
767
        request *GetCurrentExecutionRequest,
768
) (*GetCurrentExecutionResponse, error) {
183✔
769
        return m.persistence.GetCurrentExecution(ctx, request)
183✔
770
}
183✔
771

772
func (m *executionManagerImpl) ListCurrentExecutions(
773
        ctx context.Context,
774
        request *ListCurrentExecutionsRequest,
775
) (*ListCurrentExecutionsResponse, error) {
×
776
        return m.persistence.ListCurrentExecutions(ctx, request)
×
777
}
×
778

779
func (m *executionManagerImpl) IsWorkflowExecutionExists(
780
        ctx context.Context,
781
        request *IsWorkflowExecutionExistsRequest,
782
) (*IsWorkflowExecutionExistsResponse, error) {
×
783
        return m.persistence.IsWorkflowExecutionExists(ctx, request)
×
784
}
×
785

786
func (m *executionManagerImpl) ListConcreteExecutions(
787
        ctx context.Context,
788
        request *ListConcreteExecutionsRequest,
789
) (*ListConcreteExecutionsResponse, error) {
×
790
        response, err := m.persistence.ListConcreteExecutions(ctx, request)
×
791
        if err != nil {
×
792
                return nil, err
×
793
        }
×
794
        newResponse := &ListConcreteExecutionsResponse{
×
795
                Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
×
796
                PageToken:  response.NextPageToken,
×
797
        }
×
798
        for i, e := range response.Executions {
×
799
                info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
×
800
                if err != nil {
×
801
                        return nil, err
×
802
                }
×
803
                vh, err := m.DeserializeVersionHistories(e.VersionHistories)
×
804
                if err != nil {
×
805
                        return nil, err
×
806
                }
×
807
                newResponse.Executions[i] = &ListConcreteExecutionsEntity{
×
808
                        ExecutionInfo:    info,
×
809
                        VersionHistories: vh,
×
810
                }
×
811
        }
812
        return newResponse, nil
×
813
}
814

815
// Transfer task related methods
816
func (m *executionManagerImpl) GetTransferTasks(
817
        ctx context.Context,
818
        request *GetTransferTasksRequest,
819
) (*GetTransferTasksResponse, error) {
2,317✔
820
        return m.persistence.GetTransferTasks(ctx, request)
2,317✔
821
}
2,317✔
822

823
func (m *executionManagerImpl) CompleteTransferTask(
824
        ctx context.Context,
825
        request *CompleteTransferTaskRequest,
826
) error {
×
827
        return m.persistence.CompleteTransferTask(ctx, request)
×
828
}
×
829

830
func (m *executionManagerImpl) RangeCompleteTransferTask(
831
        ctx context.Context,
832
        request *RangeCompleteTransferTaskRequest,
833
) (*RangeCompleteTransferTaskResponse, error) {
110✔
834
        return m.persistence.RangeCompleteTransferTask(ctx, request)
110✔
835
}
110✔
836

837
// Cross-cluster task related methods
838
func (m *executionManagerImpl) GetCrossClusterTasks(
839
        ctx context.Context,
840
        request *GetCrossClusterTasksRequest,
841
) (*GetCrossClusterTasksResponse, error) {
158✔
842
        return m.persistence.GetCrossClusterTasks(ctx, request)
158✔
843
}
158✔
844

845
func (m *executionManagerImpl) CompleteCrossClusterTask(
846
        ctx context.Context,
847
        request *CompleteCrossClusterTaskRequest,
848
) error {
×
849
        return m.persistence.CompleteCrossClusterTask(ctx, request)
×
850
}
×
851

852
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
853
        ctx context.Context,
854
        request *RangeCompleteCrossClusterTaskRequest,
855
) (*RangeCompleteCrossClusterTaskResponse, error) {
138✔
856
        return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
138✔
857
}
138✔
858

859
// Replication task related methods
860
func (m *executionManagerImpl) GetReplicationTasks(
861
        ctx context.Context,
862
        request *GetReplicationTasksRequest,
863
) (*GetReplicationTasksResponse, error) {
120✔
864
        resp, err := m.persistence.GetReplicationTasks(ctx, request)
120✔
865
        if err != nil {
120✔
866
                return nil, err
×
867
        }
×
868

869
        return &GetReplicationTasksResponse{
120✔
870
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
120✔
871
                NextPageToken: resp.NextPageToken,
120✔
872
        }, nil
120✔
873
}
874

875
func (m *executionManagerImpl) CompleteReplicationTask(
876
        ctx context.Context,
877
        request *CompleteReplicationTaskRequest,
878
) error {
×
879
        return m.persistence.CompleteReplicationTask(ctx, request)
×
880
}
×
881

882
func (m *executionManagerImpl) RangeCompleteReplicationTask(
883
        ctx context.Context,
884
        request *RangeCompleteReplicationTaskRequest,
885
) (*RangeCompleteReplicationTaskResponse, error) {
117✔
886
        return m.persistence.RangeCompleteReplicationTask(ctx, request)
117✔
887
}
117✔
888

889
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
890
        ctx context.Context,
891
        request *PutReplicationTaskToDLQRequest,
892
) error {
3✔
893
        internalRequest := &InternalPutReplicationTaskToDLQRequest{
3✔
894
                SourceClusterName: request.SourceClusterName,
3✔
895
                TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
3✔
896
        }
3✔
897
        return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
3✔
898
}
3✔
899

900
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
901
        ctx context.Context,
902
        request *GetReplicationTasksFromDLQRequest,
903
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
904
        resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
905
        if err != nil {
3✔
906
                return nil, err
×
907
        }
×
908
        return &GetReplicationTasksFromDLQResponse{
3✔
909
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
3✔
910
                NextPageToken: resp.NextPageToken,
3✔
911
        }, nil
3✔
912
}
913

914
func (m *executionManagerImpl) GetReplicationDLQSize(
915
        ctx context.Context,
916
        request *GetReplicationDLQSizeRequest,
917
) (*GetReplicationDLQSizeResponse, error) {
12✔
918
        return m.persistence.GetReplicationDLQSize(ctx, request)
12✔
919
}
12✔
920

921
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
922
        ctx context.Context,
923
        request *DeleteReplicationTaskFromDLQRequest,
924
) error {
×
925
        return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
926
}
×
927

928
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
929
        ctx context.Context,
930
        request *RangeDeleteReplicationTaskFromDLQRequest,
931
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
932
        return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
933
}
×
934

935
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
936
        ctx context.Context,
937
        request *CreateFailoverMarkersRequest,
938
) error {
×
939
        return m.persistence.CreateFailoverMarkerTasks(ctx, request)
×
940
}
×
941

942
// Timer related methods.
943
func (m *executionManagerImpl) GetTimerIndexTasks(
944
        ctx context.Context,
945
        request *GetTimerIndexTasksRequest,
946
) (*GetTimerIndexTasksResponse, error) {
3,240✔
947
        return m.persistence.GetTimerIndexTasks(ctx, request)
3,240✔
948
}
3,240✔
949

950
func (m *executionManagerImpl) CompleteTimerTask(
951
        ctx context.Context,
952
        request *CompleteTimerTaskRequest,
953
) error {
×
954
        return m.persistence.CompleteTimerTask(ctx, request)
×
955
}
×
956

957
func (m *executionManagerImpl) RangeCompleteTimerTask(
958
        ctx context.Context,
959
        request *RangeCompleteTimerTaskRequest,
960
) (*RangeCompleteTimerTaskResponse, error) {
36✔
961
        return m.persistence.RangeCompleteTimerTask(ctx, request)
36✔
962
}
36✔
963

964
func (m *executionManagerImpl) Close() {
51✔
965
        m.persistence.Close()
51✔
966
}
51✔
967

968
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
123✔
969
        if internalInfos == nil {
246✔
970
                return nil
123✔
971
        }
123✔
972
        infos := make([]*ReplicationTaskInfo, len(internalInfos))
3✔
973
        for i := 0; i < len(internalInfos); i++ {
6✔
974
                infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
3✔
975
        }
3✔
976
        return infos
3✔
977
}
978

979
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
3✔
980
        if internalInfo == nil {
3✔
981
                return nil
×
982
        }
×
983
        return &ReplicationTaskInfo{
3✔
984
                DomainID:          internalInfo.DomainID,
3✔
985
                WorkflowID:        internalInfo.WorkflowID,
3✔
986
                RunID:             internalInfo.RunID,
3✔
987
                TaskID:            internalInfo.TaskID,
3✔
988
                TaskType:          internalInfo.TaskType,
3✔
989
                FirstEventID:      internalInfo.FirstEventID,
3✔
990
                NextEventID:       internalInfo.NextEventID,
3✔
991
                Version:           internalInfo.Version,
3✔
992
                ScheduledID:       internalInfo.ScheduledID,
3✔
993
                BranchToken:       internalInfo.BranchToken,
3✔
994
                NewRunBranchToken: internalInfo.NewRunBranchToken,
3✔
995
                CreationTime:      internalInfo.CreationTime.UnixNano(),
3✔
996
        }
3✔
997
}
998

999
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
3✔
1000
        if info == nil {
3✔
1001
                return nil
×
1002
        }
×
1003
        return &InternalReplicationTaskInfo{
3✔
1004
                DomainID:          info.DomainID,
3✔
1005
                WorkflowID:        info.WorkflowID,
3✔
1006
                RunID:             info.RunID,
3✔
1007
                TaskID:            info.TaskID,
3✔
1008
                TaskType:          info.TaskType,
3✔
1009
                FirstEventID:      info.FirstEventID,
3✔
1010
                NextEventID:       info.NextEventID,
3✔
1011
                Version:           info.Version,
3✔
1012
                ScheduledID:       info.ScheduledID,
3✔
1013
                BranchToken:       info.BranchToken,
3✔
1014
                NewRunBranchToken: info.NewRunBranchToken,
3✔
1015
                CreationTime:      time.Unix(0, info.CreationTime),
3✔
1016
        }
3✔
1017
}
1018

1019
func getStartVersion(
1020
        versionHistories *VersionHistories,
1021
) (int64, error) {
5,073✔
1022

5,073✔
1023
        if versionHistories == nil {
5,073✔
1024
                return common.EmptyVersion, nil
×
1025
        }
×
1026

1027
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,073✔
1028
        if err != nil {
5,073✔
1029
                return 0, err
×
1030
        }
×
1031
        versionHistoryItem, err := versionHistory.GetFirstItem()
5,073✔
1032
        if err != nil {
5,073✔
1033
                return 0, err
×
1034
        }
×
1035
        return versionHistoryItem.Version, nil
5,073✔
1036
}
1037

1038
func getLastWriteVersion(
1039
        versionHistories *VersionHistories,
1040
) (int64, error) {
5,073✔
1041

5,073✔
1042
        if versionHistories == nil {
5,073✔
1043
                return common.EmptyVersion, nil
×
1044
        }
×
1045

1046
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,073✔
1047
        if err != nil {
5,073✔
1048
                return 0, err
×
1049
        }
×
1050
        versionHistoryItem, err := versionHistory.GetLastItem()
5,073✔
1051
        if err != nil {
5,073✔
1052
                return 0, err
×
1053
        }
×
1054
        return versionHistoryItem.Version, nil
5,073✔
1055
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc