• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01876c66-a3be-4e17-91b5-e14f7457e2fa

10 Apr 2023 06:38PM UTC coverage: 57.111% (+0.01%) from 57.097%
01876c66-a3be-4e17-91b5-e14f7457e2fa

push

buildkite

GitHub
Allow registering search attributes without Advance Visibility enabled (#5185)

17 of 17 new or added lines in 1 file covered. (100.0%)

85296 of 149351 relevant lines covered (57.11%)

2334.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.89
/common/persistence/executionManager.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package persistence
23

24
import (
25
        "context"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/types"
31
)
32

33
type (
34
        // executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
35
        executionManagerImpl struct {
36
                serializer    PayloadSerializer
37
                persistence   ExecutionStore
38
                statsComputer statsComputer
39
                logger        log.Logger
40
        }
41
)
42

43
var _ ExecutionManager = (*executionManagerImpl)(nil)
44

45
// NewExecutionManagerImpl returns new ExecutionManager
46
func NewExecutionManagerImpl(
47
        persistence ExecutionStore,
48
        logger log.Logger,
49
) ExecutionManager {
63✔
50

63✔
51
        return &executionManagerImpl{
63✔
52
                serializer:    NewPayloadSerializer(),
63✔
53
                persistence:   persistence,
63✔
54
                statsComputer: statsComputer{},
63✔
55
                logger:        logger,
63✔
56
        }
63✔
57
}
63✔
58

59
func (m *executionManagerImpl) GetName() string {
×
60
        return m.persistence.GetName()
×
61
}
×
62

63
func (m *executionManagerImpl) GetShardID() int {
11,582✔
64
        return m.persistence.GetShardID()
11,582✔
65
}
11,582✔
66

67
// The below three APIs are related to serialization/deserialization
68
func (m *executionManagerImpl) GetWorkflowExecution(
69
        ctx context.Context,
70
        request *GetWorkflowExecutionRequest,
71
) (*GetWorkflowExecutionResponse, error) {
1,136✔
72

1,136✔
73
        internalRequest := &InternalGetWorkflowExecutionRequest{
1,136✔
74
                DomainID:  request.DomainID,
1,136✔
75
                Execution: request.Execution,
1,136✔
76
        }
1,136✔
77
        response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
1,136✔
78
        if err != nil {
1,534✔
79
                return nil, err
398✔
80
        }
398✔
81
        newResponse := &GetWorkflowExecutionResponse{
741✔
82
                State: &WorkflowMutableState{
741✔
83
                        TimerInfos:         response.State.TimerInfos,
741✔
84
                        RequestCancelInfos: response.State.RequestCancelInfos,
741✔
85
                        SignalInfos:        response.State.SignalInfos,
741✔
86
                        SignalRequestedIDs: response.State.SignalRequestedIDs,
741✔
87
                        ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
741✔
88
                        Checksum:           response.State.Checksum,
741✔
89
                },
741✔
90
        }
741✔
91

741✔
92
        newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
741✔
93
        if err != nil {
741✔
94
                return nil, err
×
95
        }
×
96
        newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
741✔
97
        if err != nil {
741✔
98
                return nil, err
×
99
        }
×
100
        newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
741✔
101
        if err != nil {
741✔
102
                return nil, err
×
103
        }
×
104
        newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
741✔
105
        if err != nil {
741✔
106
                return nil, err
×
107
        }
×
108
        versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
741✔
109
        if err != nil {
741✔
110
                return nil, err
×
111
        }
×
112
        newResponse.State.VersionHistories = versionHistories
741✔
113
        newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)
741✔
114

741✔
115
        return newResponse, nil
741✔
116
}
117

118
func (m *executionManagerImpl) DeserializeExecutionInfo(
119
        info *InternalWorkflowExecutionInfo,
120
) (*WorkflowExecutionInfo, *ExecutionStats, error) {
741✔
121

741✔
122
        completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
741✔
123
        if err != nil {
741✔
124
                return nil, nil, err
×
125
        }
×
126

127
        autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
741✔
128
        if err != nil {
741✔
129
                return nil, nil, err
×
130
        }
×
131

132
        newInfo := &WorkflowExecutionInfo{
741✔
133
                CompletionEvent: completionEvent,
741✔
134

741✔
135
                DomainID:                           info.DomainID,
741✔
136
                WorkflowID:                         info.WorkflowID,
741✔
137
                RunID:                              info.RunID,
741✔
138
                FirstExecutionRunID:                info.FirstExecutionRunID,
741✔
139
                ParentDomainID:                     info.ParentDomainID,
741✔
140
                ParentWorkflowID:                   info.ParentWorkflowID,
741✔
141
                ParentRunID:                        info.ParentRunID,
741✔
142
                InitiatedID:                        info.InitiatedID,
741✔
143
                CompletionEventBatchID:             info.CompletionEventBatchID,
741✔
144
                TaskList:                           info.TaskList,
741✔
145
                IsCron:                             len(info.CronSchedule) > 0,
741✔
146
                WorkflowTypeName:                   info.WorkflowTypeName,
741✔
147
                WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
741✔
148
                DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
741✔
149
                ExecutionContext:                   info.ExecutionContext,
741✔
150
                State:                              info.State,
741✔
151
                CloseStatus:                        info.CloseStatus,
741✔
152
                LastFirstEventID:                   info.LastFirstEventID,
741✔
153
                LastEventTaskID:                    info.LastEventTaskID,
741✔
154
                NextEventID:                        info.NextEventID,
741✔
155
                LastProcessedEvent:                 info.LastProcessedEvent,
741✔
156
                StartTimestamp:                     info.StartTimestamp,
741✔
157
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
741✔
158
                CreateRequestID:                    info.CreateRequestID,
741✔
159
                SignalCount:                        info.SignalCount,
741✔
160
                DecisionVersion:                    info.DecisionVersion,
741✔
161
                DecisionScheduleID:                 info.DecisionScheduleID,
741✔
162
                DecisionStartedID:                  info.DecisionStartedID,
741✔
163
                DecisionRequestID:                  info.DecisionRequestID,
741✔
164
                DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
741✔
165
                DecisionAttempt:                    info.DecisionAttempt,
741✔
166
                DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
741✔
167
                DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
741✔
168
                DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
741✔
169
                CancelRequested:                    info.CancelRequested,
741✔
170
                CancelRequestID:                    info.CancelRequestID,
741✔
171
                StickyTaskList:                     info.StickyTaskList,
741✔
172
                StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
741✔
173
                ClientLibraryVersion:               info.ClientLibraryVersion,
741✔
174
                ClientFeatureVersion:               info.ClientFeatureVersion,
741✔
175
                ClientImpl:                         info.ClientImpl,
741✔
176
                Attempt:                            info.Attempt,
741✔
177
                HasRetryPolicy:                     info.HasRetryPolicy,
741✔
178
                InitialInterval:                    int32(info.InitialInterval.Seconds()),
741✔
179
                BackoffCoefficient:                 info.BackoffCoefficient,
741✔
180
                MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
741✔
181
                ExpirationTime:                     info.ExpirationTime,
741✔
182
                MaximumAttempts:                    info.MaximumAttempts,
741✔
183
                NonRetriableErrors:                 info.NonRetriableErrors,
741✔
184
                BranchToken:                        info.BranchToken,
741✔
185
                CronSchedule:                       info.CronSchedule,
741✔
186
                ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
741✔
187
                AutoResetPoints:                    autoResetPoints,
741✔
188
                SearchAttributes:                   info.SearchAttributes,
741✔
189
                Memo:                               info.Memo,
741✔
190
        }
741✔
191
        newStats := &ExecutionStats{
741✔
192
                HistorySize: info.HistorySize,
741✔
193
        }
741✔
194
        return newInfo, newStats, nil
741✔
195
}
196

197
func (m *executionManagerImpl) DeserializeBufferedEvents(
198
        blobs []*DataBlob,
199
) ([]*types.HistoryEvent, error) {
741✔
200

741✔
201
        events := make([]*types.HistoryEvent, 0)
741✔
202
        for _, b := range blobs {
744✔
203
                history, err := m.serializer.DeserializeBatchEvents(b)
3✔
204
                if err != nil {
3✔
205
                        return nil, err
×
206
                }
×
207
                events = append(events, history...)
3✔
208
        }
209
        return events, nil
741✔
210
}
211

212
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
213
        infos map[int64]*InternalChildExecutionInfo,
214
) (map[int64]*ChildExecutionInfo, error) {
741✔
215

741✔
216
        newInfos := make(map[int64]*ChildExecutionInfo)
741✔
217
        for k, v := range infos {
745✔
218
                initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
4✔
219
                if err != nil {
4✔
220
                        return nil, err
×
221
                }
×
222
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
4✔
223
                if err != nil {
4✔
224
                        return nil, err
×
225
                }
×
226
                c := &ChildExecutionInfo{
4✔
227
                        InitiatedEvent: initiatedEvent,
4✔
228
                        StartedEvent:   startedEvent,
4✔
229

4✔
230
                        Version:               v.Version,
4✔
231
                        InitiatedID:           v.InitiatedID,
4✔
232
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
4✔
233
                        StartedID:             v.StartedID,
4✔
234
                        StartedWorkflowID:     v.StartedWorkflowID,
4✔
235
                        StartedRunID:          v.StartedRunID,
4✔
236
                        CreateRequestID:       v.CreateRequestID,
4✔
237
                        DomainID:              v.DomainID,
4✔
238
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
4✔
239
                        WorkflowTypeName:      v.WorkflowTypeName,
4✔
240
                        ParentClosePolicy:     v.ParentClosePolicy,
4✔
241
                }
4✔
242

4✔
243
                // Needed for backward compatibility reason.
4✔
244
                // ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
4✔
245
                // Updated the code to instead directly read WorkflowId and RunId from mutable state
4✔
246
                // Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
4✔
247
                // StartedRunID on the mutable state before passing it to application
4✔
248
                if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
4✔
249
                        startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
4✔
250
                        startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
×
251
                        c.StartedWorkflowID = startedExecution.GetWorkflowID()
×
252
                        c.StartedRunID = startedExecution.GetRunID()
×
253
                }
×
254
                newInfos[k] = c
4✔
255
        }
256
        return newInfos, nil
741✔
257
}
258

259
func (m *executionManagerImpl) DeserializeActivityInfos(
260
        infos map[int64]*InternalActivityInfo,
261
) (map[int64]*ActivityInfo, error) {
741✔
262

741✔
263
        newInfos := make(map[int64]*ActivityInfo)
741✔
264
        for k, v := range infos {
862✔
265
                scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
121✔
266
                if err != nil {
121✔
267
                        return nil, err
×
268
                }
×
269
                startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
121✔
270
                if err != nil {
121✔
271
                        return nil, err
×
272
                }
×
273
                a := &ActivityInfo{
121✔
274
                        ScheduledEvent: scheduledEvent,
121✔
275
                        StartedEvent:   startedEvent,
121✔
276

121✔
277
                        Version:                                 v.Version,
121✔
278
                        ScheduleID:                              v.ScheduleID,
121✔
279
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
121✔
280
                        ScheduledTime:                           v.ScheduledTime,
121✔
281
                        StartedID:                               v.StartedID,
121✔
282
                        StartedTime:                             v.StartedTime,
121✔
283
                        ActivityID:                              v.ActivityID,
121✔
284
                        RequestID:                               v.RequestID,
121✔
285
                        Details:                                 v.Details,
121✔
286
                        ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
121✔
287
                        ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
121✔
288
                        StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
121✔
289
                        HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
121✔
290
                        CancelRequested:                         v.CancelRequested,
121✔
291
                        CancelRequestID:                         v.CancelRequestID,
121✔
292
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
121✔
293
                        TimerTaskStatus:                         v.TimerTaskStatus,
121✔
294
                        Attempt:                                 v.Attempt,
121✔
295
                        DomainID:                                v.DomainID,
121✔
296
                        StartedIdentity:                         v.StartedIdentity,
121✔
297
                        TaskList:                                v.TaskList,
121✔
298
                        HasRetryPolicy:                          v.HasRetryPolicy,
121✔
299
                        InitialInterval:                         int32(v.InitialInterval.Seconds()),
121✔
300
                        BackoffCoefficient:                      v.BackoffCoefficient,
121✔
301
                        MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
121✔
302
                        ExpirationTime:                          v.ExpirationTime,
121✔
303
                        MaximumAttempts:                         v.MaximumAttempts,
121✔
304
                        NonRetriableErrors:                      v.NonRetriableErrors,
121✔
305
                        LastFailureReason:                       v.LastFailureReason,
121✔
306
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
121✔
307
                        LastFailureDetails:                      v.LastFailureDetails,
121✔
308
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
121✔
309
                }
121✔
310
                newInfos[k] = a
121✔
311
        }
312
        return newInfos, nil
741✔
313
}
314

315
func (m *executionManagerImpl) UpdateWorkflowExecution(
316
        ctx context.Context,
317
        request *UpdateWorkflowExecutionRequest,
318
) (*UpdateWorkflowExecutionResponse, error) {
4,413✔
319

4,413✔
320
        serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
4,413✔
321
        if err != nil {
4,413✔
322
                return nil, err
×
323
        }
×
324
        var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
4,413✔
325
        if request.NewWorkflowSnapshot != nil {
4,590✔
326
                serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
177✔
327
                if err != nil {
177✔
328
                        return nil, err
×
329
                }
×
330
        }
331

332
        newRequest := &InternalUpdateWorkflowExecutionRequest{
4,413✔
333
                RangeID: request.RangeID,
4,413✔
334

4,413✔
335
                Mode: request.Mode,
4,413✔
336

4,413✔
337
                UpdateWorkflowMutation: *serializedWorkflowMutation,
4,413✔
338
                NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
4,413✔
339
        }
4,413✔
340
        msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
4,413✔
341
        err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
4,413✔
342
        if err != nil {
4,413✔
343
                return nil, err
×
344
        }
×
345
        return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
4,413✔
346
}
347

348
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
349
        infos []*ChildExecutionInfo,
350
        encoding common.EncodingType,
351
) ([]*InternalChildExecutionInfo, error) {
5,088✔
352

5,088✔
353
        newInfos := make([]*InternalChildExecutionInfo, 0)
5,088✔
354
        for _, v := range infos {
5,127✔
355
                initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
39✔
356
                if err != nil {
39✔
357
                        return nil, err
×
358
                }
×
359
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
39✔
360
                if err != nil {
39✔
361
                        return nil, err
×
362
                }
×
363
                i := &InternalChildExecutionInfo{
39✔
364
                        InitiatedEvent: initiatedEvent,
39✔
365
                        StartedEvent:   startedEvent,
39✔
366

39✔
367
                        Version:               v.Version,
39✔
368
                        InitiatedID:           v.InitiatedID,
39✔
369
                        InitiatedEventBatchID: v.InitiatedEventBatchID,
39✔
370
                        CreateRequestID:       v.CreateRequestID,
39✔
371
                        StartedID:             v.StartedID,
39✔
372
                        StartedWorkflowID:     v.StartedWorkflowID,
39✔
373
                        StartedRunID:          v.StartedRunID,
39✔
374
                        DomainID:              v.DomainID,
39✔
375
                        DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
39✔
376
                        WorkflowTypeName:      v.WorkflowTypeName,
39✔
377
                        ParentClosePolicy:     v.ParentClosePolicy,
39✔
378
                }
39✔
379
                newInfos = append(newInfos, i)
39✔
380
        }
381
        return newInfos, nil
5,088✔
382
}
383

384
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
385
        infos []*ActivityInfo,
386
        encoding common.EncodingType,
387
) ([]*InternalActivityInfo, error) {
5,088✔
388

5,088✔
389
        newInfos := make([]*InternalActivityInfo, 0)
5,088✔
390
        for _, v := range infos {
6,346✔
391
                scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
1,258✔
392
                if err != nil {
1,258✔
393
                        return nil, err
×
394
                }
×
395
                startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
1,258✔
396
                if err != nil {
1,258✔
397
                        return nil, err
×
398
                }
×
399
                i := &InternalActivityInfo{
1,258✔
400
                        Version:                                 v.Version,
1,258✔
401
                        ScheduleID:                              v.ScheduleID,
1,258✔
402
                        ScheduledEventBatchID:                   v.ScheduledEventBatchID,
1,258✔
403
                        ScheduledEvent:                          scheduledEvent,
1,258✔
404
                        ScheduledTime:                           v.ScheduledTime,
1,258✔
405
                        StartedID:                               v.StartedID,
1,258✔
406
                        StartedEvent:                            startedEvent,
1,258✔
407
                        StartedTime:                             v.StartedTime,
1,258✔
408
                        ActivityID:                              v.ActivityID,
1,258✔
409
                        RequestID:                               v.RequestID,
1,258✔
410
                        Details:                                 v.Details,
1,258✔
411
                        ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
1,258✔
412
                        ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
1,258✔
413
                        StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
1,258✔
414
                        HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
1,258✔
415
                        CancelRequested:                         v.CancelRequested,
1,258✔
416
                        CancelRequestID:                         v.CancelRequestID,
1,258✔
417
                        LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
1,258✔
418
                        TimerTaskStatus:                         v.TimerTaskStatus,
1,258✔
419
                        Attempt:                                 v.Attempt,
1,258✔
420
                        DomainID:                                v.DomainID,
1,258✔
421
                        StartedIdentity:                         v.StartedIdentity,
1,258✔
422
                        TaskList:                                v.TaskList,
1,258✔
423
                        HasRetryPolicy:                          v.HasRetryPolicy,
1,258✔
424
                        InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
1,258✔
425
                        BackoffCoefficient:                      v.BackoffCoefficient,
1,258✔
426
                        MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
1,258✔
427
                        ExpirationTime:                          v.ExpirationTime,
1,258✔
428
                        MaximumAttempts:                         v.MaximumAttempts,
1,258✔
429
                        NonRetriableErrors:                      v.NonRetriableErrors,
1,258✔
430
                        LastFailureReason:                       v.LastFailureReason,
1,258✔
431
                        LastWorkerIdentity:                      v.LastWorkerIdentity,
1,258✔
432
                        LastFailureDetails:                      v.LastFailureDetails,
1,258✔
433
                        LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
1,258✔
434
                }
1,258✔
435
                newInfos = append(newInfos, i)
1,258✔
436
        }
437
        return newInfos, nil
5,088✔
438
}
439

440
func (m *executionManagerImpl) SerializeExecutionInfo(
441
        info *WorkflowExecutionInfo,
442
        stats *ExecutionStats,
443
        encoding common.EncodingType,
444
) (*InternalWorkflowExecutionInfo, error) {
5,088✔
445

5,088✔
446
        if info == nil {
5,088✔
447
                return &InternalWorkflowExecutionInfo{}, nil
×
448
        }
×
449
        completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
5,088✔
450
        if err != nil {
5,088✔
451
                return nil, err
×
452
        }
×
453

454
        resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
5,088✔
455
        if err != nil {
5,088✔
456
                return nil, err
×
457
        }
×
458

459
        return &InternalWorkflowExecutionInfo{
5,088✔
460
                DomainID:                           info.DomainID,
5,088✔
461
                WorkflowID:                         info.WorkflowID,
5,088✔
462
                RunID:                              info.RunID,
5,088✔
463
                FirstExecutionRunID:                info.FirstExecutionRunID,
5,088✔
464
                ParentDomainID:                     info.ParentDomainID,
5,088✔
465
                ParentWorkflowID:                   info.ParentWorkflowID,
5,088✔
466
                ParentRunID:                        info.ParentRunID,
5,088✔
467
                InitiatedID:                        info.InitiatedID,
5,088✔
468
                CompletionEventBatchID:             info.CompletionEventBatchID,
5,088✔
469
                CompletionEvent:                    completionEvent,
5,088✔
470
                TaskList:                           info.TaskList,
5,088✔
471
                WorkflowTypeName:                   info.WorkflowTypeName,
5,088✔
472
                WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
5,088✔
473
                DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
5,088✔
474
                ExecutionContext:                   info.ExecutionContext,
5,088✔
475
                State:                              info.State,
5,088✔
476
                CloseStatus:                        info.CloseStatus,
5,088✔
477
                LastFirstEventID:                   info.LastFirstEventID,
5,088✔
478
                LastEventTaskID:                    info.LastEventTaskID,
5,088✔
479
                NextEventID:                        info.NextEventID,
5,088✔
480
                LastProcessedEvent:                 info.LastProcessedEvent,
5,088✔
481
                StartTimestamp:                     info.StartTimestamp,
5,088✔
482
                LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
5,088✔
483
                CreateRequestID:                    info.CreateRequestID,
5,088✔
484
                SignalCount:                        info.SignalCount,
5,088✔
485
                DecisionVersion:                    info.DecisionVersion,
5,088✔
486
                DecisionScheduleID:                 info.DecisionScheduleID,
5,088✔
487
                DecisionStartedID:                  info.DecisionStartedID,
5,088✔
488
                DecisionRequestID:                  info.DecisionRequestID,
5,088✔
489
                DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
5,088✔
490
                DecisionAttempt:                    info.DecisionAttempt,
5,088✔
491
                DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp),
5,088✔
492
                DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp),
5,088✔
493
                DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp),
5,088✔
494
                CancelRequested:                    info.CancelRequested,
5,088✔
495
                CancelRequestID:                    info.CancelRequestID,
5,088✔
496
                StickyTaskList:                     info.StickyTaskList,
5,088✔
497
                StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
5,088✔
498
                ClientLibraryVersion:               info.ClientLibraryVersion,
5,088✔
499
                ClientFeatureVersion:               info.ClientFeatureVersion,
5,088✔
500
                ClientImpl:                         info.ClientImpl,
5,088✔
501
                AutoResetPoints:                    resetPoints,
5,088✔
502
                Attempt:                            info.Attempt,
5,088✔
503
                HasRetryPolicy:                     info.HasRetryPolicy,
5,088✔
504
                InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
5,088✔
505
                BackoffCoefficient:                 info.BackoffCoefficient,
5,088✔
506
                MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
5,088✔
507
                ExpirationTime:                     info.ExpirationTime,
5,088✔
508
                MaximumAttempts:                    info.MaximumAttempts,
5,088✔
509
                NonRetriableErrors:                 info.NonRetriableErrors,
5,088✔
510
                BranchToken:                        info.BranchToken,
5,088✔
511
                CronSchedule:                       info.CronSchedule,
5,088✔
512
                ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
5,088✔
513
                Memo:                               info.Memo,
5,088✔
514
                SearchAttributes:                   info.SearchAttributes,
5,088✔
515

5,088✔
516
                // attributes which are not related to mutable state
5,088✔
517
                HistorySize: stats.HistorySize,
5,088✔
518
        }, nil
5,088✔
519
}
520

521
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
522
        ctx context.Context,
523
        request *ConflictResolveWorkflowExecutionRequest,
524
) (*ConflictResolveWorkflowExecutionResponse, error) {
3✔
525

3✔
526
        serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
3✔
527
        if err != nil {
3✔
528
                return nil, err
×
529
        }
×
530
        var serializedCurrentWorkflowMutation *InternalWorkflowMutation
3✔
531
        if request.CurrentWorkflowMutation != nil {
3✔
532
                serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
×
533
                if err != nil {
×
534
                        return nil, err
×
535
                }
×
536
        }
537
        var serializedNewWorkflowMutation *InternalWorkflowSnapshot
3✔
538
        if request.NewWorkflowSnapshot != nil {
3✔
539
                serializedNewWorkflowMutation, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
×
540
                if err != nil {
×
541
                        return nil, err
×
542
                }
×
543
        }
544

545
        newRequest := &InternalConflictResolveWorkflowExecutionRequest{
3✔
546
                RangeID: request.RangeID,
3✔
547

3✔
548
                Mode: request.Mode,
3✔
549

3✔
550
                ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,
3✔
551

3✔
552
                NewWorkflowSnapshot: serializedNewWorkflowMutation,
3✔
553

3✔
554
                CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
3✔
555
        }
3✔
556
        msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
3✔
557
        err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
3✔
558
        if err != nil {
3✔
559
                return nil, err
×
560
        }
×
561
        return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
3✔
562
}
563

564
func (m *executionManagerImpl) CreateWorkflowExecution(
565
        ctx context.Context,
566
        request *CreateWorkflowExecutionRequest,
567
) (*CreateWorkflowExecutionResponse, error) {
504✔
568

504✔
569
        encoding := common.EncodingTypeThriftRW
504✔
570

504✔
571
        serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
504✔
572
        if err != nil {
504✔
573
                return nil, err
×
574
        }
×
575

576
        newRequest := &InternalCreateWorkflowExecutionRequest{
504✔
577
                RangeID: request.RangeID,
504✔
578

504✔
579
                Mode: request.Mode,
504✔
580

504✔
581
                PreviousRunID:            request.PreviousRunID,
504✔
582
                PreviousLastWriteVersion: request.PreviousLastWriteVersion,
504✔
583

504✔
584
                NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
504✔
585
        }
504✔
586

504✔
587
        msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
504✔
588
        _, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
504✔
589
        if err != nil {
549✔
590
                return nil, err
45✔
591
        }
45✔
592
        return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
462✔
593
}
594

595
func (m *executionManagerImpl) SerializeWorkflowMutation(
596
        input *WorkflowMutation,
597
        encoding common.EncodingType,
598
) (*InternalWorkflowMutation, error) {
4,413✔
599

4,413✔
600
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
4,413✔
601
                input.ExecutionInfo,
4,413✔
602
                input.ExecutionStats,
4,413✔
603
                encoding,
4,413✔
604
        )
4,413✔
605
        if err != nil {
4,413✔
606
                return nil, err
×
607
        }
×
608
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
4,413✔
609
        if err != nil {
4,413✔
610
                return nil, err
×
611
        }
×
612
        serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
4,413✔
613
        if err != nil {
4,413✔
614
                return nil, err
×
615
        }
×
616
        serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
4,413✔
617
        if err != nil {
4,413✔
618
                return nil, err
×
619
        }
×
620
        var serializedNewBufferedEvents *DataBlob
4,413✔
621
        if input.NewBufferedEvents != nil {
5,055✔
622
                serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
642✔
623
                if err != nil {
642✔
624
                        return nil, err
×
625
                }
×
626
        }
627

628
        startVersion, err := getStartVersion(input.VersionHistories)
4,413✔
629
        if err != nil {
4,413✔
630
                return nil, err
×
631
        }
×
632
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
4,413✔
633
        if err != nil {
4,413✔
634
                return nil, err
×
635
        }
×
636

637
        return &InternalWorkflowMutation{
4,413✔
638
                ExecutionInfo:    serializedExecutionInfo,
4,413✔
639
                VersionHistories: serializedVersionHistories,
4,413✔
640
                StartVersion:     startVersion,
4,413✔
641
                LastWriteVersion: lastWriteVersion,
4,413✔
642

4,413✔
643
                UpsertActivityInfos:       serializedUpsertActivityInfos,
4,413✔
644
                DeleteActivityInfos:       input.DeleteActivityInfos,
4,413✔
645
                UpsertTimerInfos:          input.UpsertTimerInfos,
4,413✔
646
                DeleteTimerInfos:          input.DeleteTimerInfos,
4,413✔
647
                UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
4,413✔
648
                DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
4,413✔
649
                UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
4,413✔
650
                DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
4,413✔
651
                UpsertSignalInfos:         input.UpsertSignalInfos,
4,413✔
652
                DeleteSignalInfos:         input.DeleteSignalInfos,
4,413✔
653
                UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
4,413✔
654
                DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
4,413✔
655
                NewBufferedEvents:         serializedNewBufferedEvents,
4,413✔
656
                ClearBufferedEvents:       input.ClearBufferedEvents,
4,413✔
657

4,413✔
658
                TransferTasks:     input.TransferTasks,
4,413✔
659
                CrossClusterTasks: input.CrossClusterTasks,
4,413✔
660
                ReplicationTasks:  input.ReplicationTasks,
4,413✔
661
                TimerTasks:        input.TimerTasks,
4,413✔
662

4,413✔
663
                Condition: input.Condition,
4,413✔
664
                Checksum:  input.Checksum,
4,413✔
665
        }, nil
4,413✔
666
}
667

668
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
669
        input *WorkflowSnapshot,
670
        encoding common.EncodingType,
671
) (*InternalWorkflowSnapshot, error) {
678✔
672

678✔
673
        serializedExecutionInfo, err := m.SerializeExecutionInfo(
678✔
674
                input.ExecutionInfo,
678✔
675
                input.ExecutionStats,
678✔
676
                encoding,
678✔
677
        )
678✔
678
        if err != nil {
678✔
679
                return nil, err
×
680
        }
×
681
        serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
678✔
682
        if err != nil {
678✔
683
                return nil, err
×
684
        }
×
685
        serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
678✔
686
        if err != nil {
678✔
687
                return nil, err
×
688
        }
×
689
        serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
678✔
690
        if err != nil {
678✔
691
                return nil, err
×
692
        }
×
693

694
        startVersion, err := getStartVersion(input.VersionHistories)
678✔
695
        if err != nil {
678✔
696
                return nil, err
×
697
        }
×
698
        lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
678✔
699
        if err != nil {
678✔
700
                return nil, err
×
701
        }
×
702

703
        return &InternalWorkflowSnapshot{
678✔
704
                ExecutionInfo:    serializedExecutionInfo,
678✔
705
                VersionHistories: serializedVersionHistories,
678✔
706
                StartVersion:     startVersion,
678✔
707
                LastWriteVersion: lastWriteVersion,
678✔
708

678✔
709
                ActivityInfos:       serializedActivityInfos,
678✔
710
                TimerInfos:          input.TimerInfos,
678✔
711
                ChildExecutionInfos: serializedChildExecutionInfos,
678✔
712
                RequestCancelInfos:  input.RequestCancelInfos,
678✔
713
                SignalInfos:         input.SignalInfos,
678✔
714
                SignalRequestedIDs:  input.SignalRequestedIDs,
678✔
715

678✔
716
                TransferTasks:     input.TransferTasks,
678✔
717
                CrossClusterTasks: input.CrossClusterTasks,
678✔
718
                ReplicationTasks:  input.ReplicationTasks,
678✔
719
                TimerTasks:        input.TimerTasks,
678✔
720

678✔
721
                Condition: input.Condition,
678✔
722
                Checksum:  input.Checksum,
678✔
723
        }, nil
678✔
724
}
725

726
func (m *executionManagerImpl) SerializeVersionHistories(
727
        versionHistories *VersionHistories,
728
        encoding common.EncodingType,
729
) (*DataBlob, error) {
5,088✔
730

5,088✔
731
        if versionHistories == nil {
5,088✔
732
                return nil, nil
×
733
        }
×
734
        return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
5,088✔
735
}
736

737
func (m *executionManagerImpl) DeserializeVersionHistories(
738
        blob *DataBlob,
739
) (*VersionHistories, error) {
741✔
740

741✔
741
        if blob == nil {
741✔
742
                return nil, nil
×
743
        }
×
744
        versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
741✔
745
        if err != nil {
741✔
746
                return nil, err
×
747
        }
×
748
        return NewVersionHistoriesFromInternalType(versionHistories), nil
741✔
749
}
750

751
func (m *executionManagerImpl) DeleteWorkflowExecution(
752
        ctx context.Context,
753
        request *DeleteWorkflowExecutionRequest,
754
) error {
54✔
755
        return m.persistence.DeleteWorkflowExecution(ctx, request)
54✔
756
}
54✔
757

758
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
759
        ctx context.Context,
760
        request *DeleteCurrentWorkflowExecutionRequest,
761
) error {
54✔
762
        return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
54✔
763
}
54✔
764

765
func (m *executionManagerImpl) GetCurrentExecution(
766
        ctx context.Context,
767
        request *GetCurrentExecutionRequest,
768
) (*GetCurrentExecutionResponse, error) {
183✔
769
        return m.persistence.GetCurrentExecution(ctx, request)
183✔
770
}
183✔
771

772
func (m *executionManagerImpl) ListCurrentExecutions(
773
        ctx context.Context,
774
        request *ListCurrentExecutionsRequest,
775
) (*ListCurrentExecutionsResponse, error) {
×
776
        return m.persistence.ListCurrentExecutions(ctx, request)
×
777
}
×
778

779
func (m *executionManagerImpl) IsWorkflowExecutionExists(
780
        ctx context.Context,
781
        request *IsWorkflowExecutionExistsRequest,
782
) (*IsWorkflowExecutionExistsResponse, error) {
×
783
        return m.persistence.IsWorkflowExecutionExists(ctx, request)
×
784
}
×
785

786
func (m *executionManagerImpl) ListConcreteExecutions(
787
        ctx context.Context,
788
        request *ListConcreteExecutionsRequest,
789
) (*ListConcreteExecutionsResponse, error) {
×
790
        response, err := m.persistence.ListConcreteExecutions(ctx, request)
×
791
        if err != nil {
×
792
                return nil, err
×
793
        }
×
794
        newResponse := &ListConcreteExecutionsResponse{
×
795
                Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
×
796
                PageToken:  response.NextPageToken,
×
797
        }
×
798
        for i, e := range response.Executions {
×
799
                info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
×
800
                if err != nil {
×
801
                        return nil, err
×
802
                }
×
803
                vh, err := m.DeserializeVersionHistories(e.VersionHistories)
×
804
                if err != nil {
×
805
                        return nil, err
×
806
                }
×
807
                newResponse.Executions[i] = &ListConcreteExecutionsEntity{
×
808
                        ExecutionInfo:    info,
×
809
                        VersionHistories: vh,
×
810
                }
×
811
        }
812
        return newResponse, nil
×
813
}
814

815
// Transfer task related methods
816
func (m *executionManagerImpl) GetTransferTasks(
817
        ctx context.Context,
818
        request *GetTransferTasksRequest,
819
) (*GetTransferTasksResponse, error) {
2,319✔
820
        return m.persistence.GetTransferTasks(ctx, request)
2,319✔
821
}
2,319✔
822

823
func (m *executionManagerImpl) CompleteTransferTask(
824
        ctx context.Context,
825
        request *CompleteTransferTaskRequest,
826
) error {
×
827
        return m.persistence.CompleteTransferTask(ctx, request)
×
828
}
×
829

830
func (m *executionManagerImpl) RangeCompleteTransferTask(
831
        ctx context.Context,
832
        request *RangeCompleteTransferTaskRequest,
833
) (*RangeCompleteTransferTaskResponse, error) {
105✔
834
        return m.persistence.RangeCompleteTransferTask(ctx, request)
105✔
835
}
105✔
836

837
// Cross-cluster task related methods
838
func (m *executionManagerImpl) GetCrossClusterTasks(
839
        ctx context.Context,
840
        request *GetCrossClusterTasksRequest,
841
) (*GetCrossClusterTasksResponse, error) {
165✔
842
        return m.persistence.GetCrossClusterTasks(ctx, request)
165✔
843
}
165✔
844

845
func (m *executionManagerImpl) CompleteCrossClusterTask(
846
        ctx context.Context,
847
        request *CompleteCrossClusterTaskRequest,
848
) error {
×
849
        return m.persistence.CompleteCrossClusterTask(ctx, request)
×
850
}
×
851

852
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
853
        ctx context.Context,
854
        request *RangeCompleteCrossClusterTaskRequest,
855
) (*RangeCompleteCrossClusterTaskResponse, error) {
144✔
856
        return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
144✔
857
}
144✔
858

859
// Replication task related methods
860
func (m *executionManagerImpl) GetReplicationTasks(
861
        ctx context.Context,
862
        request *GetReplicationTasksRequest,
863
) (*GetReplicationTasksResponse, error) {
121✔
864
        resp, err := m.persistence.GetReplicationTasks(ctx, request)
121✔
865
        if err != nil {
121✔
866
                return nil, err
×
867
        }
×
868

869
        return &GetReplicationTasksResponse{
121✔
870
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
121✔
871
                NextPageToken: resp.NextPageToken,
121✔
872
        }, nil
121✔
873
}
874

875
func (m *executionManagerImpl) CompleteReplicationTask(
876
        ctx context.Context,
877
        request *CompleteReplicationTaskRequest,
878
) error {
×
879
        return m.persistence.CompleteReplicationTask(ctx, request)
×
880
}
×
881

882
func (m *executionManagerImpl) RangeCompleteReplicationTask(
883
        ctx context.Context,
884
        request *RangeCompleteReplicationTaskRequest,
885
) (*RangeCompleteReplicationTaskResponse, error) {
119✔
886
        return m.persistence.RangeCompleteReplicationTask(ctx, request)
119✔
887
}
119✔
888

889
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
890
        ctx context.Context,
891
        request *PutReplicationTaskToDLQRequest,
892
) error {
3✔
893
        internalRequest := &InternalPutReplicationTaskToDLQRequest{
3✔
894
                SourceClusterName: request.SourceClusterName,
3✔
895
                TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
3✔
896
        }
3✔
897
        return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
3✔
898
}
3✔
899

900
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
901
        ctx context.Context,
902
        request *GetReplicationTasksFromDLQRequest,
903
) (*GetReplicationTasksFromDLQResponse, error) {
3✔
904
        resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
3✔
905
        if err != nil {
3✔
906
                return nil, err
×
907
        }
×
908
        return &GetReplicationTasksFromDLQResponse{
3✔
909
                Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
3✔
910
                NextPageToken: resp.NextPageToken,
3✔
911
        }, nil
3✔
912
}
913

914
func (m *executionManagerImpl) GetReplicationDLQSize(
915
        ctx context.Context,
916
        request *GetReplicationDLQSizeRequest,
917
) (*GetReplicationDLQSizeResponse, error) {
12✔
918
        return m.persistence.GetReplicationDLQSize(ctx, request)
12✔
919
}
12✔
920

921
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
922
        ctx context.Context,
923
        request *DeleteReplicationTaskFromDLQRequest,
924
) error {
×
925
        return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
×
926
}
×
927

928
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
929
        ctx context.Context,
930
        request *RangeDeleteReplicationTaskFromDLQRequest,
931
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
×
932
        return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
×
933
}
×
934

935
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
936
        ctx context.Context,
937
        request *CreateFailoverMarkersRequest,
938
) error {
×
939
        return m.persistence.CreateFailoverMarkerTasks(ctx, request)
×
940
}
×
941

942
// Timer related methods.
943
func (m *executionManagerImpl) GetTimerIndexTasks(
944
        ctx context.Context,
945
        request *GetTimerIndexTasksRequest,
946
) (*GetTimerIndexTasksResponse, error) {
3,249✔
947
        return m.persistence.GetTimerIndexTasks(ctx, request)
3,249✔
948
}
3,249✔
949

950
func (m *executionManagerImpl) CompleteTimerTask(
951
        ctx context.Context,
952
        request *CompleteTimerTaskRequest,
953
) error {
×
954
        return m.persistence.CompleteTimerTask(ctx, request)
×
955
}
×
956

957
func (m *executionManagerImpl) RangeCompleteTimerTask(
958
        ctx context.Context,
959
        request *RangeCompleteTimerTaskRequest,
960
) (*RangeCompleteTimerTaskResponse, error) {
37✔
961
        return m.persistence.RangeCompleteTimerTask(ctx, request)
37✔
962
}
37✔
963

964
func (m *executionManagerImpl) Close() {
51✔
965
        m.persistence.Close()
51✔
966
}
51✔
967

968
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
123✔
969
        if internalInfos == nil {
246✔
970
                return nil
123✔
971
        }
123✔
972
        infos := make([]*ReplicationTaskInfo, len(internalInfos))
3✔
973
        for i := 0; i < len(internalInfos); i++ {
6✔
974
                infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
3✔
975
        }
3✔
976
        return infos
3✔
977
}
978

979
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
3✔
980
        if internalInfo == nil {
3✔
981
                return nil
×
982
        }
×
983
        return &ReplicationTaskInfo{
3✔
984
                DomainID:          internalInfo.DomainID,
3✔
985
                WorkflowID:        internalInfo.WorkflowID,
3✔
986
                RunID:             internalInfo.RunID,
3✔
987
                TaskID:            internalInfo.TaskID,
3✔
988
                TaskType:          internalInfo.TaskType,
3✔
989
                FirstEventID:      internalInfo.FirstEventID,
3✔
990
                NextEventID:       internalInfo.NextEventID,
3✔
991
                Version:           internalInfo.Version,
3✔
992
                ScheduledID:       internalInfo.ScheduledID,
3✔
993
                BranchToken:       internalInfo.BranchToken,
3✔
994
                NewRunBranchToken: internalInfo.NewRunBranchToken,
3✔
995
                CreationTime:      internalInfo.CreationTime.UnixNano(),
3✔
996
        }
3✔
997
}
998

999
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
3✔
1000
        if info == nil {
3✔
1001
                return nil
×
1002
        }
×
1003
        return &InternalReplicationTaskInfo{
3✔
1004
                DomainID:          info.DomainID,
3✔
1005
                WorkflowID:        info.WorkflowID,
3✔
1006
                RunID:             info.RunID,
3✔
1007
                TaskID:            info.TaskID,
3✔
1008
                TaskType:          info.TaskType,
3✔
1009
                FirstEventID:      info.FirstEventID,
3✔
1010
                NextEventID:       info.NextEventID,
3✔
1011
                Version:           info.Version,
3✔
1012
                ScheduledID:       info.ScheduledID,
3✔
1013
                BranchToken:       info.BranchToken,
3✔
1014
                NewRunBranchToken: info.NewRunBranchToken,
3✔
1015
                CreationTime:      time.Unix(0, info.CreationTime),
3✔
1016
        }
3✔
1017
}
1018

1019
func getStartVersion(
1020
        versionHistories *VersionHistories,
1021
) (int64, error) {
5,088✔
1022

5,088✔
1023
        if versionHistories == nil {
5,088✔
1024
                return common.EmptyVersion, nil
×
1025
        }
×
1026

1027
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,088✔
1028
        if err != nil {
5,088✔
1029
                return 0, err
×
1030
        }
×
1031
        versionHistoryItem, err := versionHistory.GetFirstItem()
5,088✔
1032
        if err != nil {
5,088✔
1033
                return 0, err
×
1034
        }
×
1035
        return versionHistoryItem.Version, nil
5,088✔
1036
}
1037

1038
func getLastWriteVersion(
1039
        versionHistories *VersionHistories,
1040
) (int64, error) {
5,088✔
1041

5,088✔
1042
        if versionHistories == nil {
5,088✔
1043
                return common.EmptyVersion, nil
×
1044
        }
×
1045

1046
        versionHistory, err := versionHistories.GetCurrentVersionHistory()
5,088✔
1047
        if err != nil {
5,088✔
1048
                return 0, err
×
1049
        }
×
1050
        versionHistoryItem, err := versionHistory.GetLastItem()
5,088✔
1051
        if err != nil {
5,088✔
1052
                return 0, err
×
1053
        }
×
1054
        return versionHistoryItem.Version, nil
5,088✔
1055
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc