• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 01880c67-2e5d-4e53-912a-94acc0108378

11 May 2023 08:21PM UTC coverage: 57.304% (+0.04%) from 57.263%
01880c67-2e5d-4e53-912a-94acc0108378

push

buildkite

GitHub
Bumping version (#5284)

86984 of 151794 relevant lines covered (57.3%)

2460.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.84
/common/persistence/sql/workflowStateMaps.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "context"
26
        "database/sql"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/persistence"
30
        "github.com/uber/cadence/common/persistence/serialization"
31
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
func updateActivityInfos(
36
        ctx context.Context,
37
        tx sqlplugin.Tx,
38
        activityInfos []*persistence.InternalActivityInfo,
39
        deleteInfos []int64,
40
        shardID int,
41
        domainID serialization.UUID,
42
        workflowID string,
43
        runID serialization.UUID,
44
        parser serialization.Parser,
45
) error {
3,341✔
46

3,341✔
47
        if len(activityInfos) > 0 {
4,108✔
48
                rows := make([]sqlplugin.ActivityInfoMapsRow, len(activityInfos))
767✔
49
                for i, activityInfo := range activityInfos {
1,608✔
50
                        scheduledEvent, scheduledEncoding := persistence.FromDataBlob(activityInfo.ScheduledEvent)
841✔
51
                        startEvent, startEncoding := persistence.FromDataBlob(activityInfo.StartedEvent)
841✔
52

841✔
53
                        info := &serialization.ActivityInfo{
841✔
54
                                Version:                  activityInfo.Version,
841✔
55
                                ScheduledEventBatchID:    activityInfo.ScheduledEventBatchID,
841✔
56
                                ScheduledEvent:           scheduledEvent,
841✔
57
                                ScheduledEventEncoding:   scheduledEncoding,
841✔
58
                                ScheduledTimestamp:       activityInfo.ScheduledTime,
841✔
59
                                StartedID:                activityInfo.StartedID,
841✔
60
                                StartedEvent:             startEvent,
841✔
61
                                StartedEventEncoding:     startEncoding,
841✔
62
                                StartedTimestamp:         activityInfo.StartedTime,
841✔
63
                                ActivityID:               activityInfo.ActivityID,
841✔
64
                                RequestID:                activityInfo.RequestID,
841✔
65
                                ScheduleToStartTimeout:   activityInfo.ScheduleToStartTimeout,
841✔
66
                                ScheduleToCloseTimeout:   activityInfo.ScheduleToCloseTimeout,
841✔
67
                                StartToCloseTimeout:      activityInfo.StartToCloseTimeout,
841✔
68
                                HeartbeatTimeout:         activityInfo.HeartbeatTimeout,
841✔
69
                                CancelRequested:          activityInfo.CancelRequested,
841✔
70
                                CancelRequestID:          activityInfo.CancelRequestID,
841✔
71
                                TimerTaskStatus:          activityInfo.TimerTaskStatus,
841✔
72
                                Attempt:                  activityInfo.Attempt,
841✔
73
                                TaskList:                 activityInfo.TaskList,
841✔
74
                                StartedIdentity:          activityInfo.StartedIdentity,
841✔
75
                                HasRetryPolicy:           activityInfo.HasRetryPolicy,
841✔
76
                                RetryInitialInterval:     activityInfo.InitialInterval,
841✔
77
                                RetryBackoffCoefficient:  activityInfo.BackoffCoefficient,
841✔
78
                                RetryMaximumInterval:     activityInfo.MaximumInterval,
841✔
79
                                RetryExpirationTimestamp: activityInfo.ExpirationTime,
841✔
80
                                RetryMaximumAttempts:     activityInfo.MaximumAttempts,
841✔
81
                                RetryNonRetryableErrors:  activityInfo.NonRetriableErrors,
841✔
82
                                RetryLastFailureReason:   activityInfo.LastFailureReason,
841✔
83
                                RetryLastWorkerIdentity:  activityInfo.LastWorkerIdentity,
841✔
84
                                RetryLastFailureDetails:  activityInfo.LastFailureDetails,
841✔
85
                        }
841✔
86
                        blob, err := parser.ActivityInfoToBlob(info)
841✔
87
                        if err != nil {
841✔
88
                                return err
×
89
                        }
×
90
                        rows[i] = sqlplugin.ActivityInfoMapsRow{
841✔
91
                                ShardID:                  int64(shardID),
841✔
92
                                DomainID:                 domainID,
841✔
93
                                WorkflowID:               workflowID,
841✔
94
                                RunID:                    runID,
841✔
95
                                ScheduleID:               activityInfo.ScheduleID,
841✔
96
                                LastHeartbeatUpdatedTime: activityInfo.LastHeartBeatUpdatedTime,
841✔
97
                                LastHeartbeatDetails:     activityInfo.Details,
841✔
98
                                Data:                     blob.Data,
841✔
99
                                DataEncoding:             string(blob.Encoding),
841✔
100
                        }
841✔
101
                }
102

103
                if _, err := tx.ReplaceIntoActivityInfoMaps(ctx, rows); err != nil {
767✔
104
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute update query.", err)
×
105
                }
×
106
        }
107

108
        if len(deleteInfos) > 0 {
3,587✔
109
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
246✔
110
                        ShardID:     int64(shardID),
246✔
111
                        DomainID:    domainID,
246✔
112
                        WorkflowID:  workflowID,
246✔
113
                        RunID:       runID,
246✔
114
                        ScheduleIDs: deleteInfos,
246✔
115
                }); err != nil {
246✔
116
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute delete query.", err)
×
117
                }
×
118
        }
119

120
        return nil
3,341✔
121
}
122

123
func getActivityInfoMap(
124
        ctx context.Context,
125
        db sqlplugin.DB,
126
        shardID int,
127
        domainID serialization.UUID,
128
        workflowID string,
129
        runID serialization.UUID,
130
        parser serialization.Parser,
131
) (map[int64]*persistence.InternalActivityInfo, error) {
756✔
132

756✔
133
        rows, err := db.SelectFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
756✔
134
                ShardID:    int64(shardID),
756✔
135
                DomainID:   domainID,
756✔
136
                WorkflowID: workflowID,
756✔
137
                RunID:      runID,
756✔
138
        })
756✔
139
        if err != nil && err != sql.ErrNoRows {
801✔
140
                return nil, convertCommonErrors(db, "getActivityInfoMap", "", err)
45✔
141
        }
45✔
142

143
        ret := make(map[int64]*persistence.InternalActivityInfo)
713✔
144
        for _, row := range rows {
783✔
145
                decoded, err := parser.ActivityInfoFromBlob(row.Data, row.DataEncoding)
70✔
146
                if err != nil {
70✔
147
                        return nil, err
×
148
                }
×
149
                info := &persistence.InternalActivityInfo{
70✔
150
                        DomainID:                 row.DomainID.String(),
70✔
151
                        ScheduleID:               row.ScheduleID,
70✔
152
                        Details:                  row.LastHeartbeatDetails,
70✔
153
                        LastHeartBeatUpdatedTime: row.LastHeartbeatUpdatedTime,
70✔
154
                        Version:                  decoded.GetVersion(),
70✔
155
                        ScheduledEventBatchID:    decoded.GetScheduledEventBatchID(),
70✔
156
                        ScheduledEvent:           persistence.NewDataBlob(decoded.ScheduledEvent, common.EncodingType(decoded.GetScheduledEventEncoding())),
70✔
157
                        ScheduledTime:            decoded.GetScheduledTimestamp(),
70✔
158
                        StartedID:                decoded.GetStartedID(),
70✔
159
                        StartedTime:              decoded.GetStartedTimestamp(),
70✔
160
                        ActivityID:               decoded.GetActivityID(),
70✔
161
                        RequestID:                decoded.GetRequestID(),
70✔
162
                        ScheduleToStartTimeout:   decoded.GetScheduleToStartTimeout(),
70✔
163
                        ScheduleToCloseTimeout:   decoded.GetScheduleToCloseTimeout(),
70✔
164
                        StartToCloseTimeout:      decoded.GetStartToCloseTimeout(),
70✔
165
                        HeartbeatTimeout:         decoded.GetHeartbeatTimeout(),
70✔
166
                        CancelRequested:          decoded.GetCancelRequested(),
70✔
167
                        CancelRequestID:          decoded.GetCancelRequestID(),
70✔
168
                        TimerTaskStatus:          decoded.GetTimerTaskStatus(),
70✔
169
                        Attempt:                  decoded.GetAttempt(),
70✔
170
                        StartedIdentity:          decoded.GetStartedIdentity(),
70✔
171
                        TaskList:                 decoded.GetTaskList(),
70✔
172
                        HasRetryPolicy:           decoded.GetHasRetryPolicy(),
70✔
173
                        InitialInterval:          decoded.GetRetryInitialInterval(),
70✔
174
                        BackoffCoefficient:       decoded.GetRetryBackoffCoefficient(),
70✔
175
                        MaximumInterval:          decoded.GetRetryMaximumInterval(),
70✔
176
                        ExpirationTime:           decoded.GetRetryExpirationTimestamp(),
70✔
177
                        MaximumAttempts:          decoded.GetRetryMaximumAttempts(),
70✔
178
                        NonRetriableErrors:       decoded.GetRetryNonRetryableErrors(),
70✔
179
                        LastFailureReason:        decoded.GetRetryLastFailureReason(),
70✔
180
                        LastWorkerIdentity:       decoded.GetRetryLastWorkerIdentity(),
70✔
181
                        LastFailureDetails:       decoded.GetRetryLastFailureDetails(),
70✔
182
                }
70✔
183
                if decoded.StartedEvent != nil {
70✔
184
                        info.StartedEvent = persistence.NewDataBlob(decoded.StartedEvent, common.EncodingType(decoded.GetStartedEventEncoding()))
×
185
                }
×
186
                ret[row.ScheduleID] = info
70✔
187
        }
188

189
        return ret, nil
713✔
190
}
191

192
func deleteActivityInfoMap(
193
        ctx context.Context,
194
        tx sqlplugin.Tx,
195
        shardID int,
196
        domainID serialization.UUID,
197
        workflowID string,
198
        runID serialization.UUID,
199
) error {
2✔
200

2✔
201
        if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
2✔
202
                ShardID:    int64(shardID),
2✔
203
                DomainID:   domainID,
2✔
204
                WorkflowID: workflowID,
2✔
205
                RunID:      runID,
2✔
206
        }); err != nil {
2✔
207
                return convertCommonErrors(tx, "deleteActivityInfoMap", "", err)
×
208
        }
×
209
        return nil
2✔
210
}
211

212
func updateTimerInfos(
213
        ctx context.Context,
214
        tx sqlplugin.Tx,
215
        timerInfos []*persistence.TimerInfo,
216
        deleteInfos []string,
217
        shardID int,
218
        domainID serialization.UUID,
219
        workflowID string,
220
        runID serialization.UUID,
221
        parser serialization.Parser,
222
) error {
3,341✔
223

3,341✔
224
        if len(timerInfos) > 0 {
3,363✔
225
                rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos))
22✔
226
                for i, timerInfo := range timerInfos {
44✔
227
                        blob, err := parser.TimerInfoToBlob(&serialization.TimerInfo{
22✔
228
                                Version:         timerInfo.Version,
22✔
229
                                StartedID:       timerInfo.StartedID,
22✔
230
                                ExpiryTimestamp: timerInfo.ExpiryTime,
22✔
231
                                // TaskID is a misleading variable, it actually serves
22✔
232
                                // the purpose of indicating whether a timer task is
22✔
233
                                // generated for this timer info
22✔
234
                                TaskID: timerInfo.TaskStatus,
22✔
235
                        })
22✔
236
                        if err != nil {
22✔
237
                                return err
×
238
                        }
×
239
                        rows[i] = sqlplugin.TimerInfoMapsRow{
22✔
240
                                ShardID:      int64(shardID),
22✔
241
                                DomainID:     domainID,
22✔
242
                                WorkflowID:   workflowID,
22✔
243
                                RunID:        runID,
22✔
244
                                TimerID:      timerInfo.TimerID,
22✔
245
                                Data:         blob.Data,
22✔
246
                                DataEncoding: string(blob.Encoding),
22✔
247
                        }
22✔
248
                }
249
                if _, err := tx.ReplaceIntoTimerInfoMaps(ctx, rows); err != nil {
22✔
250
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute update query.", err)
×
251
                }
×
252
        }
253

254
        if len(deleteInfos) > 0 {
3,357✔
255
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
16✔
256
                        ShardID:    int64(shardID),
16✔
257
                        DomainID:   domainID,
16✔
258
                        WorkflowID: workflowID,
16✔
259
                        RunID:      runID,
16✔
260
                        TimerIDs:   deleteInfos,
16✔
261
                }); err != nil {
16✔
262
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute delete query.", err)
×
263
                }
×
264
        }
265

266
        return nil
3,341✔
267
}
268

269
func getTimerInfoMap(
270
        ctx context.Context,
271
        db sqlplugin.DB,
272
        shardID int,
273
        domainID serialization.UUID,
274
        workflowID string,
275
        runID serialization.UUID,
276
        parser serialization.Parser,
277
) (map[string]*persistence.TimerInfo, error) {
756✔
278

756✔
279
        rows, err := db.SelectFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
756✔
280
                ShardID:    int64(shardID),
756✔
281
                DomainID:   domainID,
756✔
282
                WorkflowID: workflowID,
756✔
283
                RunID:      runID,
756✔
284
        })
756✔
285
        if err != nil && err != sql.ErrNoRows {
922✔
286
                return nil, convertCommonErrors(db, "getTimerInfoMap", "", err)
166✔
287
        }
166✔
288
        ret := make(map[string]*persistence.TimerInfo)
592✔
289
        for _, row := range rows {
593✔
290
                info, err := parser.TimerInfoFromBlob(row.Data, row.DataEncoding)
1✔
291
                if err != nil {
1✔
292
                        return nil, err
×
293
                }
×
294
                ret[row.TimerID] = &persistence.TimerInfo{
1✔
295
                        TimerID:    row.TimerID,
1✔
296
                        Version:    info.GetVersion(),
1✔
297
                        StartedID:  info.GetStartedID(),
1✔
298
                        ExpiryTime: info.GetExpiryTimestamp(),
1✔
299
                        // TaskID is a misleading variable, it actually serves
1✔
300
                        // the purpose of indicating whether a timer task is
1✔
301
                        // generated for this timer info
1✔
302
                        TaskStatus: info.GetTaskID(),
1✔
303
                }
1✔
304
        }
305

306
        return ret, nil
592✔
307
}
308

309
func deleteTimerInfoMap(
310
        ctx context.Context,
311
        tx sqlplugin.Tx,
312
        shardID int,
313
        domainID serialization.UUID,
314
        workflowID string,
315
        runID serialization.UUID,
316
) error {
2✔
317

2✔
318
        if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
2✔
319
                ShardID:    int64(shardID),
2✔
320
                DomainID:   domainID,
2✔
321
                WorkflowID: workflowID,
2✔
322
                RunID:      runID,
2✔
323
        }); err != nil {
2✔
324
                return convertCommonErrors(tx, "deleteTimerInfoMap", "", err)
×
325
        }
×
326
        return nil
2✔
327
}
328

329
func updateChildExecutionInfos(
330
        ctx context.Context,
331
        tx sqlplugin.Tx,
332
        childExecutionInfos []*persistence.InternalChildExecutionInfo,
333
        deleteInfos []int64,
334
        shardID int,
335
        domainID serialization.UUID,
336
        workflowID string,
337
        runID serialization.UUID,
338
        parser serialization.Parser,
339
) error {
3,341✔
340

3,341✔
341
        if len(childExecutionInfos) > 0 {
3,367✔
342
                rows := make([]sqlplugin.ChildExecutionInfoMapsRow, len(childExecutionInfos))
26✔
343
                for i, childExecutionInfo := range childExecutionInfos {
52✔
344
                        initiateEvent, initiateEncoding := persistence.FromDataBlob(childExecutionInfo.InitiatedEvent)
26✔
345
                        startEvent, startEncoding := persistence.FromDataBlob(childExecutionInfo.StartedEvent)
26✔
346

26✔
347
                        info := &serialization.ChildExecutionInfo{
26✔
348
                                Version:                childExecutionInfo.Version,
26✔
349
                                InitiatedEventBatchID:  childExecutionInfo.InitiatedEventBatchID,
26✔
350
                                InitiatedEvent:         initiateEvent,
26✔
351
                                InitiatedEventEncoding: initiateEncoding,
26✔
352
                                StartedEvent:           startEvent,
26✔
353
                                StartedEventEncoding:   startEncoding,
26✔
354
                                StartedID:              childExecutionInfo.StartedID,
26✔
355
                                StartedWorkflowID:      childExecutionInfo.StartedWorkflowID,
26✔
356
                                StartedRunID:           serialization.MustParseUUID(childExecutionInfo.StartedRunID),
26✔
357
                                CreateRequestID:        childExecutionInfo.CreateRequestID,
26✔
358
                                DomainID:               childExecutionInfo.DomainID,
26✔
359
                                DomainNameDEPRECATED:   childExecutionInfo.DomainNameDEPRECATED,
26✔
360
                                WorkflowTypeName:       childExecutionInfo.WorkflowTypeName,
26✔
361
                                ParentClosePolicy:      int32(childExecutionInfo.ParentClosePolicy),
26✔
362
                        }
26✔
363
                        blob, err := parser.ChildExecutionInfoToBlob(info)
26✔
364
                        if err != nil {
26✔
365
                                return err
×
366
                        }
×
367
                        rows[i] = sqlplugin.ChildExecutionInfoMapsRow{
26✔
368
                                ShardID:      int64(shardID),
26✔
369
                                DomainID:     domainID,
26✔
370
                                WorkflowID:   workflowID,
26✔
371
                                RunID:        runID,
26✔
372
                                InitiatedID:  childExecutionInfo.InitiatedID,
26✔
373
                                Data:         blob.Data,
26✔
374
                                DataEncoding: string(blob.Encoding),
26✔
375
                        }
26✔
376
                }
377
                if _, err := tx.ReplaceIntoChildExecutionInfoMaps(ctx, rows); err != nil {
26✔
378
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute update query.", err)
×
379
                }
×
380
        }
381

382
        if len(deleteInfos) > 0 {
3,353✔
383
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
12✔
384
                        ShardID:      int64(shardID),
12✔
385
                        DomainID:     domainID,
12✔
386
                        WorkflowID:   workflowID,
12✔
387
                        RunID:        runID,
12✔
388
                        InitiatedIDs: deleteInfos,
12✔
389
                }); err != nil {
12✔
390
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute delete query.", err)
×
391
                }
×
392
        }
393

394
        return nil
3,341✔
395
}
396

397
func getChildExecutionInfoMap(
398
        ctx context.Context,
399
        db sqlplugin.DB,
400
        shardID int,
401
        domainID serialization.UUID,
402
        workflowID string,
403
        runID serialization.UUID,
404
        parser serialization.Parser,
405
) (map[int64]*persistence.InternalChildExecutionInfo, error) {
756✔
406

756✔
407
        rows, err := db.SelectFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
756✔
408
                ShardID:    int64(shardID),
756✔
409
                DomainID:   domainID,
756✔
410
                WorkflowID: workflowID,
756✔
411
                RunID:      runID,
756✔
412
        })
756✔
413
        if err != nil && err != sql.ErrNoRows {
943✔
414
                return nil, convertCommonErrors(db, "getChildExecutionInfoMap", "", err)
187✔
415
        }
187✔
416

417
        ret := make(map[int64]*persistence.InternalChildExecutionInfo)
571✔
418
        for _, row := range rows {
573✔
419
                rowInfo, err := parser.ChildExecutionInfoFromBlob(row.Data, row.DataEncoding)
2✔
420
                if err != nil {
2✔
421
                        return nil, err
×
422
                }
×
423
                info := &persistence.InternalChildExecutionInfo{
2✔
424
                        InitiatedID:           row.InitiatedID,
2✔
425
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
2✔
426
                        Version:               rowInfo.GetVersion(),
2✔
427
                        StartedID:             rowInfo.GetStartedID(),
2✔
428
                        StartedWorkflowID:     rowInfo.GetStartedWorkflowID(),
2✔
429
                        StartedRunID:          serialization.UUID(rowInfo.GetStartedRunID()).String(),
2✔
430
                        CreateRequestID:       rowInfo.GetCreateRequestID(),
2✔
431
                        DomainID:              rowInfo.GetDomainID(),
2✔
432
                        DomainNameDEPRECATED:  rowInfo.GetDomainNameDEPRECATED(),
2✔
433
                        WorkflowTypeName:      rowInfo.GetWorkflowTypeName(),
2✔
434
                        ParentClosePolicy:     types.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
2✔
435
                }
2✔
436
                if rowInfo.InitiatedEvent != nil {
2✔
437
                        info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
×
438
                }
×
439
                if rowInfo.StartedEvent != nil {
2✔
440
                        info.StartedEvent = persistence.NewDataBlob(rowInfo.StartedEvent, common.EncodingType(rowInfo.GetStartedEventEncoding()))
×
441
                }
×
442
                ret[row.InitiatedID] = info
2✔
443
        }
444

445
        return ret, nil
571✔
446
}
447

448
func deleteChildExecutionInfoMap(
449
        ctx context.Context,
450
        tx sqlplugin.Tx,
451
        shardID int,
452
        domainID serialization.UUID,
453
        workflowID string,
454
        runID serialization.UUID,
455
) error {
2✔
456

2✔
457
        if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
2✔
458
                ShardID:    int64(shardID),
2✔
459
                DomainID:   domainID,
2✔
460
                WorkflowID: workflowID,
2✔
461
                RunID:      runID,
2✔
462
        }); err != nil {
2✔
463
                return convertCommonErrors(tx, "deleteChildExecutionInfoMap", "", err)
×
464
        }
×
465
        return nil
2✔
466
}
467

468
func updateRequestCancelInfos(
469
        ctx context.Context,
470
        tx sqlplugin.Tx,
471
        requestCancelInfos []*persistence.RequestCancelInfo,
472
        deleteInfos []int64,
473
        shardID int,
474
        domainID serialization.UUID,
475
        workflowID string,
476
        runID serialization.UUID,
477
        parser serialization.Parser,
478
) error {
3,341✔
479

3,341✔
480
        if len(requestCancelInfos) > 0 {
3,347✔
481
                rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos))
6✔
482
                for i, requestCancelInfo := range requestCancelInfos {
12✔
483
                        blob, err := parser.RequestCancelInfoToBlob(&serialization.RequestCancelInfo{
6✔
484
                                Version:               requestCancelInfo.Version,
6✔
485
                                InitiatedEventBatchID: requestCancelInfo.InitiatedEventBatchID,
6✔
486
                                CancelRequestID:       requestCancelInfo.CancelRequestID,
6✔
487
                        })
6✔
488
                        if err != nil {
6✔
489
                                return err
×
490
                        }
×
491
                        rows[i] = sqlplugin.RequestCancelInfoMapsRow{
6✔
492
                                ShardID:      int64(shardID),
6✔
493
                                DomainID:     domainID,
6✔
494
                                WorkflowID:   workflowID,
6✔
495
                                RunID:        runID,
6✔
496
                                InitiatedID:  requestCancelInfo.InitiatedID,
6✔
497
                                Data:         blob.Data,
6✔
498
                                DataEncoding: string(blob.Encoding),
6✔
499
                        }
6✔
500
                }
501

502
                if _, err := tx.ReplaceIntoRequestCancelInfoMaps(ctx, rows); err != nil {
6✔
503
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute update query.", err)
×
504
                }
×
505
        }
506

507
        if len(deleteInfos) > 0 {
3,347✔
508
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
6✔
509
                        ShardID:      int64(shardID),
6✔
510
                        DomainID:     domainID,
6✔
511
                        WorkflowID:   workflowID,
6✔
512
                        RunID:        runID,
6✔
513
                        InitiatedIDs: deleteInfos,
6✔
514
                }); err != nil {
6✔
515
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute delete query.", err)
×
516
                }
×
517
        }
518

519
        return nil
3,341✔
520
}
521

522
func getRequestCancelInfoMap(
523
        ctx context.Context,
524
        db sqlplugin.DB,
525
        shardID int,
526
        domainID serialization.UUID,
527
        workflowID string,
528
        runID serialization.UUID,
529
        parser serialization.Parser,
530
) (map[int64]*persistence.RequestCancelInfo, error) {
756✔
531

756✔
532
        rows, err := db.SelectFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
756✔
533
                ShardID:    int64(shardID),
756✔
534
                DomainID:   domainID,
756✔
535
                WorkflowID: workflowID,
756✔
536
                RunID:      runID,
756✔
537
        })
756✔
538
        if err != nil && err != sql.ErrNoRows {
937✔
539
                return nil, convertCommonErrors(db, "getRequestCancelInfoMap", "", err)
181✔
540
        }
181✔
541

542
        ret := make(map[int64]*persistence.RequestCancelInfo)
577✔
543
        for _, row := range rows {
577✔
544
                rowInfo, err := parser.RequestCancelInfoFromBlob(row.Data, row.DataEncoding)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548
                ret[row.InitiatedID] = &persistence.RequestCancelInfo{
×
549
                        Version:               rowInfo.GetVersion(),
×
550
                        InitiatedID:           row.InitiatedID,
×
551
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
552
                        CancelRequestID:       rowInfo.GetCancelRequestID(),
×
553
                }
×
554
        }
555

556
        return ret, nil
577✔
557
}
558

559
func deleteRequestCancelInfoMap(
560
        ctx context.Context,
561
        tx sqlplugin.Tx,
562
        shardID int,
563
        domainID serialization.UUID,
564
        workflowID string,
565
        runID serialization.UUID,
566
) error {
2✔
567

2✔
568
        if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
2✔
569
                ShardID:    int64(shardID),
2✔
570
                DomainID:   domainID,
2✔
571
                WorkflowID: workflowID,
2✔
572
                RunID:      runID,
2✔
573
        }); err != nil {
2✔
574
                return convertCommonErrors(tx, "deleteRequestCancelInfoMap", "", err)
×
575
        }
×
576
        return nil
2✔
577
}
578

579
func updateSignalInfos(
580
        ctx context.Context,
581
        tx sqlplugin.Tx,
582
        signalInfos []*persistence.SignalInfo,
583
        deleteInfos []int64,
584
        shardID int,
585
        domainID serialization.UUID,
586
        workflowID string,
587
        runID serialization.UUID,
588
        parser serialization.Parser,
589
) error {
3,341✔
590

3,341✔
591
        if len(signalInfos) > 0 {
3,351✔
592
                rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos))
10✔
593
                for i, signalInfo := range signalInfos {
20✔
594
                        blob, err := parser.SignalInfoToBlob(&serialization.SignalInfo{
10✔
595
                                Version:               signalInfo.Version,
10✔
596
                                InitiatedEventBatchID: signalInfo.InitiatedEventBatchID,
10✔
597
                                RequestID:             signalInfo.SignalRequestID,
10✔
598
                                Name:                  signalInfo.SignalName,
10✔
599
                                Input:                 signalInfo.Input,
10✔
600
                                Control:               signalInfo.Control,
10✔
601
                        })
10✔
602
                        if err != nil {
10✔
603
                                return err
×
604
                        }
×
605
                        rows[i] = sqlplugin.SignalInfoMapsRow{
10✔
606
                                ShardID:      int64(shardID),
10✔
607
                                DomainID:     domainID,
10✔
608
                                WorkflowID:   workflowID,
10✔
609
                                RunID:        runID,
10✔
610
                                InitiatedID:  signalInfo.InitiatedID,
10✔
611
                                Data:         blob.Data,
10✔
612
                                DataEncoding: string(blob.Encoding),
10✔
613
                        }
10✔
614
                }
615

616
                if _, err := tx.ReplaceIntoSignalInfoMaps(ctx, rows); err != nil {
10✔
617
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute update query.", err)
×
618
                }
×
619
        }
620

621
        if len(deleteInfos) > 0 {
3,351✔
622
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
10✔
623
                        ShardID:      int64(shardID),
10✔
624
                        DomainID:     domainID,
10✔
625
                        WorkflowID:   workflowID,
10✔
626
                        RunID:        runID,
10✔
627
                        InitiatedIDs: deleteInfos,
10✔
628
                }); err != nil {
10✔
629
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute delete query.", err)
×
630
                }
×
631
        }
632

633
        return nil
3,341✔
634
}
635

636
func getSignalInfoMap(
637
        ctx context.Context,
638
        db sqlplugin.DB,
639
        shardID int,
640
        domainID serialization.UUID,
641
        workflowID string,
642
        runID serialization.UUID,
643
        parser serialization.Parser,
644
) (map[int64]*persistence.SignalInfo, error) {
756✔
645

756✔
646
        rows, err := db.SelectFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
756✔
647
                ShardID:    int64(shardID),
756✔
648
                DomainID:   domainID,
756✔
649
                WorkflowID: workflowID,
756✔
650
                RunID:      runID,
756✔
651
        })
756✔
652
        if err != nil && err != sql.ErrNoRows {
954✔
653
                return nil, convertCommonErrors(db, "getSignalInfoMap", "", err)
198✔
654
        }
198✔
655

656
        ret := make(map[int64]*persistence.SignalInfo)
560✔
657
        for _, row := range rows {
560✔
658
                rowInfo, err := parser.SignalInfoFromBlob(row.Data, row.DataEncoding)
×
659
                if err != nil {
×
660
                        return nil, err
×
661
                }
×
662
                ret[row.InitiatedID] = &persistence.SignalInfo{
×
663
                        Version:               rowInfo.GetVersion(),
×
664
                        InitiatedID:           row.InitiatedID,
×
665
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
666
                        SignalRequestID:       rowInfo.GetRequestID(),
×
667
                        SignalName:            rowInfo.GetName(),
×
668
                        Input:                 rowInfo.GetInput(),
×
669
                        Control:               rowInfo.GetControl(),
×
670
                }
×
671
        }
672

673
        return ret, nil
560✔
674
}
675

676
func deleteSignalInfoMap(
677
        ctx context.Context,
678
        tx sqlplugin.Tx,
679
        shardID int,
680
        domainID serialization.UUID,
681
        workflowID string,
682
        runID serialization.UUID,
683
) error {
2✔
684

2✔
685
        if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
2✔
686
                ShardID:    int64(shardID),
2✔
687
                DomainID:   domainID,
2✔
688
                WorkflowID: workflowID,
2✔
689
                RunID:      runID,
2✔
690
        }); err != nil {
2✔
691
                return convertCommonErrors(tx, "deleteSignalInfoMap", "", err)
×
692
        }
×
693
        return nil
2✔
694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc