• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01886f46-77ea-4306-8d9c-5f71c79fbe8a

31 May 2023 01:07AM UTC coverage: 57.276% (+0.05%) from 57.229%
01886f46-77ea-4306-8d9c-5f71c79fbe8a

push

buildkite

web-flow
Minor fixes for sticky tasklist isolation (#5308)

3 of 3 new or added lines in 2 files covered. (100.0%)

87037 of 151962 relevant lines covered (57.28%)

2480.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.25
/common/persistence/sql/workflowStateMaps.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "context"
26
        "database/sql"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/persistence"
30
        "github.com/uber/cadence/common/persistence/serialization"
31
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
func updateActivityInfos(
36
        ctx context.Context,
37
        tx sqlplugin.Tx,
38
        activityInfos []*persistence.InternalActivityInfo,
39
        deleteInfos []int64,
40
        shardID int,
41
        domainID serialization.UUID,
42
        workflowID string,
43
        runID serialization.UUID,
44
        parser serialization.Parser,
45
) error {
3,360✔
46

3,360✔
47
        if len(activityInfos) > 0 {
4,132✔
48
                rows := make([]sqlplugin.ActivityInfoMapsRow, len(activityInfos))
772✔
49
                for i, activityInfo := range activityInfos {
1,615✔
50
                        scheduledEvent, scheduledEncoding := persistence.FromDataBlob(activityInfo.ScheduledEvent)
843✔
51
                        startEvent, startEncoding := persistence.FromDataBlob(activityInfo.StartedEvent)
843✔
52

843✔
53
                        info := &serialization.ActivityInfo{
843✔
54
                                Version:                  activityInfo.Version,
843✔
55
                                ScheduledEventBatchID:    activityInfo.ScheduledEventBatchID,
843✔
56
                                ScheduledEvent:           scheduledEvent,
843✔
57
                                ScheduledEventEncoding:   scheduledEncoding,
843✔
58
                                ScheduledTimestamp:       activityInfo.ScheduledTime,
843✔
59
                                StartedID:                activityInfo.StartedID,
843✔
60
                                StartedEvent:             startEvent,
843✔
61
                                StartedEventEncoding:     startEncoding,
843✔
62
                                StartedTimestamp:         activityInfo.StartedTime,
843✔
63
                                ActivityID:               activityInfo.ActivityID,
843✔
64
                                RequestID:                activityInfo.RequestID,
843✔
65
                                ScheduleToStartTimeout:   activityInfo.ScheduleToStartTimeout,
843✔
66
                                ScheduleToCloseTimeout:   activityInfo.ScheduleToCloseTimeout,
843✔
67
                                StartToCloseTimeout:      activityInfo.StartToCloseTimeout,
843✔
68
                                HeartbeatTimeout:         activityInfo.HeartbeatTimeout,
843✔
69
                                CancelRequested:          activityInfo.CancelRequested,
843✔
70
                                CancelRequestID:          activityInfo.CancelRequestID,
843✔
71
                                TimerTaskStatus:          activityInfo.TimerTaskStatus,
843✔
72
                                Attempt:                  activityInfo.Attempt,
843✔
73
                                TaskList:                 activityInfo.TaskList,
843✔
74
                                StartedIdentity:          activityInfo.StartedIdentity,
843✔
75
                                HasRetryPolicy:           activityInfo.HasRetryPolicy,
843✔
76
                                RetryInitialInterval:     activityInfo.InitialInterval,
843✔
77
                                RetryBackoffCoefficient:  activityInfo.BackoffCoefficient,
843✔
78
                                RetryMaximumInterval:     activityInfo.MaximumInterval,
843✔
79
                                RetryExpirationTimestamp: activityInfo.ExpirationTime,
843✔
80
                                RetryMaximumAttempts:     activityInfo.MaximumAttempts,
843✔
81
                                RetryNonRetryableErrors:  activityInfo.NonRetriableErrors,
843✔
82
                                RetryLastFailureReason:   activityInfo.LastFailureReason,
843✔
83
                                RetryLastWorkerIdentity:  activityInfo.LastWorkerIdentity,
843✔
84
                                RetryLastFailureDetails:  activityInfo.LastFailureDetails,
843✔
85
                        }
843✔
86
                        blob, err := parser.ActivityInfoToBlob(info)
843✔
87
                        if err != nil {
843✔
88
                                return err
×
89
                        }
×
90
                        rows[i] = sqlplugin.ActivityInfoMapsRow{
843✔
91
                                ShardID:                  int64(shardID),
843✔
92
                                DomainID:                 domainID,
843✔
93
                                WorkflowID:               workflowID,
843✔
94
                                RunID:                    runID,
843✔
95
                                ScheduleID:               activityInfo.ScheduleID,
843✔
96
                                LastHeartbeatUpdatedTime: activityInfo.LastHeartBeatUpdatedTime,
843✔
97
                                LastHeartbeatDetails:     activityInfo.Details,
843✔
98
                                Data:                     blob.Data,
843✔
99
                                DataEncoding:             string(blob.Encoding),
843✔
100
                        }
843✔
101
                }
102

103
                if _, err := tx.ReplaceIntoActivityInfoMaps(ctx, rows); err != nil {
772✔
104
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute update query.", err)
×
105
                }
×
106
        }
107

108
        if len(deleteInfos) > 0 {
3,608✔
109
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
248✔
110
                        ShardID:     int64(shardID),
248✔
111
                        DomainID:    domainID,
248✔
112
                        WorkflowID:  workflowID,
248✔
113
                        RunID:       runID,
248✔
114
                        ScheduleIDs: deleteInfos,
248✔
115
                }); err != nil {
248✔
116
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute delete query.", err)
×
117
                }
×
118
        }
119

120
        return nil
3,360✔
121
}
122

123
func getActivityInfoMap(
124
        ctx context.Context,
125
        db sqlplugin.DB,
126
        shardID int,
127
        domainID serialization.UUID,
128
        workflowID string,
129
        runID serialization.UUID,
130
        parser serialization.Parser,
131
) (map[int64]*persistence.InternalActivityInfo, error) {
764✔
132

764✔
133
        rows, err := db.SelectFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
764✔
134
                ShardID:    int64(shardID),
764✔
135
                DomainID:   domainID,
764✔
136
                WorkflowID: workflowID,
764✔
137
                RunID:      runID,
764✔
138
        })
764✔
139
        if err != nil && err != sql.ErrNoRows {
808✔
140
                return nil, convertCommonErrors(db, "getActivityInfoMap", "", err)
44✔
141
        }
44✔
142

143
        ret := make(map[int64]*persistence.InternalActivityInfo)
722✔
144
        for _, row := range rows {
787✔
145
                decoded, err := parser.ActivityInfoFromBlob(row.Data, row.DataEncoding)
65✔
146
                if err != nil {
65✔
147
                        return nil, err
×
148
                }
×
149
                info := &persistence.InternalActivityInfo{
65✔
150
                        DomainID:                 row.DomainID.String(),
65✔
151
                        ScheduleID:               row.ScheduleID,
65✔
152
                        Details:                  row.LastHeartbeatDetails,
65✔
153
                        LastHeartBeatUpdatedTime: row.LastHeartbeatUpdatedTime,
65✔
154
                        Version:                  decoded.GetVersion(),
65✔
155
                        ScheduledEventBatchID:    decoded.GetScheduledEventBatchID(),
65✔
156
                        ScheduledEvent:           persistence.NewDataBlob(decoded.ScheduledEvent, common.EncodingType(decoded.GetScheduledEventEncoding())),
65✔
157
                        ScheduledTime:            decoded.GetScheduledTimestamp(),
65✔
158
                        StartedID:                decoded.GetStartedID(),
65✔
159
                        StartedTime:              decoded.GetStartedTimestamp(),
65✔
160
                        ActivityID:               decoded.GetActivityID(),
65✔
161
                        RequestID:                decoded.GetRequestID(),
65✔
162
                        ScheduleToStartTimeout:   decoded.GetScheduleToStartTimeout(),
65✔
163
                        ScheduleToCloseTimeout:   decoded.GetScheduleToCloseTimeout(),
65✔
164
                        StartToCloseTimeout:      decoded.GetStartToCloseTimeout(),
65✔
165
                        HeartbeatTimeout:         decoded.GetHeartbeatTimeout(),
65✔
166
                        CancelRequested:          decoded.GetCancelRequested(),
65✔
167
                        CancelRequestID:          decoded.GetCancelRequestID(),
65✔
168
                        TimerTaskStatus:          decoded.GetTimerTaskStatus(),
65✔
169
                        Attempt:                  decoded.GetAttempt(),
65✔
170
                        StartedIdentity:          decoded.GetStartedIdentity(),
65✔
171
                        TaskList:                 decoded.GetTaskList(),
65✔
172
                        HasRetryPolicy:           decoded.GetHasRetryPolicy(),
65✔
173
                        InitialInterval:          decoded.GetRetryInitialInterval(),
65✔
174
                        BackoffCoefficient:       decoded.GetRetryBackoffCoefficient(),
65✔
175
                        MaximumInterval:          decoded.GetRetryMaximumInterval(),
65✔
176
                        ExpirationTime:           decoded.GetRetryExpirationTimestamp(),
65✔
177
                        MaximumAttempts:          decoded.GetRetryMaximumAttempts(),
65✔
178
                        NonRetriableErrors:       decoded.GetRetryNonRetryableErrors(),
65✔
179
                        LastFailureReason:        decoded.GetRetryLastFailureReason(),
65✔
180
                        LastWorkerIdentity:       decoded.GetRetryLastWorkerIdentity(),
65✔
181
                        LastFailureDetails:       decoded.GetRetryLastFailureDetails(),
65✔
182
                }
65✔
183
                if decoded.StartedEvent != nil {
65✔
184
                        info.StartedEvent = persistence.NewDataBlob(decoded.StartedEvent, common.EncodingType(decoded.GetStartedEventEncoding()))
×
185
                }
×
186
                ret[row.ScheduleID] = info
65✔
187
        }
188

189
        return ret, nil
722✔
190
}
191

192
func deleteActivityInfoMap(
193
        ctx context.Context,
194
        tx sqlplugin.Tx,
195
        shardID int,
196
        domainID serialization.UUID,
197
        workflowID string,
198
        runID serialization.UUID,
199
) error {
2✔
200

2✔
201
        if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
2✔
202
                ShardID:    int64(shardID),
2✔
203
                DomainID:   domainID,
2✔
204
                WorkflowID: workflowID,
2✔
205
                RunID:      runID,
2✔
206
        }); err != nil {
2✔
207
                return convertCommonErrors(tx, "deleteActivityInfoMap", "", err)
×
208
        }
×
209
        return nil
2✔
210
}
211

212
func updateTimerInfos(
213
        ctx context.Context,
214
        tx sqlplugin.Tx,
215
        timerInfos []*persistence.TimerInfo,
216
        deleteInfos []string,
217
        shardID int,
218
        domainID serialization.UUID,
219
        workflowID string,
220
        runID serialization.UUID,
221
        parser serialization.Parser,
222
) error {
3,360✔
223

3,360✔
224
        if len(timerInfos) > 0 {
3,382✔
225
                rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos))
22✔
226
                for i, timerInfo := range timerInfos {
44✔
227
                        blob, err := parser.TimerInfoToBlob(&serialization.TimerInfo{
22✔
228
                                Version:         timerInfo.Version,
22✔
229
                                StartedID:       timerInfo.StartedID,
22✔
230
                                ExpiryTimestamp: timerInfo.ExpiryTime,
22✔
231
                                // TaskID is a misleading variable, it actually serves
22✔
232
                                // the purpose of indicating whether a timer task is
22✔
233
                                // generated for this timer info
22✔
234
                                TaskID: timerInfo.TaskStatus,
22✔
235
                        })
22✔
236
                        if err != nil {
22✔
237
                                return err
×
238
                        }
×
239
                        rows[i] = sqlplugin.TimerInfoMapsRow{
22✔
240
                                ShardID:      int64(shardID),
22✔
241
                                DomainID:     domainID,
22✔
242
                                WorkflowID:   workflowID,
22✔
243
                                RunID:        runID,
22✔
244
                                TimerID:      timerInfo.TimerID,
22✔
245
                                Data:         blob.Data,
22✔
246
                                DataEncoding: string(blob.Encoding),
22✔
247
                        }
22✔
248
                }
249
                if _, err := tx.ReplaceIntoTimerInfoMaps(ctx, rows); err != nil {
22✔
250
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute update query.", err)
×
251
                }
×
252
        }
253

254
        if len(deleteInfos) > 0 {
3,376✔
255
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
16✔
256
                        ShardID:    int64(shardID),
16✔
257
                        DomainID:   domainID,
16✔
258
                        WorkflowID: workflowID,
16✔
259
                        RunID:      runID,
16✔
260
                        TimerIDs:   deleteInfos,
16✔
261
                }); err != nil {
16✔
262
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute delete query.", err)
×
263
                }
×
264
        }
265

266
        return nil
3,360✔
267
}
268

269
func getTimerInfoMap(
270
        ctx context.Context,
271
        db sqlplugin.DB,
272
        shardID int,
273
        domainID serialization.UUID,
274
        workflowID string,
275
        runID serialization.UUID,
276
        parser serialization.Parser,
277
) (map[string]*persistence.TimerInfo, error) {
764✔
278

764✔
279
        rows, err := db.SelectFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
764✔
280
                ShardID:    int64(shardID),
764✔
281
                DomainID:   domainID,
764✔
282
                WorkflowID: workflowID,
764✔
283
                RunID:      runID,
764✔
284
        })
764✔
285
        if err != nil && err != sql.ErrNoRows {
932✔
286
                return nil, convertCommonErrors(db, "getTimerInfoMap", "", err)
168✔
287
        }
168✔
288
        ret := make(map[string]*persistence.TimerInfo)
598✔
289
        for _, row := range rows {
598✔
290
                info, err := parser.TimerInfoFromBlob(row.Data, row.DataEncoding)
×
291
                if err != nil {
×
292
                        return nil, err
×
293
                }
×
294
                ret[row.TimerID] = &persistence.TimerInfo{
×
295
                        TimerID:    row.TimerID,
×
296
                        Version:    info.GetVersion(),
×
297
                        StartedID:  info.GetStartedID(),
×
298
                        ExpiryTime: info.GetExpiryTimestamp(),
×
299
                        // TaskID is a misleading variable, it actually serves
×
300
                        // the purpose of indicating whether a timer task is
×
301
                        // generated for this timer info
×
302
                        TaskStatus: info.GetTaskID(),
×
303
                }
×
304
        }
305

306
        return ret, nil
598✔
307
}
308

309
func deleteTimerInfoMap(
310
        ctx context.Context,
311
        tx sqlplugin.Tx,
312
        shardID int,
313
        domainID serialization.UUID,
314
        workflowID string,
315
        runID serialization.UUID,
316
) error {
2✔
317

2✔
318
        if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
2✔
319
                ShardID:    int64(shardID),
2✔
320
                DomainID:   domainID,
2✔
321
                WorkflowID: workflowID,
2✔
322
                RunID:      runID,
2✔
323
        }); err != nil {
2✔
324
                return convertCommonErrors(tx, "deleteTimerInfoMap", "", err)
×
325
        }
×
326
        return nil
2✔
327
}
328

329
func updateChildExecutionInfos(
330
        ctx context.Context,
331
        tx sqlplugin.Tx,
332
        childExecutionInfos []*persistence.InternalChildExecutionInfo,
333
        deleteInfos []int64,
334
        shardID int,
335
        domainID serialization.UUID,
336
        workflowID string,
337
        runID serialization.UUID,
338
        parser serialization.Parser,
339
) error {
3,360✔
340

3,360✔
341
        if len(childExecutionInfos) > 0 {
3,386✔
342
                rows := make([]sqlplugin.ChildExecutionInfoMapsRow, len(childExecutionInfos))
26✔
343
                for i, childExecutionInfo := range childExecutionInfos {
52✔
344
                        initiateEvent, initiateEncoding := persistence.FromDataBlob(childExecutionInfo.InitiatedEvent)
26✔
345
                        startEvent, startEncoding := persistence.FromDataBlob(childExecutionInfo.StartedEvent)
26✔
346

26✔
347
                        info := &serialization.ChildExecutionInfo{
26✔
348
                                Version:                childExecutionInfo.Version,
26✔
349
                                InitiatedEventBatchID:  childExecutionInfo.InitiatedEventBatchID,
26✔
350
                                InitiatedEvent:         initiateEvent,
26✔
351
                                InitiatedEventEncoding: initiateEncoding,
26✔
352
                                StartedEvent:           startEvent,
26✔
353
                                StartedEventEncoding:   startEncoding,
26✔
354
                                StartedID:              childExecutionInfo.StartedID,
26✔
355
                                StartedWorkflowID:      childExecutionInfo.StartedWorkflowID,
26✔
356
                                StartedRunID:           serialization.MustParseUUID(childExecutionInfo.StartedRunID),
26✔
357
                                CreateRequestID:        childExecutionInfo.CreateRequestID,
26✔
358
                                DomainID:               childExecutionInfo.DomainID,
26✔
359
                                DomainNameDEPRECATED:   childExecutionInfo.DomainNameDEPRECATED,
26✔
360
                                WorkflowTypeName:       childExecutionInfo.WorkflowTypeName,
26✔
361
                                ParentClosePolicy:      int32(childExecutionInfo.ParentClosePolicy),
26✔
362
                        }
26✔
363
                        blob, err := parser.ChildExecutionInfoToBlob(info)
26✔
364
                        if err != nil {
26✔
365
                                return err
×
366
                        }
×
367
                        rows[i] = sqlplugin.ChildExecutionInfoMapsRow{
26✔
368
                                ShardID:      int64(shardID),
26✔
369
                                DomainID:     domainID,
26✔
370
                                WorkflowID:   workflowID,
26✔
371
                                RunID:        runID,
26✔
372
                                InitiatedID:  childExecutionInfo.InitiatedID,
26✔
373
                                Data:         blob.Data,
26✔
374
                                DataEncoding: string(blob.Encoding),
26✔
375
                        }
26✔
376
                }
377
                if _, err := tx.ReplaceIntoChildExecutionInfoMaps(ctx, rows); err != nil {
26✔
378
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute update query.", err)
×
379
                }
×
380
        }
381

382
        if len(deleteInfos) > 0 {
3,372✔
383
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
12✔
384
                        ShardID:      int64(shardID),
12✔
385
                        DomainID:     domainID,
12✔
386
                        WorkflowID:   workflowID,
12✔
387
                        RunID:        runID,
12✔
388
                        InitiatedIDs: deleteInfos,
12✔
389
                }); err != nil {
12✔
390
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute delete query.", err)
×
391
                }
×
392
        }
393

394
        return nil
3,360✔
395
}
396

397
func getChildExecutionInfoMap(
398
        ctx context.Context,
399
        db sqlplugin.DB,
400
        shardID int,
401
        domainID serialization.UUID,
402
        workflowID string,
403
        runID serialization.UUID,
404
        parser serialization.Parser,
405
) (map[int64]*persistence.InternalChildExecutionInfo, error) {
764✔
406

764✔
407
        rows, err := db.SelectFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
764✔
408
                ShardID:    int64(shardID),
764✔
409
                DomainID:   domainID,
764✔
410
                WorkflowID: workflowID,
764✔
411
                RunID:      runID,
764✔
412
        })
764✔
413
        if err != nil && err != sql.ErrNoRows {
954✔
414
                return nil, convertCommonErrors(db, "getChildExecutionInfoMap", "", err)
190✔
415
        }
190✔
416

417
        ret := make(map[int64]*persistence.InternalChildExecutionInfo)
575✔
418
        for _, row := range rows {
577✔
419
                rowInfo, err := parser.ChildExecutionInfoFromBlob(row.Data, row.DataEncoding)
2✔
420
                if err != nil {
2✔
421
                        return nil, err
×
422
                }
×
423
                info := &persistence.InternalChildExecutionInfo{
2✔
424
                        InitiatedID:           row.InitiatedID,
2✔
425
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
2✔
426
                        Version:               rowInfo.GetVersion(),
2✔
427
                        StartedID:             rowInfo.GetStartedID(),
2✔
428
                        StartedWorkflowID:     rowInfo.GetStartedWorkflowID(),
2✔
429
                        StartedRunID:          serialization.UUID(rowInfo.GetStartedRunID()).String(),
2✔
430
                        CreateRequestID:       rowInfo.GetCreateRequestID(),
2✔
431
                        DomainID:              rowInfo.GetDomainID(),
2✔
432
                        DomainNameDEPRECATED:  rowInfo.GetDomainNameDEPRECATED(),
2✔
433
                        WorkflowTypeName:      rowInfo.GetWorkflowTypeName(),
2✔
434
                        ParentClosePolicy:     types.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
2✔
435
                }
2✔
436
                if rowInfo.InitiatedEvent != nil {
2✔
437
                        info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
×
438
                }
×
439
                if rowInfo.StartedEvent != nil {
2✔
440
                        info.StartedEvent = persistence.NewDataBlob(rowInfo.StartedEvent, common.EncodingType(rowInfo.GetStartedEventEncoding()))
×
441
                }
×
442
                ret[row.InitiatedID] = info
2✔
443
        }
444

445
        return ret, nil
575✔
446
}
447

448
func deleteChildExecutionInfoMap(
449
        ctx context.Context,
450
        tx sqlplugin.Tx,
451
        shardID int,
452
        domainID serialization.UUID,
453
        workflowID string,
454
        runID serialization.UUID,
455
) error {
2✔
456

2✔
457
        if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
2✔
458
                ShardID:    int64(shardID),
2✔
459
                DomainID:   domainID,
2✔
460
                WorkflowID: workflowID,
2✔
461
                RunID:      runID,
2✔
462
        }); err != nil {
2✔
463
                return convertCommonErrors(tx, "deleteChildExecutionInfoMap", "", err)
×
464
        }
×
465
        return nil
2✔
466
}
467

468
func updateRequestCancelInfos(
469
        ctx context.Context,
470
        tx sqlplugin.Tx,
471
        requestCancelInfos []*persistence.RequestCancelInfo,
472
        deleteInfos []int64,
473
        shardID int,
474
        domainID serialization.UUID,
475
        workflowID string,
476
        runID serialization.UUID,
477
        parser serialization.Parser,
478
) error {
3,360✔
479

3,360✔
480
        if len(requestCancelInfos) > 0 {
3,366✔
481
                rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos))
6✔
482
                for i, requestCancelInfo := range requestCancelInfos {
12✔
483
                        blob, err := parser.RequestCancelInfoToBlob(&serialization.RequestCancelInfo{
6✔
484
                                Version:               requestCancelInfo.Version,
6✔
485
                                InitiatedEventBatchID: requestCancelInfo.InitiatedEventBatchID,
6✔
486
                                CancelRequestID:       requestCancelInfo.CancelRequestID,
6✔
487
                        })
6✔
488
                        if err != nil {
6✔
489
                                return err
×
490
                        }
×
491
                        rows[i] = sqlplugin.RequestCancelInfoMapsRow{
6✔
492
                                ShardID:      int64(shardID),
6✔
493
                                DomainID:     domainID,
6✔
494
                                WorkflowID:   workflowID,
6✔
495
                                RunID:        runID,
6✔
496
                                InitiatedID:  requestCancelInfo.InitiatedID,
6✔
497
                                Data:         blob.Data,
6✔
498
                                DataEncoding: string(blob.Encoding),
6✔
499
                        }
6✔
500
                }
501

502
                if _, err := tx.ReplaceIntoRequestCancelInfoMaps(ctx, rows); err != nil {
6✔
503
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute update query.", err)
×
504
                }
×
505
        }
506

507
        if len(deleteInfos) > 0 {
3,366✔
508
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
6✔
509
                        ShardID:      int64(shardID),
6✔
510
                        DomainID:     domainID,
6✔
511
                        WorkflowID:   workflowID,
6✔
512
                        RunID:        runID,
6✔
513
                        InitiatedIDs: deleteInfos,
6✔
514
                }); err != nil {
6✔
515
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute delete query.", err)
×
516
                }
×
517
        }
518

519
        return nil
3,360✔
520
}
521

522
func getRequestCancelInfoMap(
523
        ctx context.Context,
524
        db sqlplugin.DB,
525
        shardID int,
526
        domainID serialization.UUID,
527
        workflowID string,
528
        runID serialization.UUID,
529
        parser serialization.Parser,
530
) (map[int64]*persistence.RequestCancelInfo, error) {
764✔
531

764✔
532
        rows, err := db.SelectFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
764✔
533
                ShardID:    int64(shardID),
764✔
534
                DomainID:   domainID,
764✔
535
                WorkflowID: workflowID,
764✔
536
                RunID:      runID,
764✔
537
        })
764✔
538
        if err != nil && err != sql.ErrNoRows {
959✔
539
                return nil, convertCommonErrors(db, "getRequestCancelInfoMap", "", err)
195✔
540
        }
195✔
541

542
        ret := make(map[int64]*persistence.RequestCancelInfo)
571✔
543
        for _, row := range rows {
571✔
544
                rowInfo, err := parser.RequestCancelInfoFromBlob(row.Data, row.DataEncoding)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548
                ret[row.InitiatedID] = &persistence.RequestCancelInfo{
×
549
                        Version:               rowInfo.GetVersion(),
×
550
                        InitiatedID:           row.InitiatedID,
×
551
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
552
                        CancelRequestID:       rowInfo.GetCancelRequestID(),
×
553
                }
×
554
        }
555

556
        return ret, nil
571✔
557
}
558

559
func deleteRequestCancelInfoMap(
560
        ctx context.Context,
561
        tx sqlplugin.Tx,
562
        shardID int,
563
        domainID serialization.UUID,
564
        workflowID string,
565
        runID serialization.UUID,
566
) error {
2✔
567

2✔
568
        if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
2✔
569
                ShardID:    int64(shardID),
2✔
570
                DomainID:   domainID,
2✔
571
                WorkflowID: workflowID,
2✔
572
                RunID:      runID,
2✔
573
        }); err != nil {
2✔
574
                return convertCommonErrors(tx, "deleteRequestCancelInfoMap", "", err)
×
575
        }
×
576
        return nil
2✔
577
}
578

579
func updateSignalInfos(
580
        ctx context.Context,
581
        tx sqlplugin.Tx,
582
        signalInfos []*persistence.SignalInfo,
583
        deleteInfos []int64,
584
        shardID int,
585
        domainID serialization.UUID,
586
        workflowID string,
587
        runID serialization.UUID,
588
        parser serialization.Parser,
589
) error {
3,360✔
590

3,360✔
591
        if len(signalInfos) > 0 {
3,370✔
592
                rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos))
10✔
593
                for i, signalInfo := range signalInfos {
20✔
594
                        blob, err := parser.SignalInfoToBlob(&serialization.SignalInfo{
10✔
595
                                Version:               signalInfo.Version,
10✔
596
                                InitiatedEventBatchID: signalInfo.InitiatedEventBatchID,
10✔
597
                                RequestID:             signalInfo.SignalRequestID,
10✔
598
                                Name:                  signalInfo.SignalName,
10✔
599
                                Input:                 signalInfo.Input,
10✔
600
                                Control:               signalInfo.Control,
10✔
601
                        })
10✔
602
                        if err != nil {
10✔
603
                                return err
×
604
                        }
×
605
                        rows[i] = sqlplugin.SignalInfoMapsRow{
10✔
606
                                ShardID:      int64(shardID),
10✔
607
                                DomainID:     domainID,
10✔
608
                                WorkflowID:   workflowID,
10✔
609
                                RunID:        runID,
10✔
610
                                InitiatedID:  signalInfo.InitiatedID,
10✔
611
                                Data:         blob.Data,
10✔
612
                                DataEncoding: string(blob.Encoding),
10✔
613
                        }
10✔
614
                }
615

616
                if _, err := tx.ReplaceIntoSignalInfoMaps(ctx, rows); err != nil {
10✔
617
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute update query.", err)
×
618
                }
×
619
        }
620

621
        if len(deleteInfos) > 0 {
3,370✔
622
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
10✔
623
                        ShardID:      int64(shardID),
10✔
624
                        DomainID:     domainID,
10✔
625
                        WorkflowID:   workflowID,
10✔
626
                        RunID:        runID,
10✔
627
                        InitiatedIDs: deleteInfos,
10✔
628
                }); err != nil {
10✔
629
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute delete query.", err)
×
630
                }
×
631
        }
632

633
        return nil
3,360✔
634
}
635

636
func getSignalInfoMap(
637
        ctx context.Context,
638
        db sqlplugin.DB,
639
        shardID int,
640
        domainID serialization.UUID,
641
        workflowID string,
642
        runID serialization.UUID,
643
        parser serialization.Parser,
644
) (map[int64]*persistence.SignalInfo, error) {
764✔
645

764✔
646
        rows, err := db.SelectFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
764✔
647
                ShardID:    int64(shardID),
764✔
648
                DomainID:   domainID,
764✔
649
                WorkflowID: workflowID,
764✔
650
                RunID:      runID,
764✔
651
        })
764✔
652
        if err != nil && err != sql.ErrNoRows {
952✔
653
                return nil, convertCommonErrors(db, "getSignalInfoMap", "", err)
188✔
654
        }
188✔
655

656
        ret := make(map[int64]*persistence.SignalInfo)
578✔
657
        for _, row := range rows {
578✔
658
                rowInfo, err := parser.SignalInfoFromBlob(row.Data, row.DataEncoding)
×
659
                if err != nil {
×
660
                        return nil, err
×
661
                }
×
662
                ret[row.InitiatedID] = &persistence.SignalInfo{
×
663
                        Version:               rowInfo.GetVersion(),
×
664
                        InitiatedID:           row.InitiatedID,
×
665
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
666
                        SignalRequestID:       rowInfo.GetRequestID(),
×
667
                        SignalName:            rowInfo.GetName(),
×
668
                        Input:                 rowInfo.GetInput(),
×
669
                        Control:               rowInfo.GetControl(),
×
670
                }
×
671
        }
672

673
        return ret, nil
578✔
674
}
675

676
func deleteSignalInfoMap(
677
        ctx context.Context,
678
        tx sqlplugin.Tx,
679
        shardID int,
680
        domainID serialization.UUID,
681
        workflowID string,
682
        runID serialization.UUID,
683
) error {
2✔
684

2✔
685
        if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
2✔
686
                ShardID:    int64(shardID),
2✔
687
                DomainID:   domainID,
2✔
688
                WorkflowID: workflowID,
2✔
689
                RunID:      runID,
2✔
690
        }); err != nil {
2✔
691
                return convertCommonErrors(tx, "deleteSignalInfoMap", "", err)
×
692
        }
×
693
        return nil
2✔
694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc