• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d9fa8-75f8-405b-9b1e-38f93e6b0a11

12 Feb 2024 11:30PM UTC coverage: 62.748% (+0.05%) from 62.701%
018d9fa8-75f8-405b-9b1e-38f93e6b0a11

Pull #5657

buildkite

Shaddoll
Implement SignalWithStartWorkflowExecutionAsync API
Pull Request #5657: Implement SignalWithStartWorkflowExecutionAsync API

96 of 142 new or added lines in 5 files covered. (67.61%)

60 existing lines in 8 files now uncovered.

92596 of 147569 relevant lines covered (62.75%)

2318.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.69
/common/util.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package common
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "errors"
27
        "fmt"
28
        "math"
29
        "math/rand"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "sync"
34
        "time"
35

36
        "github.com/dgryski/go-farm"
37
        "github.com/pborman/uuid"
38
        "go.uber.org/yarpc/yarpcerrors"
39

40
        "github.com/uber/cadence/common/backoff"
41
        "github.com/uber/cadence/common/log"
42
        "github.com/uber/cadence/common/log/tag"
43
        "github.com/uber/cadence/common/metrics"
44
        "github.com/uber/cadence/common/types"
45
)
46

47
const (
48
        golandMapReserverNumberOfBytes = 48
49

50
        retryPersistenceOperationInitialInterval    = 50 * time.Millisecond
51
        retryPersistenceOperationMaxInterval        = 10 * time.Second
52
        retryPersistenceOperationExpirationInterval = 30 * time.Second
53

54
        historyServiceOperationInitialInterval    = 50 * time.Millisecond
55
        historyServiceOperationMaxInterval        = 10 * time.Second
56
        historyServiceOperationExpirationInterval = 30 * time.Second
57

58
        matchingServiceOperationInitialInterval    = 1000 * time.Millisecond
59
        matchingServiceOperationMaxInterval        = 10 * time.Second
60
        matchingServiceOperationExpirationInterval = 30 * time.Second
61

62
        frontendServiceOperationInitialInterval    = 200 * time.Millisecond
63
        frontendServiceOperationMaxInterval        = 5 * time.Second
64
        frontendServiceOperationExpirationInterval = 15 * time.Second
65

66
        adminServiceOperationInitialInterval    = 200 * time.Millisecond
67
        adminServiceOperationMaxInterval        = 5 * time.Second
68
        adminServiceOperationExpirationInterval = 15 * time.Second
69

70
        retryKafkaOperationInitialInterval = 50 * time.Millisecond
71
        retryKafkaOperationMaxInterval     = 10 * time.Second
72
        retryKafkaOperationMaxAttempts     = 10
73

74
        retryTaskProcessingInitialInterval = 50 * time.Millisecond
75
        retryTaskProcessingMaxInterval     = 100 * time.Millisecond
76
        retryTaskProcessingMaxAttempts     = 3
77

78
        replicationServiceBusyInitialInterval    = 2 * time.Second
79
        replicationServiceBusyMaxInterval        = 10 * time.Second
80
        replicationServiceBusyExpirationInterval = 5 * time.Minute
81

82
        contextExpireThreshold = 10 * time.Millisecond
83

84
        // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
85
        FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT"
86
        // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
87
        FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT"
88
        // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
89
        FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT"
90
        // FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
91
        FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
92
        // FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
93
        FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
94
        // FailureReasonSizeExceedsLimit is reason to fail workflow when history size or count exceed limit
95
        FailureReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT"
96
        // FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
97
        FailureReasonTransactionSizeExceedsLimit = "TRANSACTION_SIZE_EXCEEDS_LIMIT"
98
        // FailureReasonDecisionAttemptsExceedsLimit is reason to fail workflow when decision attempts fail too many times
99
        FailureReasonDecisionAttemptsExceedsLimit = "DECISION_ATTEMPTS_EXCEEDS_LIMIT"
100
)
101

102
var (
103
        // ErrBlobSizeExceedsLimit is error for event blob size exceeds limit
104
        ErrBlobSizeExceedsLimit = &types.BadRequestError{Message: "Blob data size exceeds limit."}
105
        // ErrContextTimeoutTooShort is error for setting a very short context timeout when calling a long poll API
106
        ErrContextTimeoutTooShort = &types.BadRequestError{Message: "Context timeout is too short."}
107
        // ErrContextTimeoutNotSet is error for not setting a context timeout when calling a long poll API
108
        ErrContextTimeoutNotSet = &types.BadRequestError{Message: "Context timeout is not set."}
109
        // ErrDecisionResultCountTooLarge error for decision result count exceeds limit
110
        ErrDecisionResultCountTooLarge = &types.BadRequestError{Message: "Decision result count exceeds limit."}
111
)
112

113
// AwaitWaitGroup calls Wait on the given wait
114
// Returns true if the Wait() call succeeded before the timeout
115
// Returns false if the Wait() did not return before the timeout
116
func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
629✔
117
        doneC := make(chan struct{})
629✔
118

629✔
119
        go func() {
1,258✔
120
                wg.Wait()
629✔
121
                close(doneC)
629✔
122
        }()
629✔
123

124
        select {
629✔
125
        case <-doneC:
628✔
126
                return true
628✔
127
        case <-time.After(timeout):
1✔
128
                return false
1✔
129
        }
130
}
131

132
// CreatePersistenceRetryPolicy creates a retry policy for persistence layer operations
133
func CreatePersistenceRetryPolicy() backoff.RetryPolicy {
9,912✔
134
        policy := backoff.NewExponentialRetryPolicy(retryPersistenceOperationInitialInterval)
9,912✔
135
        policy.SetMaximumInterval(retryPersistenceOperationMaxInterval)
9,912✔
136
        policy.SetExpirationInterval(retryPersistenceOperationExpirationInterval)
9,912✔
137

9,912✔
138
        return policy
9,912✔
139
}
9,912✔
140

141
// CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
142
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy {
43✔
143
        policy := backoff.NewExponentialRetryPolicy(historyServiceOperationInitialInterval)
43✔
144
        policy.SetMaximumInterval(historyServiceOperationMaxInterval)
43✔
145
        policy.SetExpirationInterval(historyServiceOperationExpirationInterval)
43✔
146

43✔
147
        return policy
43✔
148
}
43✔
149

150
// CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service
151
func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy {
40✔
152
        policy := backoff.NewExponentialRetryPolicy(matchingServiceOperationInitialInterval)
40✔
153
        policy.SetMaximumInterval(matchingServiceOperationMaxInterval)
40✔
154
        policy.SetExpirationInterval(matchingServiceOperationExpirationInterval)
40✔
155

40✔
156
        return policy
40✔
157
}
40✔
158

159
// CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
160
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
43✔
161
        policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval)
43✔
162
        policy.SetMaximumInterval(frontendServiceOperationMaxInterval)
43✔
163
        policy.SetExpirationInterval(frontendServiceOperationExpirationInterval)
43✔
164

43✔
165
        return policy
43✔
166
}
43✔
167

168
// CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service
169
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy {
7✔
170
        policy := backoff.NewExponentialRetryPolicy(adminServiceOperationInitialInterval)
7✔
171
        policy.SetMaximumInterval(adminServiceOperationMaxInterval)
7✔
172
        policy.SetExpirationInterval(adminServiceOperationExpirationInterval)
7✔
173

7✔
174
        return policy
7✔
175
}
7✔
176

177
// CreateDlqPublishRetryPolicy creates a retry policy for kafka operation
178
func CreateDlqPublishRetryPolicy() backoff.RetryPolicy {
4✔
179
        policy := backoff.NewExponentialRetryPolicy(retryKafkaOperationInitialInterval)
4✔
180
        policy.SetMaximumInterval(retryKafkaOperationMaxInterval)
4✔
181
        policy.SetMaximumAttempts(retryKafkaOperationMaxAttempts)
4✔
182

4✔
183
        return policy
4✔
184
}
4✔
185

186
// CreateTaskProcessingRetryPolicy creates a retry policy for task processing
187
func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy {
19✔
188
        policy := backoff.NewExponentialRetryPolicy(retryTaskProcessingInitialInterval)
19✔
189
        policy.SetMaximumInterval(retryTaskProcessingMaxInterval)
19✔
190
        policy.SetMaximumAttempts(retryTaskProcessingMaxAttempts)
19✔
191

19✔
192
        return policy
19✔
193
}
19✔
194

195
// CreateReplicationServiceBusyRetryPolicy creates a retry policy to handle replication service busy
196
func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
100✔
197
        policy := backoff.NewExponentialRetryPolicy(replicationServiceBusyInitialInterval)
100✔
198
        policy.SetMaximumInterval(replicationServiceBusyMaxInterval)
100✔
199
        policy.SetExpirationInterval(replicationServiceBusyExpirationInterval)
100✔
200

100✔
201
        return policy
100✔
202
}
100✔
203

204
// IsValidIDLength checks if id is valid according to its length
205
func IsValidIDLength(
206
        id string,
207
        scope metrics.Scope,
208
        warnLimit int,
209
        errorLimit int,
210
        metricsCounter int,
211
        domainName string,
212
        logger log.Logger,
213
        idTypeViolationTag tag.Tag,
214
) bool {
16,482✔
215
        if len(id) > warnLimit {
16,484✔
216
                scope.IncCounter(metricsCounter)
2✔
217
                logger.Warn("ID length exceeds limit.",
2✔
218
                        tag.WorkflowDomainName(domainName),
2✔
219
                        tag.Name(id),
2✔
220
                        idTypeViolationTag)
2✔
221
        }
2✔
222
        return len(id) <= errorLimit
16,482✔
223
}
224

225
// CheckDecisionResultLimit checks if decision result count exceeds limits.
226
func CheckDecisionResultLimit(
227
        actualSize int,
228
        limit int,
229
        scope metrics.Scope,
230
) error {
927✔
231
        scope.RecordTimer(metrics.DecisionResultCount, time.Duration(actualSize))
927✔
232
        if limit > 0 && actualSize > limit {
927✔
233
                return ErrDecisionResultCountTooLarge
×
234
        }
×
235
        return nil
927✔
236
}
237

238
// ToServiceTransientError converts an error to ServiceTransientError
239
func ToServiceTransientError(err error) error {
25,545✔
240
        if err == nil || IsServiceTransientError(err) {
51,089✔
241
                return err
25,544✔
242
        }
25,544✔
243
        return yarpcerrors.Newf(yarpcerrors.CodeUnavailable, err.Error())
1✔
244
}
245

246
// IsServiceTransientError checks if the error is a transient error.
247
func IsServiceTransientError(err error) bool {
463✔
248

463✔
249
        var (
463✔
250
                typesInternalServiceError    *types.InternalServiceError
463✔
251
                typesServiceBusyError        *types.ServiceBusyError
463✔
252
                typesShardOwnershipLostError *types.ShardOwnershipLostError
463✔
253
                yarpcErrorsStatus            *yarpcerrors.Status
463✔
254
        )
463✔
255

463✔
256
        switch {
463✔
257
        case errors.As(err, &typesInternalServiceError):
3✔
258
                return true
3✔
259
        case errors.As(err, &typesServiceBusyError):
1✔
260
                return true
1✔
261
        case errors.As(err, &typesShardOwnershipLostError):
1✔
262
                return true
1✔
263
        case errors.As(err, &yarpcErrorsStatus):
273✔
264
                // We only selectively retry the following yarpc errors client can safe retry with a backoff
273✔
265
                if yarpcerrors.IsUnavailable(err) ||
273✔
266
                        yarpcerrors.IsUnknown(err) ||
273✔
267
                        yarpcerrors.IsInternal(err) {
278✔
268
                        return true
5✔
269
                }
5✔
270
                return false
268✔
271
        }
272

273
        return false
185✔
274
}
275

276
// IsEntityNotExistsError checks if the error is an entity not exists error.
277
func IsEntityNotExistsError(err error) bool {
2✔
278
        _, ok := err.(*types.EntityNotExistsError)
2✔
279
        return ok
2✔
280
}
2✔
281

282
// IsServiceBusyError checks if the error is a service busy error.
283
func IsServiceBusyError(err error) bool {
6✔
284
        switch err.(type) {
6✔
285
        case *types.ServiceBusyError:
1✔
286
                return true
1✔
287
        }
288
        return false
5✔
289
}
290

291
// IsContextTimeoutError checks if the error is context timeout error
292
func IsContextTimeoutError(err error) bool {
135✔
293
        switch err := err.(type) {
135✔
294
        case *types.InternalServiceError:
1✔
295
                return err.Message == context.DeadlineExceeded.Error()
1✔
296
        }
297
        return err == context.DeadlineExceeded || yarpcerrors.IsDeadlineExceeded(err)
134✔
298
}
299

300
// WorkflowIDToHistoryShard is used to map a workflowID to a shardID
301
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
13,405✔
302
        hash := farm.Fingerprint32([]byte(workflowID))
13,405✔
303
        return int(hash % uint32(numberOfShards))
13,405✔
304
}
13,405✔
305

306
// DomainIDToHistoryShard is used to map a domainID to a shardID
307
func DomainIDToHistoryShard(domainID string, numberOfShards int) int {
2✔
308
        hash := farm.Fingerprint32([]byte(domainID))
2✔
309
        return int(hash % uint32(numberOfShards))
2✔
310
}
2✔
311

312
// PrettyPrintHistory prints history in human readable format
313
func PrettyPrintHistory(history *types.History, logger log.Logger) {
66✔
314
        data, err := json.MarshalIndent(history, "", "    ")
66✔
315

66✔
316
        if err != nil {
66✔
317
                logger.Error("Error serializing history: %v\n", tag.Error(err))
×
318
        }
×
319

320
        fmt.Println("******************************************")
66✔
321
        fmt.Println("History", tag.DetailInfo(string(data)))
66✔
322
        fmt.Println("******************************************")
66✔
323
}
324

325
// IsValidContext checks that the thrift context is not expired on cancelled.
326
// Returns nil if the context is still valid. Otherwise, returns the result of
327
// ctx.Err()
328
func IsValidContext(ctx context.Context) error {
8,294✔
329
        ch := ctx.Done()
8,294✔
330
        if ch != nil {
16,587✔
331
                select {
8,293✔
332
                case <-ch:
2✔
333
                        return ctx.Err()
2✔
334
                default:
8,291✔
335
                        // go to the next line
336
                }
337
        }
338

339
        deadline, ok := ctx.Deadline()
8,292✔
340
        if ok && time.Until(deadline) < contextExpireThreshold {
8,293✔
341
                return context.DeadlineExceeded
1✔
342
        }
1✔
343
        return nil
8,291✔
344
}
345

346
// emptyCancelFunc wraps an empty func by context.CancelFunc interface
347
var emptyCancelFunc = context.CancelFunc(func() {})
×
348

349
// CreateChildContext creates a child context which shorted context timeout
350
// from the given parent context
351
// tailroom must be in range [0, 1] and
352
// (1-tailroom) * parent timeout will be the new child context timeout
353
// if tailroom is less 0, tailroom will be considered as 0
354
// if tailroom is greater than 1, tailroom wil be considered as 1
355
func CreateChildContext(
356
        parent context.Context,
357
        tailroom float64,
358
) (context.Context, context.CancelFunc) {
9✔
359
        if parent == nil {
10✔
360
                return nil, emptyCancelFunc
1✔
361
        }
1✔
362
        if parent.Err() != nil {
10✔
363
                return parent, emptyCancelFunc
2✔
364
        }
2✔
365

366
        now := time.Now()
6✔
367
        deadline, ok := parent.Deadline()
6✔
368
        if !ok || deadline.Before(now) {
7✔
369
                return parent, emptyCancelFunc
1✔
370
        }
1✔
371

372
        // if tailroom is about or less 0, then return a context with the same deadline as parent
373
        if tailroom <= 0 {
7✔
374
                return context.WithDeadline(parent, deadline)
2✔
375
        }
2✔
376
        // if tailroom is about or greater 1, then return a context with deadline of now
377
        if tailroom >= 1 {
5✔
378
                return context.WithDeadline(parent, now)
2✔
379
        }
2✔
380

381
        newDeadline := now.Add(time.Duration(math.Ceil(float64(deadline.Sub(now)) * (1.0 - tailroom))))
1✔
382
        return context.WithDeadline(parent, newDeadline)
1✔
383
}
384

385
// GenerateRandomString is used for generate test string
386
func GenerateRandomString(n int) string {
6✔
387
        if n <= 0 {
8✔
388
                return ""
2✔
389
        }
2✔
390

391
        letterRunes := []rune("random")
4✔
392
        b := make([]rune, n)
4✔
393
        for i := range b {
17✔
394
                b[i] = letterRunes[rand.Intn(len(letterRunes))]
13✔
395
        }
13✔
396
        return string(b)
4✔
397
}
398

399
// CreateMatchingPollForDecisionTaskResponse create response for matching's PollForDecisionTask
400
func CreateMatchingPollForDecisionTaskResponse(historyResponse *types.RecordDecisionTaskStartedResponse, workflowExecution *types.WorkflowExecution, token []byte) *types.MatchingPollForDecisionTaskResponse {
1,161✔
401
        matchingResp := &types.MatchingPollForDecisionTaskResponse{
1,161✔
402
                WorkflowExecution:         workflowExecution,
1,161✔
403
                TaskToken:                 token,
1,161✔
404
                Attempt:                   historyResponse.GetAttempt(),
1,161✔
405
                WorkflowType:              historyResponse.WorkflowType,
1,161✔
406
                StartedEventID:            historyResponse.StartedEventID,
1,161✔
407
                StickyExecutionEnabled:    historyResponse.StickyExecutionEnabled,
1,161✔
408
                NextEventID:               historyResponse.NextEventID,
1,161✔
409
                DecisionInfo:              historyResponse.DecisionInfo,
1,161✔
410
                WorkflowExecutionTaskList: historyResponse.WorkflowExecutionTaskList,
1,161✔
411
                BranchToken:               historyResponse.BranchToken,
1,161✔
412
                ScheduledTimestamp:        historyResponse.ScheduledTimestamp,
1,161✔
413
                StartedTimestamp:          historyResponse.StartedTimestamp,
1,161✔
414
                Queries:                   historyResponse.Queries,
1,161✔
415
                TotalHistoryBytes:         historyResponse.HistorySize,
1,161✔
416
        }
1,161✔
417
        if historyResponse.GetPreviousStartedEventID() != EmptyEventID {
2,322✔
418
                matchingResp.PreviousStartedEventID = historyResponse.PreviousStartedEventID
1,161✔
419
        }
1,161✔
420
        return matchingResp
1,161✔
421
}
422

423
// MinInt64 returns the smaller of two given int64
424
func MinInt64(a, b int64) int64 {
14,071✔
425
        if a < b {
18,420✔
426
                return a
4,349✔
427
        }
4,349✔
428
        return b
9,725✔
429
}
430

431
// MaxInt64 returns the greater of two given int64
432
func MaxInt64(a, b int64) int64 {
1,076✔
433
        if a > b {
1,076✔
UNCOV
434
                return a
×
UNCOV
435
        }
×
436
        return b
1,076✔
437
}
438

439
// MinInt32 return smaller one of two inputs int32
440
func MinInt32(a, b int32) int32 {
2,706✔
441
        if a < b {
5,367✔
442
                return a
2,661✔
443
        }
2,661✔
444
        return b
45✔
445
}
446

447
// MinInt returns the smaller of two given integers
448
func MinInt(a, b int) int {
×
449
        if a < b {
×
450
                return a
×
451
        }
×
452
        return b
×
453
}
454

455
// MaxInt returns the greater one of two given integers
456
func MaxInt(a, b int) int {
16,679✔
457
        if a > b {
16,679✔
458
                return a
×
459
        }
×
460
        return b
16,679✔
461
}
462

463
// MinDuration returns the smaller of two given time duration
464
func MinDuration(a, b time.Duration) time.Duration {
66✔
465
        if a < b {
81✔
466
                return a
15✔
467
        }
15✔
468
        return b
51✔
469
}
470

471
// MaxDuration returns the greater of two given time durations
472
func MaxDuration(a, b time.Duration) time.Duration {
15✔
473
        if a > b {
15✔
474
                return a
×
475
        }
×
476
        return b
15✔
477
}
478

479
// SortInt64Slice sorts the given int64 slice.
480
// Sort is not guaranteed to be stable.
481
func SortInt64Slice(slice []int64) {
×
482
        sort.Slice(slice, func(i int, j int) bool {
×
483
                return slice[i] < slice[j]
×
484
        })
×
485
}
486

487
// ValidateRetryPolicy validates a retry policy
488
func ValidateRetryPolicy(policy *types.RetryPolicy) error {
1,387✔
489
        if policy == nil {
2,714✔
490
                // nil policy is valid which means no retry
1,327✔
491
                return nil
1,327✔
492
        }
1,327✔
493
        if policy.GetInitialIntervalInSeconds() <= 0 {
62✔
494
                return &types.BadRequestError{Message: "InitialIntervalInSeconds must be greater than 0 on retry policy."}
2✔
495
        }
2✔
496
        if policy.GetBackoffCoefficient() < 1 {
59✔
497
                return &types.BadRequestError{Message: "BackoffCoefficient cannot be less than 1 on retry policy."}
1✔
498
        }
1✔
499
        if policy.GetMaximumIntervalInSeconds() < 0 {
58✔
500
                return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than 0 on retry policy."}
1✔
501
        }
1✔
502
        if policy.GetMaximumIntervalInSeconds() > 0 && policy.GetMaximumIntervalInSeconds() < policy.GetInitialIntervalInSeconds() {
57✔
503
                return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than InitialIntervalInSeconds on retry policy."}
1✔
504
        }
1✔
505
        if policy.GetMaximumAttempts() < 0 {
56✔
506
                return &types.BadRequestError{Message: "MaximumAttempts cannot be less than 0 on retry policy."}
1✔
507
        }
1✔
508
        if policy.GetExpirationIntervalInSeconds() < 0 {
55✔
509
                return &types.BadRequestError{Message: "ExpirationIntervalInSeconds cannot be less than 0 on retry policy."}
1✔
510
        }
1✔
511
        if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 {
54✔
512
                return &types.BadRequestError{Message: "MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified."}
1✔
513
        }
1✔
514
        return nil
52✔
515
}
516

517
// CreateHistoryStartWorkflowRequest create a start workflow request for history
518
func CreateHistoryStartWorkflowRequest(
519
        domainID string,
520
        startRequest *types.StartWorkflowExecutionRequest,
521
        now time.Time,
522
        partitionConfig map[string]string,
523
) (*types.HistoryStartWorkflowExecutionRequest, error) {
583✔
524
        histRequest := &types.HistoryStartWorkflowExecutionRequest{
583✔
525
                DomainUUID:      domainID,
583✔
526
                StartRequest:    startRequest,
583✔
527
                PartitionConfig: partitionConfig,
583✔
528
        }
583✔
529

583✔
530
        delayStartSeconds := startRequest.GetDelayStartSeconds()
583✔
531
        jitterStartSeconds := startRequest.GetJitterStartSeconds()
583✔
532
        firstDecisionTaskBackoffSeconds := delayStartSeconds
583✔
533
        if len(startRequest.GetCronSchedule()) > 0 {
650✔
534
                delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
67✔
535
                var err error
67✔
536
                firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
67✔
537
                        startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
67✔
538
                if err != nil {
67✔
539
                        return nil, err
×
540
                }
×
541

542
                // backoff seconds was calculated based on delayed start time, so we need to
543
                // add the delayStartSeconds to that backoff.
544
                firstDecisionTaskBackoffSeconds += delayStartSeconds
67✔
545
        } else if jitterStartSeconds > 0 {
538✔
546
                // Add a random jitter to start time, if requested.
22✔
547
                firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
22✔
548
        }
22✔
549

550
        histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)
583✔
551

583✔
552
        if startRequest.RetryPolicy != nil && startRequest.RetryPolicy.GetExpirationIntervalInSeconds() > 0 {
612✔
553
                expirationInSeconds := startRequest.RetryPolicy.GetExpirationIntervalInSeconds() + firstDecisionTaskBackoffSeconds
29✔
554
                // expirationTime calculates from first decision task schedule to the end of the workflow
29✔
555
                deadline := now.Add(time.Duration(expirationInSeconds) * time.Second)
29✔
556
                histRequest.ExpirationTimestamp = Int64Ptr(deadline.Round(time.Millisecond).UnixNano())
29✔
557
        }
29✔
558

559
        return histRequest, nil
583✔
560
}
561

562
// CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit,
563
// and return ErrBlobSizeExceedsLimit if it exceeds errorLimit.
564
func CheckEventBlobSizeLimit(
565
        actualSize int,
566
        warnLimit int,
567
        errorLimit int,
568
        domainID string,
569
        workflowID string,
570
        runID string,
571
        scope metrics.Scope,
572
        logger log.Logger,
573
        blobSizeViolationOperationTag tag.Tag,
574
) error {
2,967✔
575

2,967✔
576
        scope.RecordTimer(metrics.EventBlobSize, time.Duration(actualSize))
2,967✔
577

2,967✔
578
        if actualSize > warnLimit {
2,967✔
579
                if logger != nil {
×
580
                        logger.Warn("Blob size exceeds limit.",
×
581
                                tag.WorkflowDomainID(domainID),
×
582
                                tag.WorkflowID(workflowID),
×
583
                                tag.WorkflowRunID(runID),
×
584
                                tag.WorkflowSize(int64(actualSize)),
×
585
                                blobSizeViolationOperationTag)
×
586
                }
×
587

588
                if actualSize > errorLimit {
×
589
                        return ErrBlobSizeExceedsLimit
×
590
                }
×
591
        }
592
        return nil
2,967✔
593
}
594

595
// ValidateLongPollContextTimeout check if the context timeout for a long poll handler is too short or below a normal value.
596
// If the timeout is not set or too short, it logs an error, and return ErrContextTimeoutNotSet or ErrContextTimeoutTooShort
597
// accordingly. If the timeout is only below a normal value, it just logs an info and return nil.
598
func ValidateLongPollContextTimeout(
599
        ctx context.Context,
600
        handlerName string,
601
        logger log.Logger,
602
) error {
3,847✔
603

3,847✔
604
        deadline, err := ValidateLongPollContextTimeoutIsSet(ctx, handlerName, logger)
3,847✔
605
        if err != nil {
3,848✔
606
                return err
1✔
607
        }
1✔
608
        timeout := time.Until(deadline)
3,846✔
609
        if timeout < MinLongPollTimeout {
3,847✔
610
                err := ErrContextTimeoutTooShort
1✔
611
                logger.Error("Context timeout is too short for long poll API.",
1✔
612
                        tag.WorkflowHandlerName(handlerName), tag.Error(err), tag.WorkflowPollContextTimeout(timeout))
1✔
613
                return err
1✔
614
        }
1✔
615
        if timeout < CriticalLongPollTimeout {
3,846✔
616
                logger.Warn("Context timeout is lower than critical value for long poll API.",
1✔
617
                        tag.WorkflowHandlerName(handlerName), tag.WorkflowPollContextTimeout(timeout))
1✔
618
        }
1✔
619
        return nil
3,845✔
620
}
621

622
// ValidateLongPollContextTimeoutIsSet checks if the context timeout is set for long poll requests.
623
func ValidateLongPollContextTimeoutIsSet(
624
        ctx context.Context,
625
        handlerName string,
626
        logger log.Logger,
627
) (time.Time, error) {
6,550✔
628

6,550✔
629
        deadline, ok := ctx.Deadline()
6,550✔
630
        if !ok {
6,551✔
631
                err := ErrContextTimeoutNotSet
1✔
632
                logger.Error("Context timeout not set for long poll API.",
1✔
633
                        tag.WorkflowHandlerName(handlerName), tag.Error(err))
1✔
634
                return deadline, err
1✔
635
        }
1✔
636
        return deadline, nil
6,549✔
637
}
638

639
// ValidateDomainUUID checks if the given domainID string is a valid UUID
640
func ValidateDomainUUID(
641
        domainUUID string,
642
) error {
5,487✔
643

5,487✔
644
        if domainUUID == "" {
5,488✔
645
                return &types.BadRequestError{Message: "Missing domain UUID."}
1✔
646
        } else if uuid.Parse(domainUUID) == nil {
5,488✔
647
                return &types.BadRequestError{Message: "Invalid domain UUID."}
1✔
648
        }
1✔
649
        return nil
5,485✔
650
}
651

652
// GetSizeOfMapStringToByteArray get size of map[string][]byte
653
func GetSizeOfMapStringToByteArray(input map[string][]byte) int {
58✔
654
        if input == nil {
88✔
655
                return 0
30✔
656
        }
30✔
657

658
        res := 0
28✔
659
        for k, v := range input {
69✔
660
                res += len(k) + len(v)
41✔
661
        }
41✔
662
        return res + golandMapReserverNumberOfBytes
28✔
663
}
664

665
// GetSizeOfHistoryEvent returns approximate size in bytes of the history event taking into account byte arrays only now
666
func GetSizeOfHistoryEvent(event *types.HistoryEvent) uint64 {
44✔
667
        if event == nil || event.EventType == nil {
46✔
668
                return 0
2✔
669
        }
2✔
670

671
        res := 0
42✔
672
        switch *event.EventType {
42✔
673
        case types.EventTypeWorkflowExecutionStarted:
1✔
674
                res += len(event.WorkflowExecutionStartedEventAttributes.Input)
1✔
675
                res += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
1✔
676
                res += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
1✔
677
                if event.WorkflowExecutionStartedEventAttributes.Memo != nil {
2✔
678
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Memo.Fields)
1✔
679
                }
1✔
680
                if event.WorkflowExecutionStartedEventAttributes.Header != nil {
2✔
681
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Header.Fields)
1✔
682
                }
1✔
683
                if event.WorkflowExecutionStartedEventAttributes.SearchAttributes != nil {
2✔
684
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.IndexedFields)
1✔
685
                }
1✔
686
        case types.EventTypeWorkflowExecutionCompleted:
1✔
687
                res += len(event.WorkflowExecutionCompletedEventAttributes.Result)
1✔
688
        case types.EventTypeWorkflowExecutionFailed:
1✔
689
                res += len(event.WorkflowExecutionFailedEventAttributes.Details)
1✔
690
        case types.EventTypeWorkflowExecutionTimedOut:
1✔
691
        case types.EventTypeDecisionTaskScheduled:
1✔
692
        case types.EventTypeDecisionTaskStarted:
1✔
693
        case types.EventTypeDecisionTaskCompleted:
1✔
694
                res += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
1✔
695
        case types.EventTypeDecisionTaskTimedOut:
1✔
696
        case types.EventTypeDecisionTaskFailed:
1✔
697
                res += len(event.DecisionTaskFailedEventAttributes.Details)
1✔
698
        case types.EventTypeActivityTaskScheduled:
1✔
699
                res += len(event.ActivityTaskScheduledEventAttributes.Input)
1✔
700
                if event.ActivityTaskScheduledEventAttributes.Header != nil {
2✔
701
                        res += GetSizeOfMapStringToByteArray(event.ActivityTaskScheduledEventAttributes.Header.Fields)
1✔
702
                }
1✔
703
        case types.EventTypeActivityTaskStarted:
1✔
704
                res += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails)
1✔
705
        case types.EventTypeActivityTaskCompleted:
1✔
706
                res += len(event.ActivityTaskCompletedEventAttributes.Result)
1✔
707
        case types.EventTypeActivityTaskFailed:
1✔
708
                res += len(event.ActivityTaskFailedEventAttributes.Details)
1✔
709
        case types.EventTypeActivityTaskTimedOut:
1✔
710
                res += len(event.ActivityTaskTimedOutEventAttributes.Details)
1✔
711
                res += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
1✔
712
        case types.EventTypeActivityTaskCancelRequested:
1✔
713
        case types.EventTypeRequestCancelActivityTaskFailed:
1✔
714
        case types.EventTypeActivityTaskCanceled:
1✔
715
                res += len(event.ActivityTaskCanceledEventAttributes.Details)
1✔
716
        case types.EventTypeTimerStarted:
1✔
717
        case types.EventTypeTimerFired:
1✔
718
        case types.EventTypeCancelTimerFailed:
1✔
719
        case types.EventTypeTimerCanceled:
1✔
720
        case types.EventTypeWorkflowExecutionCancelRequested:
1✔
721
        case types.EventTypeWorkflowExecutionCanceled:
1✔
722
                res += len(event.WorkflowExecutionCanceledEventAttributes.Details)
1✔
723
        case types.EventTypeRequestCancelExternalWorkflowExecutionInitiated:
1✔
724
                res += len(event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Control)
1✔
725
        case types.EventTypeRequestCancelExternalWorkflowExecutionFailed:
1✔
726
                res += len(event.RequestCancelExternalWorkflowExecutionFailedEventAttributes.Control)
1✔
727
        case types.EventTypeExternalWorkflowExecutionCancelRequested:
1✔
728
        case types.EventTypeMarkerRecorded:
1✔
729
                res += len(event.MarkerRecordedEventAttributes.Details)
1✔
730
        case types.EventTypeWorkflowExecutionSignaled:
1✔
731
                res += len(event.WorkflowExecutionSignaledEventAttributes.Input)
1✔
732
        case types.EventTypeWorkflowExecutionTerminated:
1✔
733
                res += len(event.WorkflowExecutionTerminatedEventAttributes.Details)
1✔
734
        case types.EventTypeWorkflowExecutionContinuedAsNew:
1✔
735
                res += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
1✔
736
                if event.WorkflowExecutionContinuedAsNewEventAttributes.Memo != nil {
2✔
737
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.Fields)
1✔
738
                }
1✔
739
                if event.WorkflowExecutionContinuedAsNewEventAttributes.Header != nil {
2✔
740
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.Fields)
1✔
741
                }
1✔
742
                if event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes != nil {
2✔
743
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.IndexedFields)
1✔
744
                }
1✔
745
        case types.EventTypeStartChildWorkflowExecutionInitiated:
1✔
746
                res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
1✔
747
                res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
1✔
748
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo != nil {
2✔
749
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.Fields)
1✔
750
                }
1✔
751
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.Header != nil {
2✔
752
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.Fields)
1✔
753
                }
1✔
754
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes != nil {
2✔
755
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.IndexedFields)
1✔
756
                }
1✔
757
        case types.EventTypeStartChildWorkflowExecutionFailed:
1✔
758
                res += len(event.StartChildWorkflowExecutionFailedEventAttributes.Control)
1✔
759
        case types.EventTypeChildWorkflowExecutionStarted:
1✔
760
                if event.ChildWorkflowExecutionStartedEventAttributes != nil && event.ChildWorkflowExecutionStartedEventAttributes.Header != nil {
2✔
761
                        res += GetSizeOfMapStringToByteArray(event.ChildWorkflowExecutionStartedEventAttributes.Header.Fields)
1✔
762
                }
1✔
763
        case types.EventTypeChildWorkflowExecutionCompleted:
1✔
764
                res += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result)
1✔
765
        case types.EventTypeChildWorkflowExecutionFailed:
1✔
766
                res += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
1✔
767
        case types.EventTypeChildWorkflowExecutionCanceled:
1✔
768
                res += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details)
1✔
769
        case types.EventTypeChildWorkflowExecutionTimedOut:
1✔
770
        case types.EventTypeChildWorkflowExecutionTerminated:
1✔
771
        case types.EventTypeSignalExternalWorkflowExecutionInitiated:
1✔
772
                res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
1✔
773
                res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
1✔
774
        case types.EventTypeSignalExternalWorkflowExecutionFailed:
1✔
775
                res += len(event.SignalExternalWorkflowExecutionFailedEventAttributes.Control)
1✔
776
        case types.EventTypeExternalWorkflowExecutionSignaled:
1✔
777
                res += len(event.ExternalWorkflowExecutionSignaledEventAttributes.Control)
1✔
778
        case types.EventTypeUpsertWorkflowSearchAttributes:
1✔
779
                if event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes != nil {
2✔
780
                        res += GetSizeOfMapStringToByteArray(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields)
1✔
781
                }
1✔
782
        }
783
        return uint64(res)
42✔
784
}
785

786
// IsJustOrderByClause return true is query start with order by
787
func IsJustOrderByClause(clause string) bool {
336✔
788
        whereClause := strings.TrimSpace(clause)
336✔
789
        whereClause = strings.ToLower(whereClause)
336✔
790
        return strings.HasPrefix(whereClause, "order by")
336✔
791
}
336✔
792

793
// ConvertIndexedValueTypeToInternalType takes fieldType as interface{} and convert to IndexedValueType.
794
// Because different implementation of dynamic config client may lead to different types
795
func ConvertIndexedValueTypeToInternalType(fieldType interface{}, logger log.Logger) types.IndexedValueType {
201✔
796
        switch t := fieldType.(type) {
201✔
797
        case float64:
6✔
798
                return types.IndexedValueType(t)
6✔
799
        case int:
6✔
800
                return types.IndexedValueType(t)
6✔
801
        case types.IndexedValueType:
177✔
802
                return t
177✔
803
        case []byte:
6✔
804
                var result types.IndexedValueType
6✔
805
                if err := result.UnmarshalText(t); err != nil {
6✔
806
                        logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
×
807
                        return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
808
                }
×
809
                return result
6✔
810
        case string:
6✔
811
                var result types.IndexedValueType
6✔
812
                if err := result.UnmarshalText([]byte(t)); err != nil {
6✔
813
                        logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
×
814
                        return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
815
                }
×
816
                return result
6✔
817
        default:
×
818
                // Unknown fieldType, please make sure dynamic config return correct value type
×
819
                logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t))
×
820
                return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
821
        }
822
}
823

824
// DeserializeSearchAttributeValue takes json encoded search attribute value and it's type as input, then
825
// unmarshal the value into a concrete type and return the value
826
func DeserializeSearchAttributeValue(value []byte, valueType types.IndexedValueType) (interface{}, error) {
148✔
827
        switch valueType {
148✔
828
        case types.IndexedValueTypeString, types.IndexedValueTypeKeyword:
84✔
829
                var val string
84✔
830
                if err := json.Unmarshal(value, &val); err != nil {
94✔
831
                        var listVal []string
10✔
832
                        err = json.Unmarshal(value, &listVal)
10✔
833
                        return listVal, err
10✔
834
                }
10✔
835
                return val, nil
74✔
836
        case types.IndexedValueTypeInt:
30✔
837
                var val int64
30✔
838
                if err := json.Unmarshal(value, &val); err != nil {
32✔
839
                        var listVal []int64
2✔
840
                        err = json.Unmarshal(value, &listVal)
2✔
841
                        return listVal, err
2✔
842
                }
2✔
843
                return val, nil
28✔
844
        case types.IndexedValueTypeDouble:
15✔
845
                var val float64
15✔
846
                if err := json.Unmarshal(value, &val); err != nil {
17✔
847
                        var listVal []float64
2✔
848
                        err = json.Unmarshal(value, &listVal)
2✔
849
                        return listVal, err
2✔
850
                }
2✔
851
                return val, nil
13✔
852
        case types.IndexedValueTypeBool:
3✔
853
                var val bool
3✔
854
                if err := json.Unmarshal(value, &val); err != nil {
5✔
855
                        var listVal []bool
2✔
856
                        err = json.Unmarshal(value, &listVal)
2✔
857
                        return listVal, err
2✔
858
                }
2✔
859
                return val, nil
1✔
860
        case types.IndexedValueTypeDatetime:
15✔
861
                var val time.Time
15✔
862
                if err := json.Unmarshal(value, &val); err != nil {
17✔
863
                        var listVal []time.Time
2✔
864
                        err = json.Unmarshal(value, &listVal)
2✔
865
                        return listVal, err
2✔
866
                }
2✔
867
                return val, nil
13✔
868
        default:
1✔
869
                return nil, fmt.Errorf("error: unknown index value type [%v]", valueType)
1✔
870
        }
871
}
872

873
// IsAdvancedVisibilityWritingEnabled returns true if we should write to advanced visibility
874
func IsAdvancedVisibilityWritingEnabled(advancedVisibilityWritingMode string, isAdvancedVisConfigExist bool) bool {
57✔
875
        return advancedVisibilityWritingMode != AdvancedVisibilityWritingModeOff && isAdvancedVisConfigExist
57✔
876
}
57✔
877

878
// IsAdvancedVisibilityReadingEnabled returns true if we should read from advanced visibility
879
func IsAdvancedVisibilityReadingEnabled(isAdvancedVisReadEnabled, isAdvancedVisConfigExist bool) bool {
291✔
880
        return isAdvancedVisReadEnabled && isAdvancedVisConfigExist
291✔
881
}
291✔
882

883
// ConvertIntMapToDynamicConfigMapProperty converts a map whose key value type are both int to
884
// a map value that is compatible with dynamic config's map property
885
func ConvertIntMapToDynamicConfigMapProperty(
886
        intMap map[int]int,
887
) map[string]interface{} {
12✔
888
        dcValue := make(map[string]interface{})
12✔
889
        for key, value := range intMap {
36✔
890
                dcValue[strconv.Itoa(key)] = value
24✔
891
        }
24✔
892
        return dcValue
12✔
893
}
894

895
// ConvertDynamicConfigMapPropertyToIntMap convert a map property from dynamic config to a map
896
// whose type for both key and value are int
897
func ConvertDynamicConfigMapPropertyToIntMap(dcValue map[string]interface{}) (map[int]int, error) {
344✔
898
        intMap := make(map[int]int)
344✔
899
        for key, value := range dcValue {
1,371✔
900
                intKey, err := strconv.Atoi(strings.TrimSpace(key))
1,027✔
901
                if err != nil {
1,027✔
902
                        return nil, fmt.Errorf("failed to convert key %v, error: %v", key, err)
×
903
                }
×
904

905
                var intValue int
1,027✔
906
                switch value := value.(type) {
1,027✔
907
                case float64:
1✔
908
                        intValue = int(value)
1✔
909
                case int:
1,024✔
910
                        intValue = value
1,024✔
911
                case int32:
1✔
912
                        intValue = int(value)
1✔
913
                case int64:
1✔
914
                        intValue = int(value)
1✔
915
                default:
×
916
                        return nil, fmt.Errorf("unknown value %v with type %T", value, value)
×
917
                }
918
                intMap[intKey] = intValue
1,027✔
919
        }
920
        return intMap, nil
344✔
921
}
922

923
// IsStickyTaskConditionError is error from matching engine
924
func IsStickyTaskConditionError(err error) bool {
×
925
        if e, ok := err.(*types.InternalServiceError); ok {
×
926
                return e.GetMessage() == StickyTaskConditionFailedErrorMsg
×
927
        }
×
928
        return false
×
929
}
930

931
// DurationToDays converts time.Duration to number of 24 hour days
932
func DurationToDays(d time.Duration) int32 {
3,834✔
933
        return int32(d / (24 * time.Hour))
3,834✔
934
}
3,834✔
935

936
// DurationToSeconds converts time.Duration to number of seconds
937
func DurationToSeconds(d time.Duration) int64 {
51,486✔
938
        return int64(d / time.Second)
51,486✔
939
}
51,486✔
940

941
// DaysToDuration converts number of 24 hour days to time.Duration
942
func DaysToDuration(d int32) time.Duration {
3,949✔
943
        return time.Duration(d) * (24 * time.Hour)
3,949✔
944
}
3,949✔
945

946
// SecondsToDuration converts number of seconds to time.Duration
947
func SecondsToDuration(d int64) time.Duration {
76,199✔
948
        return time.Duration(d) * time.Second
76,199✔
949
}
76,199✔
950

951
// SleepWithMinDuration sleeps for the minimum of desired and available duration
952
// returns the remaining available time duration
953
func SleepWithMinDuration(desired time.Duration, available time.Duration) time.Duration {
39✔
954
        d := MinDuration(desired, available)
39✔
955
        if d > 0 {
39✔
956
                time.Sleep(d)
×
957
        }
×
958
        return available - d
39✔
959
}
960

961
// ConvertErrToGetTaskFailedCause converts error to GetTaskFailedCause
962
func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause {
4✔
963
        if IsContextTimeoutError(err) {
5✔
964
                return types.GetTaskFailedCauseTimeout
1✔
965
        }
1✔
966
        if IsServiceBusyError(err) {
4✔
967
                return types.GetTaskFailedCauseServiceBusy
1✔
968
        }
1✔
969
        if _, ok := err.(*types.ShardOwnershipLostError); ok {
3✔
970
                return types.GetTaskFailedCauseShardOwnershipLost
1✔
971
        }
1✔
972
        return types.GetTaskFailedCauseUncategorized
1✔
973
}
974

975
// ConvertGetTaskFailedCauseToErr converts GetTaskFailedCause to error
976
func ConvertGetTaskFailedCauseToErr(failedCause types.GetTaskFailedCause) error {
4✔
977
        switch failedCause {
4✔
978
        case types.GetTaskFailedCauseServiceBusy:
1✔
979
                return &types.ServiceBusyError{}
1✔
980
        case types.GetTaskFailedCauseTimeout:
1✔
981
                return context.DeadlineExceeded
1✔
982
        case types.GetTaskFailedCauseShardOwnershipLost:
1✔
983
                return &types.ShardOwnershipLostError{}
1✔
984
        default:
1✔
985
                return &types.InternalServiceError{Message: "uncategorized error"}
1✔
986
        }
987
}
988

989
// GetTaskPriority returns priority given a task's priority class and subclass
990
func GetTaskPriority(
991
        class int,
992
        subClass int,
993
) int {
21✔
994
        return class | subClass
21✔
995
}
21✔
996

997
// IntersectionStringSlice get the intersection of 2 string slices
998
func IntersectionStringSlice(a, b []string) []string {
3✔
999
        var result []string
3✔
1000
        m := make(map[string]struct{})
3✔
1001
        for _, item := range a {
12✔
1002
                m[item] = struct{}{}
9✔
1003
        }
9✔
1004
        for _, item := range b {
12✔
1005
                if _, ok := m[item]; ok {
14✔
1006
                        result = append(result, item)
5✔
1007
                }
5✔
1008
        }
1009
        return result
3✔
1010
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc