• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018df99b-a386-4814-ad28-09490f5042bf

01 Mar 2024 10:42AM UTC coverage: 62.868% (-0.03%) from 62.9%
018df99b-a386-4814-ad28-09490f5042bf

push

buildkite

web-flow
Fix the local integration test docker-compose file (#5695)

What changed?
Moved updates to the docker-compose file used in the CI to the local
docker-compose.

Why?
This enables us to run the integration tests locally in docker

How did you test it?
Local tests

Potential risks

Release notes

Documentation Changes

92923 of 147806 relevant lines covered (62.87%)

2352.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.62
/common/persistence/sql/workflowStateMaps.go
1
// Copyright (c) 2017-2020 Uber Technologies, Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package sql
23

24
import (
25
        "context"
26
        "database/sql"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/persistence"
30
        "github.com/uber/cadence/common/persistence/serialization"
31
        "github.com/uber/cadence/common/persistence/sql/sqlplugin"
32
        "github.com/uber/cadence/common/types"
33
)
34

35
// updateActivityInfos writes and/or removes activity info rows for one
// workflow run using the transaction tx.
//
// When activityInfos is non-empty, each entry is packed into a
// serialization.ActivityInfo, encoded with parser.ActivityInfoToBlob, and the
// resulting rows are handed to tx.ReplaceIntoActivityInfoMaps in one call.
// The heartbeat details and last-heartbeat time are stored as row columns
// rather than inside the encoded blob.
//
// When deleteInfos is non-empty, tx.DeleteFromActivityInfoMaps is called with
// those values as the ScheduleIDs of the filter.
//
// An encoding error is returned unchanged; a failed query is passed through
// convertCommonErrors. Returns nil when both steps succeed or are skipped.
func updateActivityInfos(
36
        ctx context.Context,
37
        tx sqlplugin.Tx,
38
        activityInfos []*persistence.InternalActivityInfo,
39
        deleteInfos []int64,
40
        shardID int,
41
        domainID serialization.UUID,
42
        workflowID string,
43
        runID serialization.UUID,
44
        parser serialization.Parser,
45
) error {
3,337✔
46

3,337✔
47
        if len(activityInfos) > 0 {
4,113✔
48
                rows := make([]sqlplugin.ActivityInfoMapsRow, len(activityInfos))
776✔
49
                for i, activityInfo := range activityInfos {
1,632✔
50
                        scheduledEvent, scheduledEncoding := persistence.FromDataBlob(activityInfo.ScheduledEvent)
856✔
51
                        startEvent, startEncoding := persistence.FromDataBlob(activityInfo.StartedEvent)
856✔
52

856✔
53
                        info := &serialization.ActivityInfo{
856✔
54
                                Version:                  activityInfo.Version,
856✔
55
                                ScheduledEventBatchID:    activityInfo.ScheduledEventBatchID,
856✔
56
                                ScheduledEvent:           scheduledEvent,
856✔
57
                                ScheduledEventEncoding:   scheduledEncoding,
856✔
58
                                ScheduledTimestamp:       activityInfo.ScheduledTime,
856✔
59
                                StartedID:                activityInfo.StartedID,
856✔
60
                                StartedEvent:             startEvent,
856✔
61
                                StartedEventEncoding:     startEncoding,
856✔
62
                                StartedTimestamp:         activityInfo.StartedTime,
856✔
63
                                ActivityID:               activityInfo.ActivityID,
856✔
64
                                RequestID:                activityInfo.RequestID,
856✔
65
                                ScheduleToStartTimeout:   activityInfo.ScheduleToStartTimeout,
856✔
66
                                ScheduleToCloseTimeout:   activityInfo.ScheduleToCloseTimeout,
856✔
67
                                StartToCloseTimeout:      activityInfo.StartToCloseTimeout,
856✔
68
                                HeartbeatTimeout:         activityInfo.HeartbeatTimeout,
856✔
69
                                CancelRequested:          activityInfo.CancelRequested,
856✔
70
                                CancelRequestID:          activityInfo.CancelRequestID,
856✔
71
                                TimerTaskStatus:          activityInfo.TimerTaskStatus,
856✔
72
                                Attempt:                  activityInfo.Attempt,
856✔
73
                                TaskList:                 activityInfo.TaskList,
856✔
74
                                StartedIdentity:          activityInfo.StartedIdentity,
856✔
75
                                HasRetryPolicy:           activityInfo.HasRetryPolicy,
856✔
76
                                RetryInitialInterval:     activityInfo.InitialInterval,
856✔
77
                                RetryBackoffCoefficient:  activityInfo.BackoffCoefficient,
856✔
78
                                RetryMaximumInterval:     activityInfo.MaximumInterval,
856✔
79
                                RetryExpirationTimestamp: activityInfo.ExpirationTime,
856✔
80
                                RetryMaximumAttempts:     activityInfo.MaximumAttempts,
856✔
81
                                RetryNonRetryableErrors:  activityInfo.NonRetriableErrors,
856✔
82
                                RetryLastFailureReason:   activityInfo.LastFailureReason,
856✔
83
                                RetryLastWorkerIdentity:  activityInfo.LastWorkerIdentity,
856✔
84
                                RetryLastFailureDetails:  activityInfo.LastFailureDetails,
856✔
85
                        }
856✔
86
                        blob, err := parser.ActivityInfoToBlob(info)
856✔
87
                        if err != nil {
856✔
88
                                return err
×
89
                        }
×
90
                        rows[i] = sqlplugin.ActivityInfoMapsRow{
856✔
91
                                ShardID:                  int64(shardID),
856✔
92
                                DomainID:                 domainID,
856✔
93
                                WorkflowID:               workflowID,
856✔
94
                                RunID:                    runID,
856✔
95
                                ScheduleID:               activityInfo.ScheduleID,
856✔
96
                                LastHeartbeatUpdatedTime: activityInfo.LastHeartBeatUpdatedTime,
856✔
97
                                LastHeartbeatDetails:     activityInfo.Details,
856✔
98
                                Data:                     blob.Data,
856✔
99
                                DataEncoding:             string(blob.Encoding),
856✔
100
                        }
856✔
101
                }
102

103
                if _, err := tx.ReplaceIntoActivityInfoMaps(ctx, rows); err != nil {
776✔
104
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute update query.", err)
×
105
                }
×
106
        }
107

108
        if len(deleteInfos) > 0 {
3,583✔
109
                if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
246✔
110
                        ShardID:     int64(shardID),
246✔
111
                        DomainID:    domainID,
246✔
112
                        WorkflowID:  workflowID,
246✔
113
                        RunID:       runID,
246✔
114
                        ScheduleIDs: deleteInfos,
246✔
115
                }); err != nil {
246✔
116
                        return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute delete query.", err)
×
117
                }
×
118
        }
119

120
        return nil
3,337✔
121
}
122

123
// getActivityInfoMap loads the activity info rows for one workflow run and
// returns them as a map keyed by row.ScheduleID.
//
// Rows are fetched with db.SelectFromActivityInfoMaps and each row's Data is
// decoded with parser.ActivityInfoFromBlob. sql.ErrNoRows from the select is
// not treated as a failure; any other select error is passed through
// convertCommonErrors, and a decode error is returned unchanged.
//
// ScheduledEvent is always wrapped with persistence.NewDataBlob, whereas
// StartedEvent is only set when the decoded blob carries one.
func getActivityInfoMap(
124
        ctx context.Context,
125
        db sqlplugin.DB,
126
        shardID int,
127
        domainID serialization.UUID,
128
        workflowID string,
129
        runID serialization.UUID,
130
        parser serialization.Parser,
131
) (map[int64]*persistence.InternalActivityInfo, error) {
793✔
132

793✔
133
        rows, err := db.SelectFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
793✔
134
                ShardID:    int64(shardID),
793✔
135
                DomainID:   domainID,
793✔
136
                WorkflowID: workflowID,
793✔
137
                RunID:      runID,
793✔
138
        })
793✔
139
        if err != nil && err != sql.ErrNoRows {
890✔
140
                return nil, convertCommonErrors(db, "getActivityInfoMap", "", err)
97✔
141
        }
97✔
142

143
        ret := make(map[int64]*persistence.InternalActivityInfo)
698✔
144
        for _, row := range rows {
758✔
145
                decoded, err := parser.ActivityInfoFromBlob(row.Data, row.DataEncoding)
60✔
146
                if err != nil {
60✔
147
                        return nil, err
×
148
                }
×
149
                info := &persistence.InternalActivityInfo{
60✔
150
                        DomainID:                 row.DomainID.String(),
60✔
151
                        ScheduleID:               row.ScheduleID,
60✔
152
                        Details:                  row.LastHeartbeatDetails,
60✔
153
                        LastHeartBeatUpdatedTime: row.LastHeartbeatUpdatedTime,
60✔
154
                        Version:                  decoded.GetVersion(),
60✔
155
                        ScheduledEventBatchID:    decoded.GetScheduledEventBatchID(),
60✔
156
                        ScheduledEvent:           persistence.NewDataBlob(decoded.ScheduledEvent, common.EncodingType(decoded.GetScheduledEventEncoding())),
60✔
157
                        ScheduledTime:            decoded.GetScheduledTimestamp(),
60✔
158
                        StartedID:                decoded.GetStartedID(),
60✔
159
                        StartedTime:              decoded.GetStartedTimestamp(),
60✔
160
                        ActivityID:               decoded.GetActivityID(),
60✔
161
                        RequestID:                decoded.GetRequestID(),
60✔
162
                        ScheduleToStartTimeout:   decoded.GetScheduleToStartTimeout(),
60✔
163
                        ScheduleToCloseTimeout:   decoded.GetScheduleToCloseTimeout(),
60✔
164
                        StartToCloseTimeout:      decoded.GetStartToCloseTimeout(),
60✔
165
                        HeartbeatTimeout:         decoded.GetHeartbeatTimeout(),
60✔
166
                        CancelRequested:          decoded.GetCancelRequested(),
60✔
167
                        CancelRequestID:          decoded.GetCancelRequestID(),
60✔
168
                        TimerTaskStatus:          decoded.GetTimerTaskStatus(),
60✔
169
                        Attempt:                  decoded.GetAttempt(),
60✔
170
                        StartedIdentity:          decoded.GetStartedIdentity(),
60✔
171
                        TaskList:                 decoded.GetTaskList(),
60✔
172
                        HasRetryPolicy:           decoded.GetHasRetryPolicy(),
60✔
173
                        InitialInterval:          decoded.GetRetryInitialInterval(),
60✔
174
                        BackoffCoefficient:       decoded.GetRetryBackoffCoefficient(),
60✔
175
                        MaximumInterval:          decoded.GetRetryMaximumInterval(),
60✔
176
                        ExpirationTime:           decoded.GetRetryExpirationTimestamp(),
60✔
177
                        MaximumAttempts:          decoded.GetRetryMaximumAttempts(),
60✔
178
                        NonRetriableErrors:       decoded.GetRetryNonRetryableErrors(),
60✔
179
                        LastFailureReason:        decoded.GetRetryLastFailureReason(),
60✔
180
                        LastWorkerIdentity:       decoded.GetRetryLastWorkerIdentity(),
60✔
181
                        LastFailureDetails:       decoded.GetRetryLastFailureDetails(),
60✔
182
                }
60✔
183
                if decoded.StartedEvent != nil {
60✔
184
                        info.StartedEvent = persistence.NewDataBlob(decoded.StartedEvent, common.EncodingType(decoded.GetStartedEventEncoding()))
×
185
                }
×
186
                ret[row.ScheduleID] = info
60✔
187
        }
188

189
        return ret, nil
698✔
190
}
191

192
// deleteActivityInfoMap calls tx.DeleteFromActivityInfoMaps with a filter that
// identifies the workflow run (shard, domain, workflow ID, run ID) and sets no
// ScheduleIDs. A failed query is passed through convertCommonErrors.
//
// NOTE(review): presumably a filter without ScheduleIDs removes every activity
// info row of the run — confirm against the sqlplugin implementation.
func deleteActivityInfoMap(
193
        ctx context.Context,
194
        tx sqlplugin.Tx,
195
        shardID int,
196
        domainID serialization.UUID,
197
        workflowID string,
198
        runID serialization.UUID,
199
) error {
3✔
200

3✔
201
        if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
3✔
202
                ShardID:    int64(shardID),
3✔
203
                DomainID:   domainID,
3✔
204
                WorkflowID: workflowID,
3✔
205
                RunID:      runID,
3✔
206
        }); err != nil {
3✔
207
                return convertCommonErrors(tx, "deleteActivityInfoMap", "", err)
×
208
        }
×
209
        return nil
3✔
210
}
211

212
// updateTimerInfos writes and/or removes timer info rows for one workflow run
// using the transaction tx.
//
// When timerInfos is non-empty, each entry is encoded with
// parser.TimerInfoToBlob and the resulting rows, keyed by TimerID, are handed
// to tx.ReplaceIntoTimerInfoMaps in one call. When deleteInfos is non-empty,
// tx.DeleteFromTimerInfoMaps is called with those values as the TimerIDs of
// the filter.
//
// An encoding error is returned unchanged; a failed query is passed through
// convertCommonErrors. Returns nil when both steps succeed or are skipped.
func updateTimerInfos(
213
        ctx context.Context,
214
        tx sqlplugin.Tx,
215
        timerInfos []*persistence.TimerInfo,
216
        deleteInfos []string,
217
        shardID int,
218
        domainID serialization.UUID,
219
        workflowID string,
220
        runID serialization.UUID,
221
        parser serialization.Parser,
222
) error {
3,337✔
223

3,337✔
224
        if len(timerInfos) > 0 {
3,362✔
225
                rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos))
25✔
226
                for i, timerInfo := range timerInfos {
50✔
227
                        blob, err := parser.TimerInfoToBlob(&serialization.TimerInfo{
25✔
228
                                Version:         timerInfo.Version,
25✔
229
                                StartedID:       timerInfo.StartedID,
25✔
230
                                ExpiryTimestamp: timerInfo.ExpiryTime,
25✔
231
                                // TaskID is a misleading variable, it actually serves
25✔
232
                                // the purpose of indicating whether a timer task is
25✔
233
                                // generated for this timer info
25✔
234
                                TaskID: timerInfo.TaskStatus,
25✔
235
                        })
25✔
236
                        if err != nil {
25✔
237
                                return err
×
238
                        }
×
239
                        rows[i] = sqlplugin.TimerInfoMapsRow{
25✔
240
                                ShardID:      int64(shardID),
25✔
241
                                DomainID:     domainID,
25✔
242
                                WorkflowID:   workflowID,
25✔
243
                                RunID:        runID,
25✔
244
                                TimerID:      timerInfo.TimerID,
25✔
245
                                Data:         blob.Data,
25✔
246
                                DataEncoding: string(blob.Encoding),
25✔
247
                        }
25✔
248
                }
249
                if _, err := tx.ReplaceIntoTimerInfoMaps(ctx, rows); err != nil {
25✔
250
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute update query.", err)
×
251
                }
×
252
        }
253

254
        if len(deleteInfos) > 0 {
3,352✔
255
                if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
15✔
256
                        ShardID:    int64(shardID),
15✔
257
                        DomainID:   domainID,
15✔
258
                        WorkflowID: workflowID,
15✔
259
                        RunID:      runID,
15✔
260
                        TimerIDs:   deleteInfos,
15✔
261
                }); err != nil {
15✔
262
                        return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute delete query.", err)
×
263
                }
×
264
        }
265

266
        return nil
3,337✔
267
}
268

269
// getTimerInfoMap loads the timer info rows for one workflow run and returns
// them as a map keyed by row.TimerID.
//
// Rows are fetched with db.SelectFromTimerInfoMaps and each row's Data is
// decoded with parser.TimerInfoFromBlob. sql.ErrNoRows from the select is not
// treated as a failure; any other select error is passed through
// convertCommonErrors, and a decode error is returned unchanged.
func getTimerInfoMap(
270
        ctx context.Context,
271
        db sqlplugin.DB,
272
        shardID int,
273
        domainID serialization.UUID,
274
        workflowID string,
275
        runID serialization.UUID,
276
        parser serialization.Parser,
277
) (map[string]*persistence.TimerInfo, error) {
793✔
278

793✔
279
        rows, err := db.SelectFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
793✔
280
                ShardID:    int64(shardID),
793✔
281
                DomainID:   domainID,
793✔
282
                WorkflowID: workflowID,
793✔
283
                RunID:      runID,
793✔
284
        })
793✔
285
        if err != nil && err != sql.ErrNoRows {
895✔
286
                return nil, convertCommonErrors(db, "getTimerInfoMap", "", err)
102✔
287
        }
102✔
288
        ret := make(map[string]*persistence.TimerInfo)
693✔
289
        for _, row := range rows {
693✔
290
                info, err := parser.TimerInfoFromBlob(row.Data, row.DataEncoding)
×
291
                if err != nil {
×
292
                        return nil, err
×
293
                }
×
294
                ret[row.TimerID] = &persistence.TimerInfo{
×
295
                        TimerID:    row.TimerID,
×
296
                        Version:    info.GetVersion(),
×
297
                        StartedID:  info.GetStartedID(),
×
298
                        ExpiryTime: info.GetExpiryTimestamp(),
×
299
                        // TaskID is a misleading variable, it actually serves
×
300
                        // the purpose of indicating whether a timer task is
×
301
                        // generated for this timer info
×
302
                        TaskStatus: info.GetTaskID(),
×
303
                }
×
304
        }
305

306
        return ret, nil
693✔
307
}
308

309
// deleteTimerInfoMap calls tx.DeleteFromTimerInfoMaps with a filter that
// identifies the workflow run (shard, domain, workflow ID, run ID) and sets no
// TimerIDs. A failed query is passed through convertCommonErrors.
//
// NOTE(review): presumably a filter without TimerIDs removes every timer info
// row of the run — confirm against the sqlplugin implementation.
func deleteTimerInfoMap(
310
        ctx context.Context,
311
        tx sqlplugin.Tx,
312
        shardID int,
313
        domainID serialization.UUID,
314
        workflowID string,
315
        runID serialization.UUID,
316
) error {
3✔
317

3✔
318
        if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
3✔
319
                ShardID:    int64(shardID),
3✔
320
                DomainID:   domainID,
3✔
321
                WorkflowID: workflowID,
3✔
322
                RunID:      runID,
3✔
323
        }); err != nil {
3✔
324
                return convertCommonErrors(tx, "deleteTimerInfoMap", "", err)
×
325
        }
×
326
        return nil
3✔
327
}
328

329
// updateChildExecutionInfos writes and/or removes child execution info rows
// for one workflow run using the transaction tx.
//
// When childExecutionInfos is non-empty, each entry is packed into a
// serialization.ChildExecutionInfo, encoded with
// parser.ChildExecutionInfoToBlob, and the resulting rows, keyed by
// InitiatedID, are handed to tx.ReplaceIntoChildExecutionInfoMaps in one call.
// When deleteInfos is non-empty, tx.DeleteFromChildExecutionInfoMaps is called
// with those values as the InitiatedIDs of the filter.
//
// An encoding error is returned unchanged; a failed query is passed through
// convertCommonErrors. Returns nil when both steps succeed or are skipped.
//
// NOTE(review): StartedRunID goes through serialization.MustParseUUID; the
// "Must" prefix suggests it panics on a malformed value — confirm.
func updateChildExecutionInfos(
330
        ctx context.Context,
331
        tx sqlplugin.Tx,
332
        childExecutionInfos []*persistence.InternalChildExecutionInfo,
333
        deleteInfos []int64,
334
        shardID int,
335
        domainID serialization.UUID,
336
        workflowID string,
337
        runID serialization.UUID,
338
        parser serialization.Parser,
339
) error {
3,337✔
340

3,337✔
341
        if len(childExecutionInfos) > 0 {
3,366✔
342
                rows := make([]sqlplugin.ChildExecutionInfoMapsRow, len(childExecutionInfos))
29✔
343
                for i, childExecutionInfo := range childExecutionInfos {
58✔
344
                        initiateEvent, initiateEncoding := persistence.FromDataBlob(childExecutionInfo.InitiatedEvent)
29✔
345
                        startEvent, startEncoding := persistence.FromDataBlob(childExecutionInfo.StartedEvent)
29✔
346

29✔
347
                        info := &serialization.ChildExecutionInfo{
29✔
348
                                Version:                childExecutionInfo.Version,
29✔
349
                                InitiatedEventBatchID:  childExecutionInfo.InitiatedEventBatchID,
29✔
350
                                InitiatedEvent:         initiateEvent,
29✔
351
                                InitiatedEventEncoding: initiateEncoding,
29✔
352
                                StartedEvent:           startEvent,
29✔
353
                                StartedEventEncoding:   startEncoding,
29✔
354
                                StartedID:              childExecutionInfo.StartedID,
29✔
355
                                StartedWorkflowID:      childExecutionInfo.StartedWorkflowID,
29✔
356
                                StartedRunID:           serialization.MustParseUUID(childExecutionInfo.StartedRunID),
29✔
357
                                CreateRequestID:        childExecutionInfo.CreateRequestID,
29✔
358
                                DomainID:               childExecutionInfo.DomainID,
29✔
359
                                DomainNameDEPRECATED:   childExecutionInfo.DomainNameDEPRECATED,
29✔
360
                                WorkflowTypeName:       childExecutionInfo.WorkflowTypeName,
29✔
361
                                ParentClosePolicy:      int32(childExecutionInfo.ParentClosePolicy),
29✔
362
                        }
29✔
363
                        blob, err := parser.ChildExecutionInfoToBlob(info)
29✔
364
                        if err != nil {
29✔
365
                                return err
×
366
                        }
×
367
                        rows[i] = sqlplugin.ChildExecutionInfoMapsRow{
29✔
368
                                ShardID:      int64(shardID),
29✔
369
                                DomainID:     domainID,
29✔
370
                                WorkflowID:   workflowID,
29✔
371
                                RunID:        runID,
29✔
372
                                InitiatedID:  childExecutionInfo.InitiatedID,
29✔
373
                                Data:         blob.Data,
29✔
374
                                DataEncoding: string(blob.Encoding),
29✔
375
                        }
29✔
376
                }
377
                if _, err := tx.ReplaceIntoChildExecutionInfoMaps(ctx, rows); err != nil {
29✔
378
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute update query.", err)
×
379
                }
×
380
        }
381

382
        if len(deleteInfos) > 0 {
3,350✔
383
                if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
13✔
384
                        ShardID:      int64(shardID),
13✔
385
                        DomainID:     domainID,
13✔
386
                        WorkflowID:   workflowID,
13✔
387
                        RunID:        runID,
13✔
388
                        InitiatedIDs: deleteInfos,
13✔
389
                }); err != nil {
13✔
390
                        return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute delete query.", err)
×
391
                }
×
392
        }
393

394
        return nil
3,337✔
395
}
396

397
// getChildExecutionInfoMap loads the child execution info rows for one
// workflow run and returns them as a map keyed by row.InitiatedID.
//
// Rows are fetched with db.SelectFromChildExecutionInfoMaps and each row's
// Data is decoded with parser.ChildExecutionInfoFromBlob. sql.ErrNoRows from
// the select is not treated as a failure; any other select error is passed
// through convertCommonErrors, and a decode error is returned unchanged.
//
// InitiatedEvent and StartedEvent are only set on the result when the decoded
// blob carries them.
func getChildExecutionInfoMap(
398
        ctx context.Context,
399
        db sqlplugin.DB,
400
        shardID int,
401
        domainID serialization.UUID,
402
        workflowID string,
403
        runID serialization.UUID,
404
        parser serialization.Parser,
405
) (map[int64]*persistence.InternalChildExecutionInfo, error) {
793✔
406

793✔
407
        rows, err := db.SelectFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
793✔
408
                ShardID:    int64(shardID),
793✔
409
                DomainID:   domainID,
793✔
410
                WorkflowID: workflowID,
793✔
411
                RunID:      runID,
793✔
412
        })
793✔
413
        if err != nil && err != sql.ErrNoRows {
900✔
414
                return nil, convertCommonErrors(db, "getChildExecutionInfoMap", "", err)
107✔
415
        }
107✔
416

417
        ret := make(map[int64]*persistence.InternalChildExecutionInfo)
688✔
418
        for _, row := range rows {
690✔
419
                rowInfo, err := parser.ChildExecutionInfoFromBlob(row.Data, row.DataEncoding)
2✔
420
                if err != nil {
2✔
421
                        return nil, err
×
422
                }
×
423
                info := &persistence.InternalChildExecutionInfo{
2✔
424
                        InitiatedID:           row.InitiatedID,
2✔
425
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
2✔
426
                        Version:               rowInfo.GetVersion(),
2✔
427
                        StartedID:             rowInfo.GetStartedID(),
2✔
428
                        StartedWorkflowID:     rowInfo.GetStartedWorkflowID(),
2✔
429
                        StartedRunID:          serialization.UUID(rowInfo.GetStartedRunID()).String(),
2✔
430
                        CreateRequestID:       rowInfo.GetCreateRequestID(),
2✔
431
                        DomainID:              rowInfo.GetDomainID(),
2✔
432
                        DomainNameDEPRECATED:  rowInfo.GetDomainNameDEPRECATED(),
2✔
433
                        WorkflowTypeName:      rowInfo.GetWorkflowTypeName(),
2✔
434
                        ParentClosePolicy:     types.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
2✔
435
                }
2✔
436
                if rowInfo.InitiatedEvent != nil {
2✔
437
                        info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
×
438
                }
×
439
                if rowInfo.StartedEvent != nil {
2✔
440
                        info.StartedEvent = persistence.NewDataBlob(rowInfo.StartedEvent, common.EncodingType(rowInfo.GetStartedEventEncoding()))
×
441
                }
×
442
                ret[row.InitiatedID] = info
2✔
443
        }
444

445
        return ret, nil
688✔
446
}
447

448
// deleteChildExecutionInfoMap calls tx.DeleteFromChildExecutionInfoMaps with a
// filter that identifies the workflow run (shard, domain, workflow ID, run ID)
// and sets no InitiatedIDs. A failed query is passed through
// convertCommonErrors.
//
// NOTE(review): presumably a filter without InitiatedIDs removes every child
// execution info row of the run — confirm against the sqlplugin
// implementation.
func deleteChildExecutionInfoMap(
449
        ctx context.Context,
450
        tx sqlplugin.Tx,
451
        shardID int,
452
        domainID serialization.UUID,
453
        workflowID string,
454
        runID serialization.UUID,
455
) error {
3✔
456

3✔
457
        if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
3✔
458
                ShardID:    int64(shardID),
3✔
459
                DomainID:   domainID,
3✔
460
                WorkflowID: workflowID,
3✔
461
                RunID:      runID,
3✔
462
        }); err != nil {
3✔
463
                return convertCommonErrors(tx, "deleteChildExecutionInfoMap", "", err)
×
464
        }
×
465
        return nil
3✔
466
}
467

468
// updateRequestCancelInfos writes and/or removes request-cancel info rows for
// one workflow run using the transaction tx.
//
// When requestCancelInfos is non-empty, each entry is encoded with
// parser.RequestCancelInfoToBlob and the resulting rows, keyed by InitiatedID,
// are handed to tx.ReplaceIntoRequestCancelInfoMaps in one call. When
// deleteInfos is non-empty, tx.DeleteFromRequestCancelInfoMaps is called with
// those values as the InitiatedIDs of the filter.
//
// An encoding error is returned unchanged; a failed query is passed through
// convertCommonErrors. Returns nil when both steps succeed or are skipped.
func updateRequestCancelInfos(
469
        ctx context.Context,
470
        tx sqlplugin.Tx,
471
        requestCancelInfos []*persistence.RequestCancelInfo,
472
        deleteInfos []int64,
473
        shardID int,
474
        domainID serialization.UUID,
475
        workflowID string,
476
        runID serialization.UUID,
477
        parser serialization.Parser,
478
) error {
3,337✔
479

3,337✔
480
        if len(requestCancelInfos) > 0 {
3,346✔
481
                rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos))
9✔
482
                for i, requestCancelInfo := range requestCancelInfos {
18✔
483
                        blob, err := parser.RequestCancelInfoToBlob(&serialization.RequestCancelInfo{
9✔
484
                                Version:               requestCancelInfo.Version,
9✔
485
                                InitiatedEventBatchID: requestCancelInfo.InitiatedEventBatchID,
9✔
486
                                CancelRequestID:       requestCancelInfo.CancelRequestID,
9✔
487
                        })
9✔
488
                        if err != nil {
9✔
489
                                return err
×
490
                        }
×
491
                        rows[i] = sqlplugin.RequestCancelInfoMapsRow{
9✔
492
                                ShardID:      int64(shardID),
9✔
493
                                DomainID:     domainID,
9✔
494
                                WorkflowID:   workflowID,
9✔
495
                                RunID:        runID,
9✔
496
                                InitiatedID:  requestCancelInfo.InitiatedID,
9✔
497
                                Data:         blob.Data,
9✔
498
                                DataEncoding: string(blob.Encoding),
9✔
499
                        }
9✔
500
                }
501

502
                if _, err := tx.ReplaceIntoRequestCancelInfoMaps(ctx, rows); err != nil {
9✔
503
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute update query.", err)
×
504
                }
×
505
        }
506

507
        if len(deleteInfos) > 0 {
3,344✔
508
                if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
7✔
509
                        ShardID:      int64(shardID),
7✔
510
                        DomainID:     domainID,
7✔
511
                        WorkflowID:   workflowID,
7✔
512
                        RunID:        runID,
7✔
513
                        InitiatedIDs: deleteInfos,
7✔
514
                }); err != nil {
7✔
515
                        return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute delete query.", err)
×
516
                }
×
517
        }
518

519
        return nil
3,337✔
520
}
521

522
// getRequestCancelInfoMap loads the request-cancel info rows for one workflow
// run and returns them as a map keyed by row.InitiatedID.
//
// Rows are fetched with db.SelectFromRequestCancelInfoMaps and each row's Data
// is decoded with parser.RequestCancelInfoFromBlob. sql.ErrNoRows from the
// select is not treated as a failure; any other select error is passed through
// convertCommonErrors, and a decode error is returned unchanged.
func getRequestCancelInfoMap(
523
        ctx context.Context,
524
        db sqlplugin.DB,
525
        shardID int,
526
        domainID serialization.UUID,
527
        workflowID string,
528
        runID serialization.UUID,
529
        parser serialization.Parser,
530
) (map[int64]*persistence.RequestCancelInfo, error) {
793✔
531

793✔
532
        rows, err := db.SelectFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
793✔
533
                ShardID:    int64(shardID),
793✔
534
                DomainID:   domainID,
793✔
535
                WorkflowID: workflowID,
793✔
536
                RunID:      runID,
793✔
537
        })
793✔
538
        if err != nil && err != sql.ErrNoRows {
889✔
539
                return nil, convertCommonErrors(db, "getRequestCancelInfoMap", "", err)
96✔
540
        }
96✔
541

542
        ret := make(map[int64]*persistence.RequestCancelInfo)
698✔
543
        for _, row := range rows {
698✔
544
                rowInfo, err := parser.RequestCancelInfoFromBlob(row.Data, row.DataEncoding)
×
545
                if err != nil {
×
546
                        return nil, err
×
547
                }
×
548
                ret[row.InitiatedID] = &persistence.RequestCancelInfo{
×
549
                        Version:               rowInfo.GetVersion(),
×
550
                        InitiatedID:           row.InitiatedID,
×
551
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
×
552
                        CancelRequestID:       rowInfo.GetCancelRequestID(),
×
553
                }
×
554
        }
555

556
        return ret, nil
698✔
557
}
558

559
// deleteRequestCancelInfoMap calls tx.DeleteFromRequestCancelInfoMaps with a
// filter that identifies the workflow run (shard, domain, workflow ID, run ID)
// and sets no InitiatedIDs. A failed query is passed through
// convertCommonErrors.
//
// NOTE(review): presumably a filter without InitiatedIDs removes every
// request-cancel info row of the run — confirm against the sqlplugin
// implementation.
func deleteRequestCancelInfoMap(
560
        ctx context.Context,
561
        tx sqlplugin.Tx,
562
        shardID int,
563
        domainID serialization.UUID,
564
        workflowID string,
565
        runID serialization.UUID,
566
) error {
3✔
567

3✔
568
        if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
3✔
569
                ShardID:    int64(shardID),
3✔
570
                DomainID:   domainID,
3✔
571
                WorkflowID: workflowID,
3✔
572
                RunID:      runID,
3✔
573
        }); err != nil {
3✔
574
                return convertCommonErrors(tx, "deleteRequestCancelInfoMap", "", err)
×
575
        }
×
576
        return nil
3✔
577
}
578

579
func updateSignalInfos(
580
        ctx context.Context,
581
        tx sqlplugin.Tx,
582
        signalInfos []*persistence.SignalInfo,
583
        deleteInfos []int64,
584
        shardID int,
585
        domainID serialization.UUID,
586
        workflowID string,
587
        runID serialization.UUID,
588
        parser serialization.Parser,
589
) error {
3,337✔
590

3,337✔
591
        if len(signalInfos) > 0 {
3,350✔
592
                rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos))
13✔
593
                for i, signalInfo := range signalInfos {
26✔
594
                        blob, err := parser.SignalInfoToBlob(&serialization.SignalInfo{
13✔
595
                                Version:               signalInfo.Version,
13✔
596
                                InitiatedEventBatchID: signalInfo.InitiatedEventBatchID,
13✔
597
                                RequestID:             signalInfo.SignalRequestID,
13✔
598
                                Name:                  signalInfo.SignalName,
13✔
599
                                Input:                 signalInfo.Input,
13✔
600
                                Control:               signalInfo.Control,
13✔
601
                        })
13✔
602
                        if err != nil {
13✔
603
                                return err
×
604
                        }
×
605
                        rows[i] = sqlplugin.SignalInfoMapsRow{
13✔
606
                                ShardID:      int64(shardID),
13✔
607
                                DomainID:     domainID,
13✔
608
                                WorkflowID:   workflowID,
13✔
609
                                RunID:        runID,
13✔
610
                                InitiatedID:  signalInfo.InitiatedID,
13✔
611
                                Data:         blob.Data,
13✔
612
                                DataEncoding: string(blob.Encoding),
13✔
613
                        }
13✔
614
                }
615

616
                if _, err := tx.ReplaceIntoSignalInfoMaps(ctx, rows); err != nil {
13✔
617
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute update query.", err)
×
618
                }
×
619
        }
620

621
        if len(deleteInfos) > 0 {
3,348✔
622
                if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
11✔
623
                        ShardID:      int64(shardID),
11✔
624
                        DomainID:     domainID,
11✔
625
                        WorkflowID:   workflowID,
11✔
626
                        RunID:        runID,
11✔
627
                        InitiatedIDs: deleteInfos,
11✔
628
                }); err != nil {
11✔
629
                        return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute delete query.", err)
×
630
                }
×
631
        }
632

633
        return nil
3,337✔
634
}
635

636
func getSignalInfoMap(
637
        ctx context.Context,
638
        db sqlplugin.DB,
639
        shardID int,
640
        domainID serialization.UUID,
641
        workflowID string,
642
        runID serialization.UUID,
643
        parser serialization.Parser,
644
) (map[int64]*persistence.SignalInfo, error) {
793✔
645

793✔
646
        rows, err := db.SelectFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
793✔
647
                ShardID:    int64(shardID),
793✔
648
                DomainID:   domainID,
793✔
649
                WorkflowID: workflowID,
793✔
650
                RunID:      runID,
793✔
651
        })
793✔
652
        if err != nil && err != sql.ErrNoRows {
898✔
653
                return nil, convertCommonErrors(db, "getSignalInfoMap", "", err)
105✔
654
        }
105✔
655

656
        ret := make(map[int64]*persistence.SignalInfo)
690✔
657
        for _, row := range rows {
692✔
658
                rowInfo, err := parser.SignalInfoFromBlob(row.Data, row.DataEncoding)
2✔
659
                if err != nil {
2✔
660
                        return nil, err
×
661
                }
×
662
                ret[row.InitiatedID] = &persistence.SignalInfo{
2✔
663
                        Version:               rowInfo.GetVersion(),
2✔
664
                        InitiatedID:           row.InitiatedID,
2✔
665
                        InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
2✔
666
                        SignalRequestID:       rowInfo.GetRequestID(),
2✔
667
                        SignalName:            rowInfo.GetName(),
2✔
668
                        Input:                 rowInfo.GetInput(),
2✔
669
                        Control:               rowInfo.GetControl(),
2✔
670
                }
2✔
671
        }
672

673
        return ret, nil
690✔
674
}
675

676
func deleteSignalInfoMap(
677
        ctx context.Context,
678
        tx sqlplugin.Tx,
679
        shardID int,
680
        domainID serialization.UUID,
681
        workflowID string,
682
        runID serialization.UUID,
683
) error {
3✔
684

3✔
685
        if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
3✔
686
                ShardID:    int64(shardID),
3✔
687
                DomainID:   domainID,
3✔
688
                WorkflowID: workflowID,
3✔
689
                RunID:      runID,
3✔
690
        }); err != nil {
3✔
691
                return convertCommonErrors(tx, "deleteSignalInfoMap", "", err)
×
692
        }
×
693
        return nil
3✔
694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc