• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018dfb1b-c20b-4820-815a-b6aed230920a

01 Mar 2024 05:41PM UTC coverage: 62.893% (+0.03%) from 62.868%
018dfb1b-c20b-4820-815a-b6aed230920a

push

buildkite

web-flow
Do not get workflow execution from database when shard is closed (#5697)

7 of 7 new or added lines in 3 files covered. (100.0%)

66 existing lines in 10 files now uncovered.

92963 of 147811 relevant lines covered (62.89%)

2320.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.84
/common/persistence/sql/workflowStateMaps.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "context"
26
        "database/sql"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/persistence"
30
        "github.com/uber/cadence/common/persistence/serialization"
31
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
func updateActivityInfos(
36
        ctx context.Context,
37
        tx sqlplugin.Tx,
38
        activityInfos []*persistence.InternalActivityInfo,
39
        deleteInfos []int64,
40
        shardID int,
41
        domainID serialization.UUID,
42
        workflowID string,
43
        runID serialization.UUID,
44
        parser serialization.Parser,
45
) error {
3,336✔
46

3,336✔
47
        if len(activityInfos) > 0 {
4,110✔
48
                rows := make([]sqlplugin.ActivityInfoMapsRow, len(activityInfos))
774✔
49
                for i, activityInfo := range activityInfos {
1,628✔
50
                        scheduledEvent, scheduledEncoding := persistence.FromDataBlob(activityInfo.ScheduledEvent)
854✔
51
                        startEvent, startEncoding := persistence.FromDataBlob(activityInfo.StartedEvent)
854✔
52

854✔
53
                        info := &serialization.ActivityInfo{
854✔
54
                                Version:                  activityInfo.Version,
854✔
55
                                ScheduledEventBatchID:    activityInfo.ScheduledEventBatchID,
854✔
56
                                ScheduledEvent:           scheduledEvent,
854✔
57
                                ScheduledEventEncoding:   scheduledEncoding,
854✔
58
                                ScheduledTimestamp:       activityInfo.ScheduledTime,
854✔
59
                                StartedID:                activityInfo.StartedID,
854✔
60
                                StartedEvent:             startEvent,
854✔
61
                                StartedEventEncoding:     startEncoding,
854✔
62
                                StartedTimestamp:         activityInfo.StartedTime,
854✔
63
                                ActivityID:               activityInfo.ActivityID,
854✔
64
                                RequestID:                activityInfo.RequestID,
854✔
65
                                ScheduleToStartTimeout:   activityInfo.ScheduleToStartTimeout,
854✔
66
                                ScheduleToCloseTimeout:   activityInfo.ScheduleToCloseTimeout,
854✔
67
                                StartToCloseTimeout:      activityInfo.StartToCloseTimeout,
854✔
68
                                HeartbeatTimeout:         activityInfo.HeartbeatTimeout,
854✔
69
                                CancelRequested:          activityInfo.CancelRequested,
854✔
70
                                CancelRequestID:          activityInfo.CancelRequestID,
854✔
71
                                TimerTaskStatus:          activityInfo.TimerTaskStatus,
854✔
72
                                Attempt:                  activityInfo.Attempt,
854✔
73
                                TaskList:                 activityInfo.TaskList,
854✔
74
                                StartedIdentity:          activityInfo.StartedIdentity,
854✔
75
                                HasRetryPolicy:           activityInfo.HasRetryPolicy,
854✔
76
                                RetryInitialInterval:     activityInfo.InitialInterval,
854✔
77
                                RetryBackoffCoefficient:  activityInfo.BackoffCoefficient,
854✔
78
                                RetryMaximumInterval:     activityInfo.MaximumInterval,
854✔
79
                                RetryExpirationTimestamp: activityInfo.ExpirationTime,
854✔
80
                                RetryMaximumAttempts:     activityInfo.MaximumAttempts,
854✔
81
                                RetryNonRetryableErrors:  activityInfo.NonRetriableErrors,
854✔
82
                                RetryLastFailureReason:   activityInfo.LastFailureReason,
854✔
83
                                RetryLastWorkerIdentity:  activityInfo.LastWorkerIdentity,
854✔
84
                                RetryLastFailureDetails:  activityInfo.LastFailureDetails,
854✔
85
                        }
854✔
86
                        blob, err := parser.ActivityInfoToBlob(info)
854✔
87
                        if err != nil {
854✔
88
                                return err
×
89
                        }
×
90
                        rows[i] = sqlplugin.ActivityInfoMapsRow{
854✔
91
                                ShardID:                  int64(shardID),
854✔
92
                                DomainID:                 domainID,
854✔
93
                                WorkflowID:               workflowID,
854✔
94
                                RunID:                    runID,
854✔
95
                                ScheduleID:               activityInfo.ScheduleID,
854✔
96
                                LastHeartbeatUpdatedTime: activityInfo.LastHeartBeatUpdatedTime,
854✔
97
                                LastHeartbeatDetails:     activityInfo.Details,
854✔
98
                                Data:                     blob.Data,
854✔
99
                                DataEncoding:             string(blob.Encoding),
854✔
100
                        }
854✔
101
                }
102

103
                if _, err := tx.ReplaceIntoActivityInfoMaps(ctx, rows); err != nil {
774✔
104
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute update query.", err)
×
105
                }
×
106
        }
107

108
        if len(deleteInfos) > 0 {
3,585✔
109
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
249✔
110
                        ShardID:     int64(shardID),
249✔
111
                        DomainID:    domainID,
249✔
112
                        WorkflowID:  workflowID,
249✔
113
                        RunID:       runID,
249✔
114
                        ScheduleIDs: deleteInfos,
249✔
115
                }); err != nil {
249✔
116
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute delete query.", err)
×
117
                }
×
118
        }
119

120
        return nil
3,336✔
121
}
122

123
func getActivityInfoMap(
124
        ctx context.Context,
125
        db sqlplugin.DB,
126
        shardID int,
127
        domainID serialization.UUID,
128
        workflowID string,
129
        runID serialization.UUID,
130
        parser serialization.Parser,
131
) (map[int64]*persistence.InternalActivityInfo, error) {
789✔
132

789✔
133
        rows, err := db.SelectFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
789✔
134
                ShardID:    int64(shardID),
789✔
135
                DomainID:   domainID,
789✔
136
                WorkflowID: workflowID,
789✔
137
                RunID:      runID,
789✔
138
        })
789✔
139
        if err != nil && err != sql.ErrNoRows {
912✔
140
                return nil, convertCommonErrors(db, "getActivityInfoMap", "", err)
123✔
141
        }
123✔
142

143
        ret := make(map[int64]*persistence.InternalActivityInfo)
668✔
144
        for _, row := range rows {
730✔
145
                decoded, err := parser.ActivityInfoFromBlob(row.Data, row.DataEncoding)
62✔
146
                if err != nil {
62✔
147
                        return nil, err
×
148
                }
×
149
                info := &persistence.InternalActivityInfo{
62✔
150
                        DomainID:                 row.DomainID.String(),
62✔
151
                        ScheduleID:               row.ScheduleID,
62✔
152
                        Details:                  row.LastHeartbeatDetails,
62✔
153
                        LastHeartBeatUpdatedTime: row.LastHeartbeatUpdatedTime,
62✔
154
                        Version:                  decoded.GetVersion(),
62✔
155
                        ScheduledEventBatchID:    decoded.GetScheduledEventBatchID(),
62✔
156
                        ScheduledEvent:           persistence.NewDataBlob(decoded.ScheduledEvent, common.EncodingType(decoded.GetScheduledEventEncoding())),
62✔
157
                        ScheduledTime:            decoded.GetScheduledTimestamp(),
62✔
158
                        StartedID:                decoded.GetStartedID(),
62✔
159
                        StartedTime:              decoded.GetStartedTimestamp(),
62✔
160
                        ActivityID:               decoded.GetActivityID(),
62✔
161
                        RequestID:                decoded.GetRequestID(),
62✔
162
                        ScheduleToStartTimeout:   decoded.GetScheduleToStartTimeout(),
62✔
163
                        ScheduleToCloseTimeout:   decoded.GetScheduleToCloseTimeout(),
62✔
164
                        StartToCloseTimeout:      decoded.GetStartToCloseTimeout(),
62✔
165
                        HeartbeatTimeout:         decoded.GetHeartbeatTimeout(),
62✔
166
                        CancelRequested:          decoded.GetCancelRequested(),
62✔
167
                        CancelRequestID:          decoded.GetCancelRequestID(),
62✔
168
                        TimerTaskStatus:          decoded.GetTimerTaskStatus(),
62✔
169
                        Attempt:                  decoded.GetAttempt(),
62✔
170
                        StartedIdentity:          decoded.GetStartedIdentity(),
62✔
171
                        TaskList:                 decoded.GetTaskList(),
62✔
172
                        HasRetryPolicy:           decoded.GetHasRetryPolicy(),
62✔
173
                        InitialInterval:          decoded.GetRetryInitialInterval(),
62✔
174
                        BackoffCoefficient:       decoded.GetRetryBackoffCoefficient(),
62✔
175
                        MaximumInterval:          decoded.GetRetryMaximumInterval(),
62✔
176
                        ExpirationTime:           decoded.GetRetryExpirationTimestamp(),
62✔
177
                        MaximumAttempts:          decoded.GetRetryMaximumAttempts(),
62✔
178
                        NonRetriableErrors:       decoded.GetRetryNonRetryableErrors(),
62✔
179
                        LastFailureReason:        decoded.GetRetryLastFailureReason(),
62✔
180
                        LastWorkerIdentity:       decoded.GetRetryLastWorkerIdentity(),
62✔
181
                        LastFailureDetails:       decoded.GetRetryLastFailureDetails(),
62✔
182
                }
62✔
183
                if decoded.StartedEvent != nil {
62✔
184
                        info.StartedEvent = persistence.NewDataBlob(decoded.StartedEvent, common.EncodingType(decoded.GetStartedEventEncoding()))
×
185
                }
×
186
                ret[row.ScheduleID] = info
62✔
187
        }
188

189
        return ret, nil
668✔
190
}
191

192
func deleteActivityInfoMap(
193
        ctx context.Context,
194
        tx sqlplugin.Tx,
195
        shardID int,
196
        domainID serialization.UUID,
197
        workflowID string,
198
        runID serialization.UUID,
199
) error {
3✔
200

3✔
201
        if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
3✔
202
                ShardID:    int64(shardID),
3✔
203
                DomainID:   domainID,
3✔
204
                WorkflowID: workflowID,
3✔
205
                RunID:      runID,
3✔
206
        }); err != nil {
3✔
207
                return convertCommonErrors(tx, "deleteActivityInfoMap", "", err)
×
208
        }
×
209
        return nil
3✔
210
}
211

212
func updateTimerInfos(
213
        ctx context.Context,
214
        tx sqlplugin.Tx,
215
        timerInfos []*persistence.TimerInfo,
216
        deleteInfos []string,
217
        shardID int,
218
        domainID serialization.UUID,
219
        workflowID string,
220
        runID serialization.UUID,
221
        parser serialization.Parser,
222
) error {
3,336✔
223

3,336✔
224
        if len(timerInfos) > 0 {
3,361✔
225
                rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos))
25✔
226
                for i, timerInfo := range timerInfos {
50✔
227
                        blob, err := parser.TimerInfoToBlob(&serialization.TimerInfo{
25✔
228
                                Version:         timerInfo.Version,
25✔
229
                                StartedID:       timerInfo.StartedID,
25✔
230
                                ExpiryTimestamp: timerInfo.ExpiryTime,
25✔
231
                                // TaskID is a misleading variable, it actually serves
25✔
232
                                // the purpose of indicating whether a timer task is
25✔
233
                                // generated for this timer info
25✔
234
                                TaskID: timerInfo.TaskStatus,
25✔
235
                        })
25✔
236
                        if err != nil {
25✔
237
                                return err
×
238
                        }
×
239
                        rows[i] = sqlplugin.TimerInfoMapsRow{
25✔
240
                                ShardID:      int64(shardID),
25✔
241
                                DomainID:     domainID,
25✔
242
                                WorkflowID:   workflowID,
25✔
243
                                RunID:        runID,
25✔
244
                                TimerID:      timerInfo.TimerID,
25✔
245
                                Data:         blob.Data,
25✔
246
                                DataEncoding: string(blob.Encoding),
25✔
247
                        }
25✔
248
                }
249
                if _, err := tx.ReplaceIntoTimerInfoMaps(ctx, rows); err != nil {
25✔
250
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute update query.", err)
×
251
                }
×
252
        }
253

254
        if len(deleteInfos) > 0 {
3,351✔
255
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
15✔
256
                        ShardID:    int64(shardID),
15✔
257
                        DomainID:   domainID,
15✔
258
                        WorkflowID: workflowID,
15✔
259
                        RunID:      runID,
15✔
260
                        TimerIDs:   deleteInfos,
15✔
261
                }); err != nil {
15✔
262
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute delete query.", err)
×
263
                }
×
264
        }
265

266
        return nil
3,336✔
267
}
268

269
func getTimerInfoMap(
270
        ctx context.Context,
271
        db sqlplugin.DB,
272
        shardID int,
273
        domainID serialization.UUID,
274
        workflowID string,
275
        runID serialization.UUID,
276
        parser serialization.Parser,
277
) (map[string]*persistence.TimerInfo, error) {
789✔
278

789✔
279
        rows, err := db.SelectFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
789✔
280
                ShardID:    int64(shardID),
789✔
281
                DomainID:   domainID,
789✔
282
                WorkflowID: workflowID,
789✔
283
                RunID:      runID,
789✔
284
        })
789✔
285
        if err != nil && err != sql.ErrNoRows {
897✔
286
                return nil, convertCommonErrors(db, "getTimerInfoMap", "", err)
108✔
287
        }
108✔
288
        ret := make(map[string]*persistence.TimerInfo)
682✔
289
        for _, row := range rows {
683✔
290
                info, err := parser.TimerInfoFromBlob(row.Data, row.DataEncoding)
1✔
291
                if err != nil {
1✔
292
                        return nil, err
×
293
                }
×
294
                ret[row.TimerID] = &persistence.TimerInfo{
1✔
295
                        TimerID:    row.TimerID,
1✔
296
                        Version:    info.GetVersion(),
1✔
297
                        StartedID:  info.GetStartedID(),
1✔
298
                        ExpiryTime: info.GetExpiryTimestamp(),
1✔
299
                        // TaskID is a misleading variable, it actually serves
1✔
300
                        // the purpose of indicating whether a timer task is
1✔
301
                        // generated for this timer info
1✔
302
                        TaskStatus: info.GetTaskID(),
1✔
303
                }
1✔
304
        }
305

306
        return ret, nil
682✔
307
}
308

309
func deleteTimerInfoMap(
310
        ctx context.Context,
311
        tx sqlplugin.Tx,
312
        shardID int,
313
        domainID serialization.UUID,
314
        workflowID string,
315
        runID serialization.UUID,
316
) error {
3✔
317

3✔
318
        if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
3✔
319
                ShardID:    int64(shardID),
3✔
320
                DomainID:   domainID,
3✔
321
                WorkflowID: workflowID,
3✔
322
                RunID:      runID,
3✔
323
        }); err != nil {
3✔
324
                return convertCommonErrors(tx, "deleteTimerInfoMap", "", err)
×
325
        }
×
326
        return nil
3✔
327
}
328

329
func updateChildExecutionInfos(
330
        ctx context.Context,
331
        tx sqlplugin.Tx,
332
        childExecutionInfos []*persistence.InternalChildExecutionInfo,
333
        deleteInfos []int64,
334
        shardID int,
335
        domainID serialization.UUID,
336
        workflowID string,
337
        runID serialization.UUID,
338
        parser serialization.Parser,
339
) error {
3,336✔
340

3,336✔
341
        if len(childExecutionInfos) > 0 {
3,365✔
342
                rows := make([]sqlplugin.ChildExecutionInfoMapsRow, len(childExecutionInfos))
29✔
343
                for i, childExecutionInfo := range childExecutionInfos {
58✔
344
                        initiateEvent, initiateEncoding := persistence.FromDataBlob(childExecutionInfo.InitiatedEvent)
29✔
345
                        startEvent, startEncoding := persistence.FromDataBlob(childExecutionInfo.StartedEvent)
29✔
346

29✔
347
                        info := &serialization.ChildExecutionInfo{
29✔
348
                                Version:                childExecutionInfo.Version,
29✔
349
                                InitiatedEventBatchID:  childExecutionInfo.InitiatedEventBatchID,
29✔
350
                                InitiatedEvent:         initiateEvent,
29✔
351
                                InitiatedEventEncoding: initiateEncoding,
29✔
352
                                StartedEvent:           startEvent,
29✔
353
                                StartedEventEncoding:   startEncoding,
29✔
354
                                StartedID:              childExecutionInfo.StartedID,
29✔
355
                                StartedWorkflowID:      childExecutionInfo.StartedWorkflowID,
29✔
356
                                StartedRunID:           serialization.MustParseUUID(childExecutionInfo.StartedRunID),
29✔
357
                                CreateRequestID:        childExecutionInfo.CreateRequestID,
29✔
358
                                DomainID:               childExecutionInfo.DomainID,
29✔
359
                                DomainNameDEPRECATED:   childExecutionInfo.DomainNameDEPRECATED,
29✔
360
                                WorkflowTypeName:       childExecutionInfo.WorkflowTypeName,
29✔
361
                                ParentClosePolicy:      int32(childExecutionInfo.ParentClosePolicy),
29✔
362
                        }
29✔
363
                        blob, err := parser.ChildExecutionInfoToBlob(info)
29✔
364
                        if err != nil {
29✔
365
                                return err
×
366
                        }
×
367
                        rows[i] = sqlplugin.ChildExecutionInfoMapsRow{
29✔
368
                                ShardID:      int64(shardID),
29✔
369
                                DomainID:     domainID,
29✔
370
                                WorkflowID:   workflowID,
29✔
371
                                RunID:        runID,
29✔
372
                                InitiatedID:  childExecutionInfo.InitiatedID,
29✔
373
                                Data:         blob.Data,
29✔
374
                                DataEncoding: string(blob.Encoding),
29✔
375
                        }
29✔
376
                }
377
                if _, err := tx.ReplaceIntoChildExecutionInfoMaps(ctx, rows); err != nil {
29✔
378
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute update query.", err)
×
379
                }
×
380
        }
381

382
        if len(deleteInfos) > 0 {
3,349✔
383
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
13✔
384
                        ShardID:      int64(shardID),
13✔
385
                        DomainID:     domainID,
13✔
386
                        WorkflowID:   workflowID,
13✔
387
                        RunID:        runID,
13✔
388
                        InitiatedIDs: deleteInfos,
13✔
389
                }); err != nil {
13✔
390
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute delete query.", err)
×
391
                }
×
392
        }
393

394
        return nil
3,336✔
395
}
396

397
func getChildExecutionInfoMap(
398
        ctx context.Context,
399
        db sqlplugin.DB,
400
        shardID int,
401
        domainID serialization.UUID,
402
        workflowID string,
403
        runID serialization.UUID,
404
        parser serialization.Parser,
405
) (map[int64]*persistence.InternalChildExecutionInfo, error) {
789✔
406

789✔
407
        rows, err := db.SelectFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
789✔
408
                ShardID:    int64(shardID),
789✔
409
                DomainID:   domainID,
789✔
410
                WorkflowID: workflowID,
789✔
411
                RunID:      runID,
789✔
412
        })
789✔
413
        if err != nil && err != sql.ErrNoRows {
899✔
414
                return nil, convertCommonErrors(db, "getChildExecutionInfoMap", "", err)
110✔
415
        }
110✔
416

417
        ret := make(map[int64]*persistence.InternalChildExecutionInfo)
680✔
418
        for _, row := range rows {
682✔
419
                rowInfo, err := parser.ChildExecutionInfoFromBlob(row.Data, row.DataEncoding)
2✔
420
                if err != nil {
2✔
421
                        return nil, err
×
422
                }
×
423
                info := &persistence.InternalChildExecutionInfo{
2✔
424
                        InitiatedID:           row.InitiatedID,
2✔
425
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
2✔
426
                        Version:               rowInfo.GetVersion(),
2✔
427
                        StartedID:             rowInfo.GetStartedID(),
2✔
428
                        StartedWorkflowID:     rowInfo.GetStartedWorkflowID(),
2✔
429
                        StartedRunID:          serialization.UUID(rowInfo.GetStartedRunID()).String(),
2✔
430
                        CreateRequestID:       rowInfo.GetCreateRequestID(),
2✔
431
                        DomainID:              rowInfo.GetDomainID(),
2✔
432
                        DomainNameDEPRECATED:  rowInfo.GetDomainNameDEPRECATED(),
2✔
433
                        WorkflowTypeName:      rowInfo.GetWorkflowTypeName(),
2✔
434
                        ParentClosePolicy:     types.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
2✔
435
                }
2✔
436
                if rowInfo.InitiatedEvent != nil {
2✔
437
                        info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
×
438
                }
×
439
                if rowInfo.StartedEvent != nil {
2✔
440
                        info.StartedEvent = persistence.NewDataBlob(rowInfo.StartedEvent, common.EncodingType(rowInfo.GetStartedEventEncoding()))
×
441
                }
×
442
                ret[row.InitiatedID] = info
2✔
443
        }
444

445
        return ret, nil
680✔
446
}
447

448
func deleteChildExecutionInfoMap(
449
        ctx context.Context,
450
        tx sqlplugin.Tx,
451
        shardID int,
452
        domainID serialization.UUID,
453
        workflowID string,
454
        runID serialization.UUID,
455
) error {
3✔
456

3✔
457
        if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
3✔
458
                ShardID:    int64(shardID),
3✔
459
                DomainID:   domainID,
3✔
460
                WorkflowID: workflowID,
3✔
461
                RunID:      runID,
3✔
462
        }); err != nil {
3✔
463
                return convertCommonErrors(tx, "deleteChildExecutionInfoMap", "", err)
×
464
        }
×
465
        return nil
3✔
466
}
467

468
func updateRequestCancelInfos(
469
        ctx context.Context,
470
        tx sqlplugin.Tx,
471
        requestCancelInfos []*persistence.RequestCancelInfo,
472
        deleteInfos []int64,
473
        shardID int,
474
        domainID serialization.UUID,
475
        workflowID string,
476
        runID serialization.UUID,
477
        parser serialization.Parser,
478
) error {
3,336✔
479

3,336✔
480
        if len(requestCancelInfos) > 0 {
3,345✔
481
                rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos))
9✔
482
                for i, requestCancelInfo := range requestCancelInfos {
18✔
483
                        blob, err := parser.RequestCancelInfoToBlob(&serialization.RequestCancelInfo{
9✔
484
                                Version:               requestCancelInfo.Version,
9✔
485
                                InitiatedEventBatchID: requestCancelInfo.InitiatedEventBatchID,
9✔
486
                                CancelRequestID:       requestCancelInfo.CancelRequestID,
9✔
487
                        })
9✔
488
                        if err != nil {
9✔
489
                                return err
×
490
                        }
×
491
                        rows[i] = sqlplugin.RequestCancelInfoMapsRow{
9✔
492
                                ShardID:      int64(shardID),
9✔
493
                                DomainID:     domainID,
9✔
494
                                WorkflowID:   workflowID,
9✔
495
                                RunID:        runID,
9✔
496
                                InitiatedID:  requestCancelInfo.InitiatedID,
9✔
497
                                Data:         blob.Data,
9✔
498
                                DataEncoding: string(blob.Encoding),
9✔
499
                        }
9✔
500
                }
501

502
                if _, err := tx.ReplaceIntoRequestCancelInfoMaps(ctx, rows); err != nil {
9✔
503
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute update query.", err)
×
504
                }
×
505
        }
506

507
        if len(deleteInfos) > 0 {
3,343✔
508
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
7✔
509
                        ShardID:      int64(shardID),
7✔
510
                        DomainID:     domainID,
7✔
511
                        WorkflowID:   workflowID,
7✔
512
                        RunID:        runID,
7✔
513
                        InitiatedIDs: deleteInfos,
7✔
514
                }); err != nil {
7✔
515
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute delete query.", err)
×
516
                }
×
517
        }
518

519
        return nil
3,336✔
520
}
521

522
func getRequestCancelInfoMap(
523
        ctx context.Context,
524
        db sqlplugin.DB,
525
        shardID int,
526
        domainID serialization.UUID,
527
        workflowID string,
528
        runID serialization.UUID,
529
        parser serialization.Parser,
530
) (map[int64]*persistence.RequestCancelInfo, error) {
789✔
531

789✔
532
        rows, err := db.SelectFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
789✔
533
                ShardID:    int64(shardID),
789✔
534
                DomainID:   domainID,
789✔
535
                WorkflowID: workflowID,
789✔
536
                RunID:      runID,
789✔
537
        })
789✔
538
        if err != nil && err != sql.ErrNoRows {
899✔
539
                return nil, convertCommonErrors(db, "getRequestCancelInfoMap", "", err)
110✔
540
        }
110✔
541

542
        ret := make(map[int64]*persistence.RequestCancelInfo)
681✔
543
        for _, row := range rows {
681✔
544
                rowInfo, err := parser.RequestCancelInfoFromBlob(row.Data, row.DataEncoding)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548
                ret[row.InitiatedID] = &persistence.RequestCancelInfo{
×
549
                        Version:               rowInfo.GetVersion(),
×
550
                        InitiatedID:           row.InitiatedID,
×
551
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
552
                        CancelRequestID:       rowInfo.GetCancelRequestID(),
×
553
                }
×
554
        }
555

556
        return ret, nil
681✔
557
}
558

559
func deleteRequestCancelInfoMap(
560
        ctx context.Context,
561
        tx sqlplugin.Tx,
562
        shardID int,
563
        domainID serialization.UUID,
564
        workflowID string,
565
        runID serialization.UUID,
566
) error {
3✔
567

3✔
568
        if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
3✔
569
                ShardID:    int64(shardID),
3✔
570
                DomainID:   domainID,
3✔
571
                WorkflowID: workflowID,
3✔
572
                RunID:      runID,
3✔
573
        }); err != nil {
3✔
574
                return convertCommonErrors(tx, "deleteRequestCancelInfoMap", "", err)
×
575
        }
×
576
        return nil
3✔
577
}
578

579
func updateSignalInfos(
580
        ctx context.Context,
581
        tx sqlplugin.Tx,
582
        signalInfos []*persistence.SignalInfo,
583
        deleteInfos []int64,
584
        shardID int,
585
        domainID serialization.UUID,
586
        workflowID string,
587
        runID serialization.UUID,
588
        parser serialization.Parser,
589
) error {
3,336✔
590

3,336✔
591
        if len(signalInfos) > 0 {
3,349✔
592
                rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos))
13✔
593
                for i, signalInfo := range signalInfos {
26✔
594
                        blob, err := parser.SignalInfoToBlob(&serialization.SignalInfo{
13✔
595
                                Version:               signalInfo.Version,
13✔
596
                                InitiatedEventBatchID: signalInfo.InitiatedEventBatchID,
13✔
597
                                RequestID:             signalInfo.SignalRequestID,
13✔
598
                                Name:                  signalInfo.SignalName,
13✔
599
                                Input:                 signalInfo.Input,
13✔
600
                                Control:               signalInfo.Control,
13✔
601
                        })
13✔
602
                        if err != nil {
13✔
603
                                return err
×
604
                        }
×
605
                        rows[i] = sqlplugin.SignalInfoMapsRow{
13✔
606
                                ShardID:      int64(shardID),
13✔
607
                                DomainID:     domainID,
13✔
608
                                WorkflowID:   workflowID,
13✔
609
                                RunID:        runID,
13✔
610
                                InitiatedID:  signalInfo.InitiatedID,
13✔
611
                                Data:         blob.Data,
13✔
612
                                DataEncoding: string(blob.Encoding),
13✔
613
                        }
13✔
614
                }
615

616
                if _, err := tx.ReplaceIntoSignalInfoMaps(ctx, rows); err != nil {
13✔
617
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute update query.", err)
×
618
                }
×
619
        }
620

621
        if len(deleteInfos) > 0 {
3,347✔
622
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
11✔
623
                        ShardID:      int64(shardID),
11✔
624
                        DomainID:     domainID,
11✔
625
                        WorkflowID:   workflowID,
11✔
626
                        RunID:        runID,
11✔
627
                        InitiatedIDs: deleteInfos,
11✔
628
                }); err != nil {
11✔
629
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute delete query.", err)
×
630
                }
×
631
        }
632

633
        return nil
3,336✔
634
}
635

636
func getSignalInfoMap(
637
        ctx context.Context,
638
        db sqlplugin.DB,
639
        shardID int,
640
        domainID serialization.UUID,
641
        workflowID string,
642
        runID serialization.UUID,
643
        parser serialization.Parser,
644
) (map[int64]*persistence.SignalInfo, error) {
789✔
645

789✔
646
        rows, err := db.SelectFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
789✔
647
                ShardID:    int64(shardID),
789✔
648
                DomainID:   domainID,
789✔
649
                WorkflowID: workflowID,
789✔
650
                RunID:      runID,
789✔
651
        })
789✔
652
        if err != nil && err != sql.ErrNoRows {
890✔
653
                return nil, convertCommonErrors(db, "getSignalInfoMap", "", err)
101✔
654
        }
101✔
655

656
        ret := make(map[int64]*persistence.SignalInfo)
690✔
657
        for _, row := range rows {
690✔
UNCOV
658
                rowInfo, err := parser.SignalInfoFromBlob(row.Data, row.DataEncoding)
×
UNCOV
659
                if err != nil {
×
660
                        return nil, err
×
661
                }
×
UNCOV
662
                ret[row.InitiatedID] = &persistence.SignalInfo{
×
UNCOV
663
                        Version:               rowInfo.GetVersion(),
×
UNCOV
664
                        InitiatedID:           row.InitiatedID,
×
UNCOV
665
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
UNCOV
666
                        SignalRequestID:       rowInfo.GetRequestID(),
×
UNCOV
667
                        SignalName:            rowInfo.GetName(),
×
UNCOV
668
                        Input:                 rowInfo.GetInput(),
×
UNCOV
669
                        Control:               rowInfo.GetControl(),
×
UNCOV
670
                }
×
671
        }
672

673
        return ret, nil
690✔
674
}
675

676
func deleteSignalInfoMap(
677
        ctx context.Context,
678
        tx sqlplugin.Tx,
679
        shardID int,
680
        domainID serialization.UUID,
681
        workflowID string,
682
        runID serialization.UUID,
683
) error {
3✔
684

3✔
685
        if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
3✔
686
                ShardID:    int64(shardID),
3✔
687
                DomainID:   domainID,
3✔
688
                WorkflowID: workflowID,
3✔
689
                RunID:      runID,
3✔
690
        }); err != nil {
3✔
691
                return convertCommonErrors(tx, "deleteSignalInfoMap", "", err)
×
692
        }
×
693
        return nil
3✔
694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc