• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018ef47c-7ef1-4ce1-b4c9-3fa25bfb5bee

19 Apr 2024 03:53AM UTC coverage: 67.594% (+0.02%) from 67.579%
018ef47c-7ef1-4ce1-b4c9-3fa25bfb5bee

push

buildkite

web-flow
Add document explaining the schema of cassandra executions table (#5921)

98984 of 146438 relevant lines covered (67.59%)

2380.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.78
/common/util.go
1
// Copyright (c) 2017 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package common
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "errors"
27
        "fmt"
28
        "math"
29
        "math/rand"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "sync"
34
        "time"
35

36
        "github.com/dgryski/go-farm"
37
        "github.com/pborman/uuid"
38
        "go.uber.org/yarpc/yarpcerrors"
39

40
        "github.com/uber/cadence/common/backoff"
41
        "github.com/uber/cadence/common/log"
42
        "github.com/uber/cadence/common/log/tag"
43
        "github.com/uber/cadence/common/metrics"
44
        "github.com/uber/cadence/common/types"
45
)
46

47
const (
48
        golandMapReserverNumberOfBytes = 48
49

50
        retryPersistenceOperationInitialInterval    = 50 * time.Millisecond
51
        retryPersistenceOperationMaxInterval        = 10 * time.Second
52
        retryPersistenceOperationExpirationInterval = 30 * time.Second
53

54
        historyServiceOperationInitialInterval    = 50 * time.Millisecond
55
        historyServiceOperationMaxInterval        = 10 * time.Second
56
        historyServiceOperationExpirationInterval = 30 * time.Second
57

58
        matchingServiceOperationInitialInterval    = 1000 * time.Millisecond
59
        matchingServiceOperationMaxInterval        = 10 * time.Second
60
        matchingServiceOperationExpirationInterval = 30 * time.Second
61

62
        frontendServiceOperationInitialInterval    = 200 * time.Millisecond
63
        frontendServiceOperationMaxInterval        = 5 * time.Second
64
        frontendServiceOperationExpirationInterval = 15 * time.Second
65

66
        adminServiceOperationInitialInterval    = 200 * time.Millisecond
67
        adminServiceOperationMaxInterval        = 5 * time.Second
68
        adminServiceOperationExpirationInterval = 15 * time.Second
69

70
        retryKafkaOperationInitialInterval = 50 * time.Millisecond
71
        retryKafkaOperationMaxInterval     = 10 * time.Second
72
        retryKafkaOperationMaxAttempts     = 10
73

74
        retryTaskProcessingInitialInterval = 50 * time.Millisecond
75
        retryTaskProcessingMaxInterval     = 100 * time.Millisecond
76
        retryTaskProcessingMaxAttempts     = 3
77

78
        replicationServiceBusyInitialInterval    = 2 * time.Second
79
        replicationServiceBusyMaxInterval        = 10 * time.Second
80
        replicationServiceBusyExpirationInterval = 5 * time.Minute
81

82
        contextExpireThreshold = 10 * time.Millisecond
83

84
        // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
85
        FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT"
86
        // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
87
        FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT"
88
        // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
89
        FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT"
90
        // FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
91
        FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
92
        // FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
93
        FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
94
        // FailureReasonSizeExceedsLimit is reason to fail workflow when history size or count exceed limit
95
        FailureReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT"
96
        // FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit
97
        FailureReasonTransactionSizeExceedsLimit = "TRANSACTION_SIZE_EXCEEDS_LIMIT"
98
        // FailureReasonDecisionAttemptsExceedsLimit is reason to fail workflow when decision attempts fail too many times
99
        FailureReasonDecisionAttemptsExceedsLimit = "DECISION_ATTEMPTS_EXCEEDS_LIMIT"
100
)
101

102
var (
103
        // ErrBlobSizeExceedsLimit is error for event blob size exceeds limit
104
        ErrBlobSizeExceedsLimit = &types.BadRequestError{Message: "Blob data size exceeds limit."}
105
        // ErrContextTimeoutTooShort is error for setting a very short context timeout when calling a long poll API
106
        ErrContextTimeoutTooShort = &types.BadRequestError{Message: "Context timeout is too short."}
107
        // ErrContextTimeoutNotSet is error for not setting a context timeout when calling a long poll API
108
        ErrContextTimeoutNotSet = &types.BadRequestError{Message: "Context timeout is not set."}
109
        // ErrDecisionResultCountTooLarge error for decision result count exceeds limit
110
        ErrDecisionResultCountTooLarge = &types.BadRequestError{Message: "Decision result count exceeds limit."}
111
)
112

113
// AwaitWaitGroup waits for the given wait group, giving up after timeout.
// It returns true if wg.Wait() returned before the timeout elapsed and
// false otherwise.
//
// The helper goroutine that calls wg.Wait() is not cancelled on timeout; it
// exits only once the wait group is eventually released.
func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	doneC := make(chan struct{})

	go func() {
		wg.Wait()
		close(doneC)
	}()

	// Use an explicit timer rather than time.After so that it can be stopped
	// as soon as the wait group completes instead of lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-doneC:
		return true
	case <-timer.C:
		return false
	}
}
131

132
// CreatePersistenceRetryPolicy creates a retry policy for persistence layer operations
133
func CreatePersistenceRetryPolicy() backoff.RetryPolicy {
9,996✔
134
        policy := backoff.NewExponentialRetryPolicy(retryPersistenceOperationInitialInterval)
9,996✔
135
        policy.SetMaximumInterval(retryPersistenceOperationMaxInterval)
9,996✔
136
        policy.SetExpirationInterval(retryPersistenceOperationExpirationInterval)
9,996✔
137

9,996✔
138
        return policy
9,996✔
139
}
9,996✔
140

141
// CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
142
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy {
52✔
143
        policy := backoff.NewExponentialRetryPolicy(historyServiceOperationInitialInterval)
52✔
144
        policy.SetMaximumInterval(historyServiceOperationMaxInterval)
52✔
145
        policy.SetExpirationInterval(historyServiceOperationExpirationInterval)
52✔
146

52✔
147
        return policy
52✔
148
}
52✔
149

150
// CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service
151
func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy {
49✔
152
        policy := backoff.NewExponentialRetryPolicy(matchingServiceOperationInitialInterval)
49✔
153
        policy.SetMaximumInterval(matchingServiceOperationMaxInterval)
49✔
154
        policy.SetExpirationInterval(matchingServiceOperationExpirationInterval)
49✔
155

49✔
156
        return policy
49✔
157
}
49✔
158

159
// CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
160
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
52✔
161
        policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval)
52✔
162
        policy.SetMaximumInterval(frontendServiceOperationMaxInterval)
52✔
163
        policy.SetExpirationInterval(frontendServiceOperationExpirationInterval)
52✔
164

52✔
165
        return policy
52✔
166
}
52✔
167

168
// CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service
169
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy {
7✔
170
        policy := backoff.NewExponentialRetryPolicy(adminServiceOperationInitialInterval)
7✔
171
        policy.SetMaximumInterval(adminServiceOperationMaxInterval)
7✔
172
        policy.SetExpirationInterval(adminServiceOperationExpirationInterval)
7✔
173

7✔
174
        return policy
7✔
175
}
7✔
176

177
// CreateDlqPublishRetryPolicy creates a retry policy for kafka operation
178
func CreateDlqPublishRetryPolicy() backoff.RetryPolicy {
4✔
179
        policy := backoff.NewExponentialRetryPolicy(retryKafkaOperationInitialInterval)
4✔
180
        policy.SetMaximumInterval(retryKafkaOperationMaxInterval)
4✔
181
        policy.SetMaximumAttempts(retryKafkaOperationMaxAttempts)
4✔
182

4✔
183
        return policy
4✔
184
}
4✔
185

186
// CreateTaskProcessingRetryPolicy creates a retry policy for task processing
187
func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy {
22✔
188
        policy := backoff.NewExponentialRetryPolicy(retryTaskProcessingInitialInterval)
22✔
189
        policy.SetMaximumInterval(retryTaskProcessingMaxInterval)
22✔
190
        policy.SetMaximumAttempts(retryTaskProcessingMaxAttempts)
22✔
191

22✔
192
        return policy
22✔
193
}
22✔
194

195
// CreateReplicationServiceBusyRetryPolicy creates a retry policy to handle replication service busy
196
func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
124✔
197
        policy := backoff.NewExponentialRetryPolicy(replicationServiceBusyInitialInterval)
124✔
198
        policy.SetMaximumInterval(replicationServiceBusyMaxInterval)
124✔
199
        policy.SetExpirationInterval(replicationServiceBusyExpirationInterval)
124✔
200

124✔
201
        return policy
124✔
202
}
124✔
203

204
// IsValidIDLength checks if id is valid according to its length
205
func IsValidIDLength(
206
        id string,
207
        scope metrics.Scope,
208
        warnLimit int,
209
        errorLimit int,
210
        metricsCounter int,
211
        domainName string,
212
        logger log.Logger,
213
        idTypeViolationTag tag.Tag,
214
) bool {
16,682✔
215
        if len(id) > warnLimit {
16,684✔
216
                scope.IncCounter(metricsCounter)
2✔
217
                logger.Warn("ID length exceeds limit.",
2✔
218
                        tag.WorkflowDomainName(domainName),
2✔
219
                        tag.Name(id),
2✔
220
                        idTypeViolationTag)
2✔
221
        }
2✔
222
        return len(id) <= errorLimit
16,682✔
223
}
224

225
// CheckDecisionResultLimit checks if decision result count exceeds limits.
226
func CheckDecisionResultLimit(
227
        actualSize int,
228
        limit int,
229
        scope metrics.Scope,
230
) error {
929✔
231
        scope.RecordTimer(metrics.DecisionResultCount, time.Duration(actualSize))
929✔
232
        if limit > 0 && actualSize > limit {
929✔
233
                return ErrDecisionResultCountTooLarge
×
234
        }
×
235
        return nil
929✔
236
}
237

238
// ToServiceTransientError converts an error to ServiceTransientError
239
func ToServiceTransientError(err error) error {
25,941✔
240
        if err == nil || IsServiceTransientError(err) {
51,881✔
241
                return err
25,940✔
242
        }
25,940✔
243
        return yarpcerrors.Newf(yarpcerrors.CodeUnavailable, err.Error())
1✔
244
}
245

246
// HistoryRetryFuncFrontendExceptions checks if an error should be retried
247
// in a call from frontend
248
func FrontendRetry(err error) bool {
277✔
249
        var sbErr *types.ServiceBusyError
277✔
250
        if errors.As(err, &sbErr) {
295✔
251
                // If the service busy error is due to workflow id rate limiting, proxy it to the caller
18✔
252
                return sbErr.Reason != WorkflowIDRateLimitReason
18✔
253
        }
18✔
254
        return IsServiceTransientError(err)
259✔
255
}
256

257
// IsServiceTransientError checks if the error is a transient error.
258
func IsServiceTransientError(err error) bool {
466✔
259

466✔
260
        var (
466✔
261
                typesInternalServiceError    *types.InternalServiceError
466✔
262
                typesServiceBusyError        *types.ServiceBusyError
466✔
263
                typesShardOwnershipLostError *types.ShardOwnershipLostError
466✔
264
                yarpcErrorsStatus            *yarpcerrors.Status
466✔
265
        )
466✔
266

466✔
267
        switch {
466✔
268
        case errors.As(err, &typesInternalServiceError):
3✔
269
                return true
3✔
270
        case errors.As(err, &typesServiceBusyError):
1✔
271
                return true
1✔
272
        case errors.As(err, &typesShardOwnershipLostError):
1✔
273
                return true
1✔
274
        case errors.As(err, &yarpcErrorsStatus):
273✔
275
                // We only selectively retry the following yarpc errors client can safe retry with a backoff
273✔
276
                if yarpcerrors.IsUnavailable(err) ||
273✔
277
                        yarpcerrors.IsUnknown(err) ||
273✔
278
                        yarpcerrors.IsInternal(err) {
278✔
279
                        return true
5✔
280
                }
5✔
281
                return false
268✔
282
        }
283

284
        return false
188✔
285
}
286

287
// IsEntityNotExistsError checks if the error is an entity not exists error.
288
func IsEntityNotExistsError(err error) bool {
2✔
289
        _, ok := err.(*types.EntityNotExistsError)
2✔
290
        return ok
2✔
291
}
2✔
292

293
// IsServiceBusyError checks if the error is a service busy error.
294
func IsServiceBusyError(err error) bool {
6✔
295
        switch err.(type) {
6✔
296
        case *types.ServiceBusyError:
1✔
297
                return true
1✔
298
        }
299
        return false
5✔
300
}
301

302
// IsContextTimeoutError checks if the error is context timeout error
303
func IsContextTimeoutError(err error) bool {
137✔
304
        switch err := err.(type) {
137✔
305
        case *types.InternalServiceError:
3✔
306
                return err.Message == context.DeadlineExceeded.Error()
3✔
307
        }
308
        return err == context.DeadlineExceeded || yarpcerrors.IsDeadlineExceeded(err)
134✔
309
}
310

311
// WorkflowIDToHistoryShard is used to map a workflowID to a shardID
312
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int {
13,467✔
313
        hash := farm.Fingerprint32([]byte(workflowID))
13,467✔
314
        return int(hash % uint32(numberOfShards))
13,467✔
315
}
13,467✔
316

317
// DomainIDToHistoryShard is used to map a domainID to a shardID
318
func DomainIDToHistoryShard(domainID string, numberOfShards int) int {
2✔
319
        hash := farm.Fingerprint32([]byte(domainID))
2✔
320
        return int(hash % uint32(numberOfShards))
2✔
321
}
2✔
322

323
// PrettyPrintHistory prints history in human readable format
324
func PrettyPrintHistory(history *types.History, logger log.Logger) {
66✔
325
        data, err := json.MarshalIndent(history, "", "    ")
66✔
326

66✔
327
        if err != nil {
66✔
328
                logger.Error("Error serializing history: %v\n", tag.Error(err))
×
329
        }
×
330

331
        fmt.Println("******************************************")
66✔
332
        fmt.Println("History", tag.DetailInfo(string(data)))
66✔
333
        fmt.Println("******************************************")
66✔
334
}
335

336
// IsValidContext checks that the thrift context is not expired on cancelled.
337
// Returns nil if the context is still valid. Otherwise, returns the result of
338
// ctx.Err()
339
func IsValidContext(ctx context.Context) error {
8,306✔
340
        ch := ctx.Done()
8,306✔
341
        if ch != nil {
16,611✔
342
                select {
8,305✔
343
                case <-ch:
2✔
344
                        return ctx.Err()
2✔
345
                default:
8,303✔
346
                        // go to the next line
347
                }
348
        }
349

350
        deadline, ok := ctx.Deadline()
8,304✔
351
        if ok && time.Until(deadline) < contextExpireThreshold {
8,305✔
352
                return context.DeadlineExceeded
1✔
353
        }
1✔
354
        return nil
8,303✔
355
}
356

357
// emptyCancelFunc is a no-op context.CancelFunc, returned when there is
// nothing to cancel.
var emptyCancelFunc context.CancelFunc = func() {}
×
359

360
// CreateChildContext creates a child context which shorted context timeout
361
// from the given parent context
362
// tailroom must be in range [0, 1] and
363
// (1-tailroom) * parent timeout will be the new child context timeout
364
// if tailroom is less 0, tailroom will be considered as 0
365
// if tailroom is greater than 1, tailroom wil be considered as 1
366
func CreateChildContext(
367
        parent context.Context,
368
        tailroom float64,
369
) (context.Context, context.CancelFunc) {
9✔
370
        if parent == nil {
10✔
371
                return nil, emptyCancelFunc
1✔
372
        }
1✔
373
        if parent.Err() != nil {
10✔
374
                return parent, emptyCancelFunc
2✔
375
        }
2✔
376

377
        now := time.Now()
6✔
378
        deadline, ok := parent.Deadline()
6✔
379
        if !ok || deadline.Before(now) {
7✔
380
                return parent, emptyCancelFunc
1✔
381
        }
1✔
382

383
        // if tailroom is about or less 0, then return a context with the same deadline as parent
384
        if tailroom <= 0 {
7✔
385
                return context.WithDeadline(parent, deadline)
2✔
386
        }
2✔
387
        // if tailroom is about or greater 1, then return a context with deadline of now
388
        if tailroom >= 1 {
5✔
389
                return context.WithDeadline(parent, now)
2✔
390
        }
2✔
391

392
        newDeadline := now.Add(time.Duration(math.Ceil(float64(deadline.Sub(now)) * (1.0 - tailroom))))
1✔
393
        return context.WithDeadline(parent, newDeadline)
1✔
394
}
395

396
// GenerateRandomString generates a test string of n characters, each drawn
// at random from the letters of "random". It returns the empty string when n
// is zero or negative.
func GenerateRandomString(n int) string {
	if n <= 0 {
		return ""
	}

	// The alphabet is pure ASCII, so bytes and runes are interchangeable here.
	const alphabet = "random"
	out := make([]byte, n)
	for i := 0; i < n; i++ {
		out[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(out)
}
409

410
// CreateMatchingPollForDecisionTaskResponse create response for matching's PollForDecisionTask
411
func CreateMatchingPollForDecisionTaskResponse(historyResponse *types.RecordDecisionTaskStartedResponse, workflowExecution *types.WorkflowExecution, token []byte) *types.MatchingPollForDecisionTaskResponse {
1,163✔
412
        matchingResp := &types.MatchingPollForDecisionTaskResponse{
1,163✔
413
                WorkflowExecution:         workflowExecution,
1,163✔
414
                TaskToken:                 token,
1,163✔
415
                Attempt:                   historyResponse.GetAttempt(),
1,163✔
416
                WorkflowType:              historyResponse.WorkflowType,
1,163✔
417
                StartedEventID:            historyResponse.StartedEventID,
1,163✔
418
                StickyExecutionEnabled:    historyResponse.StickyExecutionEnabled,
1,163✔
419
                NextEventID:               historyResponse.NextEventID,
1,163✔
420
                DecisionInfo:              historyResponse.DecisionInfo,
1,163✔
421
                WorkflowExecutionTaskList: historyResponse.WorkflowExecutionTaskList,
1,163✔
422
                BranchToken:               historyResponse.BranchToken,
1,163✔
423
                ScheduledTimestamp:        historyResponse.ScheduledTimestamp,
1,163✔
424
                StartedTimestamp:          historyResponse.StartedTimestamp,
1,163✔
425
                Queries:                   historyResponse.Queries,
1,163✔
426
                TotalHistoryBytes:         historyResponse.HistorySize,
1,163✔
427
        }
1,163✔
428
        if historyResponse.GetPreviousStartedEventID() != EmptyEventID {
2,326✔
429
                matchingResp.PreviousStartedEventID = historyResponse.PreviousStartedEventID
1,163✔
430
        }
1,163✔
431
        return matchingResp
1,163✔
432
}
433

434
// MinInt64 returns the smaller of the two given int64 values.
func MinInt64(a, b int64) int64 {
	if b <= a {
		return b
	}
	return a
}
441

442
// MaxInt64 returns the greater of the two given int64 values.
func MaxInt64(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
449

450
// MinInt32 returns the smaller of the two given int32 values.
func MinInt32(a, b int32) int32 {
	if b <= a {
		return b
	}
	return a
}
457

458
// MinInt returns the smaller of the two given integers.
func MinInt(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
465

466
// MaxInt returns the greater of the two given integers.
func MaxInt(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
473

474
// MinDuration returns the smaller of the two given time durations.
func MinDuration(a, b time.Duration) time.Duration {
	if b <= a {
		return b
	}
	return a
}
481

482
// MaxDuration returns the greater of the two given time durations.
func MaxDuration(a, b time.Duration) time.Duration {
	if b >= a {
		return b
	}
	return a
}
489

490
// SortInt64Slice sorts the given int64 slice in place, in ascending order.
// The sort is not guaranteed to be stable.
func SortInt64Slice(slice []int64) {
	ascending := func(left, right int) bool {
		return slice[right] > slice[left]
	}
	sort.Slice(slice, ascending)
}
497

498
// ValidateRetryPolicy validates a retry policy
499
func ValidateRetryPolicy(policy *types.RetryPolicy) error {
1,438✔
500
        if policy == nil {
2,816✔
501
                // nil policy is valid which means no retry
1,378✔
502
                return nil
1,378✔
503
        }
1,378✔
504
        if policy.GetInitialIntervalInSeconds() <= 0 {
62✔
505
                return &types.BadRequestError{Message: "InitialIntervalInSeconds must be greater than 0 on retry policy."}
2✔
506
        }
2✔
507
        if policy.GetBackoffCoefficient() < 1 {
59✔
508
                return &types.BadRequestError{Message: "BackoffCoefficient cannot be less than 1 on retry policy."}
1✔
509
        }
1✔
510
        if policy.GetMaximumIntervalInSeconds() < 0 {
58✔
511
                return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than 0 on retry policy."}
1✔
512
        }
1✔
513
        if policy.GetMaximumIntervalInSeconds() > 0 && policy.GetMaximumIntervalInSeconds() < policy.GetInitialIntervalInSeconds() {
57✔
514
                return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than InitialIntervalInSeconds on retry policy."}
1✔
515
        }
1✔
516
        if policy.GetMaximumAttempts() < 0 {
56✔
517
                return &types.BadRequestError{Message: "MaximumAttempts cannot be less than 0 on retry policy."}
1✔
518
        }
1✔
519
        if policy.GetExpirationIntervalInSeconds() < 0 {
55✔
520
                return &types.BadRequestError{Message: "ExpirationIntervalInSeconds cannot be less than 0 on retry policy."}
1✔
521
        }
1✔
522
        if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 {
54✔
523
                return &types.BadRequestError{Message: "MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified."}
1✔
524
        }
1✔
525
        return nil
52✔
526
}
527

528
// CreateHistoryStartWorkflowRequest create a start workflow request for history
529
func CreateHistoryStartWorkflowRequest(
530
        domainID string,
531
        startRequest *types.StartWorkflowExecutionRequest,
532
        now time.Time,
533
        partitionConfig map[string]string,
534
) (*types.HistoryStartWorkflowExecutionRequest, error) {
616✔
535
        histRequest := &types.HistoryStartWorkflowExecutionRequest{
616✔
536
                DomainUUID:      domainID,
616✔
537
                StartRequest:    startRequest,
616✔
538
                PartitionConfig: partitionConfig,
616✔
539
        }
616✔
540

616✔
541
        delayStartSeconds := startRequest.GetDelayStartSeconds()
616✔
542
        jitterStartSeconds := startRequest.GetJitterStartSeconds()
616✔
543
        firstDecisionTaskBackoffSeconds := delayStartSeconds
616✔
544
        if len(startRequest.GetCronSchedule()) > 0 {
683✔
545
                delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds))
67✔
546
                var err error
67✔
547
                firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds(
67✔
548
                        startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds)
67✔
549
                if err != nil {
67✔
550
                        return nil, err
×
551
                }
×
552

553
                // backoff seconds was calculated based on delayed start time, so we need to
554
                // add the delayStartSeconds to that backoff.
555
                firstDecisionTaskBackoffSeconds += delayStartSeconds
67✔
556
        } else if jitterStartSeconds > 0 {
571✔
557
                // Add a random jitter to start time, if requested.
22✔
558
                firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1)
22✔
559
        }
22✔
560

561
        histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds)
616✔
562

616✔
563
        if startRequest.RetryPolicy != nil && startRequest.RetryPolicy.GetExpirationIntervalInSeconds() > 0 {
645✔
564
                expirationInSeconds := startRequest.RetryPolicy.GetExpirationIntervalInSeconds() + firstDecisionTaskBackoffSeconds
29✔
565
                // expirationTime calculates from first decision task schedule to the end of the workflow
29✔
566
                deadline := now.Add(time.Duration(expirationInSeconds) * time.Second)
29✔
567
                histRequest.ExpirationTimestamp = Int64Ptr(deadline.Round(time.Millisecond).UnixNano())
29✔
568
        }
29✔
569

570
        return histRequest, nil
616✔
571
}
572

573
// CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit,
574
// and return ErrBlobSizeExceedsLimit if it exceeds errorLimit.
575
func CheckEventBlobSizeLimit(
576
        actualSize int,
577
        warnLimit int,
578
        errorLimit int,
579
        domainID string,
580
        workflowID string,
581
        runID string,
582
        scope metrics.Scope,
583
        logger log.Logger,
584
        blobSizeViolationOperationTag tag.Tag,
585
) error {
3,000✔
586

3,000✔
587
        scope.RecordTimer(metrics.EventBlobSize, time.Duration(actualSize))
3,000✔
588

3,000✔
589
        if actualSize > warnLimit {
3,000✔
590
                if logger != nil {
×
591
                        logger.Warn("Blob size exceeds limit.",
×
592
                                tag.WorkflowDomainID(domainID),
×
593
                                tag.WorkflowID(workflowID),
×
594
                                tag.WorkflowRunID(runID),
×
595
                                tag.WorkflowSize(int64(actualSize)),
×
596
                                blobSizeViolationOperationTag)
×
597
                }
×
598

599
                if actualSize > errorLimit {
×
600
                        return ErrBlobSizeExceedsLimit
×
601
                }
×
602
        }
603
        return nil
3,000✔
604
}
605

606
// ValidateLongPollContextTimeout check if the context timeout for a long poll handler is too short or below a normal value.
607
// If the timeout is not set or too short, it logs an error, and return ErrContextTimeoutNotSet or ErrContextTimeoutTooShort
608
// accordingly. If the timeout is only below a normal value, it just logs an info and return nil.
609
func ValidateLongPollContextTimeout(
610
        ctx context.Context,
611
        handlerName string,
612
        logger log.Logger,
613
) error {
3,837✔
614

3,837✔
615
        deadline, err := ValidateLongPollContextTimeoutIsSet(ctx, handlerName, logger)
3,837✔
616
        if err != nil {
3,838✔
617
                return err
1✔
618
        }
1✔
619
        timeout := time.Until(deadline)
3,836✔
620
        if timeout < MinLongPollTimeout {
3,837✔
621
                err := ErrContextTimeoutTooShort
1✔
622
                logger.Error("Context timeout is too short for long poll API.",
1✔
623
                        tag.WorkflowHandlerName(handlerName), tag.Error(err), tag.WorkflowPollContextTimeout(timeout))
1✔
624
                return err
1✔
625
        }
1✔
626
        if timeout < CriticalLongPollTimeout {
3,836✔
627
                logger.Debug("Context timeout is lower than critical value for long poll API.",
1✔
628
                        tag.WorkflowHandlerName(handlerName), tag.WorkflowPollContextTimeout(timeout))
1✔
629
        }
1✔
630
        return nil
3,835✔
631
}
632

633
// ValidateLongPollContextTimeoutIsSet checks if the context timeout is set for long poll requests.
634
func ValidateLongPollContextTimeoutIsSet(
635
        ctx context.Context,
636
        handlerName string,
637
        logger log.Logger,
638
) (time.Time, error) {
6,513✔
639

6,513✔
640
        deadline, ok := ctx.Deadline()
6,513✔
641
        if !ok {
6,514✔
642
                err := ErrContextTimeoutNotSet
1✔
643
                logger.Error("Context timeout not set for long poll API.",
1✔
644
                        tag.WorkflowHandlerName(handlerName), tag.Error(err))
1✔
645
                return deadline, err
1✔
646
        }
1✔
647
        return deadline, nil
6,512✔
648
}
649

650
// ValidateDomainUUID checks if the given domainID string is a valid UUID
651
func ValidateDomainUUID(
652
        domainUUID string,
653
) error {
5,510✔
654

5,510✔
655
        if domainUUID == "" {
5,511✔
656
                return &types.BadRequestError{Message: "Missing domain UUID."}
1✔
657
        } else if uuid.Parse(domainUUID) == nil {
5,511✔
658
                return &types.BadRequestError{Message: "Invalid domain UUID."}
1✔
659
        }
1✔
660
        return nil
5,508✔
661
}
662

663
// GetSizeOfMapStringToByteArray get size of map[string][]byte
664
func GetSizeOfMapStringToByteArray(input map[string][]byte) int {
58✔
665
        if input == nil {
88✔
666
                return 0
30✔
667
        }
30✔
668

669
        res := 0
28✔
670
        for k, v := range input {
69✔
671
                res += len(k) + len(v)
41✔
672
        }
41✔
673
        return res + golandMapReserverNumberOfBytes
28✔
674
}
675

676
// GetSizeOfHistoryEvent returns approximate size in bytes of the history event taking into account byte arrays only now
677
func GetSizeOfHistoryEvent(event *types.HistoryEvent) uint64 {
44✔
678
        if event == nil || event.EventType == nil {
46✔
679
                return 0
2✔
680
        }
2✔
681

682
        res := 0
42✔
683
        switch *event.EventType {
42✔
684
        case types.EventTypeWorkflowExecutionStarted:
1✔
685
                res += len(event.WorkflowExecutionStartedEventAttributes.Input)
1✔
686
                res += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails)
1✔
687
                res += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult)
1✔
688
                if event.WorkflowExecutionStartedEventAttributes.Memo != nil {
2✔
689
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Memo.Fields)
1✔
690
                }
1✔
691
                if event.WorkflowExecutionStartedEventAttributes.Header != nil {
2✔
692
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Header.Fields)
1✔
693
                }
1✔
694
                if event.WorkflowExecutionStartedEventAttributes.SearchAttributes != nil {
2✔
695
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.IndexedFields)
1✔
696
                }
1✔
697
        case types.EventTypeWorkflowExecutionCompleted:
1✔
698
                res += len(event.WorkflowExecutionCompletedEventAttributes.Result)
1✔
699
        case types.EventTypeWorkflowExecutionFailed:
1✔
700
                res += len(event.WorkflowExecutionFailedEventAttributes.Details)
1✔
701
        case types.EventTypeWorkflowExecutionTimedOut:
1✔
702
        case types.EventTypeDecisionTaskScheduled:
1✔
703
        case types.EventTypeDecisionTaskStarted:
1✔
704
        case types.EventTypeDecisionTaskCompleted:
1✔
705
                res += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext)
1✔
706
        case types.EventTypeDecisionTaskTimedOut:
1✔
707
        case types.EventTypeDecisionTaskFailed:
1✔
708
                res += len(event.DecisionTaskFailedEventAttributes.Details)
1✔
709
        case types.EventTypeActivityTaskScheduled:
1✔
710
                res += len(event.ActivityTaskScheduledEventAttributes.Input)
1✔
711
                if event.ActivityTaskScheduledEventAttributes.Header != nil {
2✔
712
                        res += GetSizeOfMapStringToByteArray(event.ActivityTaskScheduledEventAttributes.Header.Fields)
1✔
713
                }
1✔
714
        case types.EventTypeActivityTaskStarted:
1✔
715
                res += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails)
1✔
716
        case types.EventTypeActivityTaskCompleted:
1✔
717
                res += len(event.ActivityTaskCompletedEventAttributes.Result)
1✔
718
        case types.EventTypeActivityTaskFailed:
1✔
719
                res += len(event.ActivityTaskFailedEventAttributes.Details)
1✔
720
        case types.EventTypeActivityTaskTimedOut:
1✔
721
                res += len(event.ActivityTaskTimedOutEventAttributes.Details)
1✔
722
                res += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails)
1✔
723
        case types.EventTypeActivityTaskCancelRequested:
1✔
724
        case types.EventTypeRequestCancelActivityTaskFailed:
1✔
725
        case types.EventTypeActivityTaskCanceled:
1✔
726
                res += len(event.ActivityTaskCanceledEventAttributes.Details)
1✔
727
        case types.EventTypeTimerStarted:
1✔
728
        case types.EventTypeTimerFired:
1✔
729
        case types.EventTypeCancelTimerFailed:
1✔
730
        case types.EventTypeTimerCanceled:
1✔
731
        case types.EventTypeWorkflowExecutionCancelRequested:
1✔
732
        case types.EventTypeWorkflowExecutionCanceled:
1✔
733
                res += len(event.WorkflowExecutionCanceledEventAttributes.Details)
1✔
734
        case types.EventTypeRequestCancelExternalWorkflowExecutionInitiated:
1✔
735
                res += len(event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Control)
1✔
736
        case types.EventTypeRequestCancelExternalWorkflowExecutionFailed:
1✔
737
                res += len(event.RequestCancelExternalWorkflowExecutionFailedEventAttributes.Control)
1✔
738
        case types.EventTypeExternalWorkflowExecutionCancelRequested:
1✔
739
        case types.EventTypeMarkerRecorded:
1✔
740
                res += len(event.MarkerRecordedEventAttributes.Details)
1✔
741
        case types.EventTypeWorkflowExecutionSignaled:
1✔
742
                res += len(event.WorkflowExecutionSignaledEventAttributes.Input)
1✔
743
        case types.EventTypeWorkflowExecutionTerminated:
1✔
744
                res += len(event.WorkflowExecutionTerminatedEventAttributes.Details)
1✔
745
        case types.EventTypeWorkflowExecutionContinuedAsNew:
1✔
746
                res += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input)
1✔
747
                if event.WorkflowExecutionContinuedAsNewEventAttributes.Memo != nil {
2✔
748
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.Fields)
1✔
749
                }
1✔
750
                if event.WorkflowExecutionContinuedAsNewEventAttributes.Header != nil {
2✔
751
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.Fields)
1✔
752
                }
1✔
753
                if event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes != nil {
2✔
754
                        res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.IndexedFields)
1✔
755
                }
1✔
756
        case types.EventTypeStartChildWorkflowExecutionInitiated:
1✔
757
                res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input)
1✔
758
                res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control)
1✔
759
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo != nil {
2✔
760
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.Fields)
1✔
761
                }
1✔
762
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.Header != nil {
2✔
763
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.Fields)
1✔
764
                }
1✔
765
                if event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes != nil {
2✔
766
                        res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.IndexedFields)
1✔
767
                }
1✔
768
        case types.EventTypeStartChildWorkflowExecutionFailed:
1✔
769
                res += len(event.StartChildWorkflowExecutionFailedEventAttributes.Control)
1✔
770
        case types.EventTypeChildWorkflowExecutionStarted:
1✔
771
                if event.ChildWorkflowExecutionStartedEventAttributes != nil && event.ChildWorkflowExecutionStartedEventAttributes.Header != nil {
2✔
772
                        res += GetSizeOfMapStringToByteArray(event.ChildWorkflowExecutionStartedEventAttributes.Header.Fields)
1✔
773
                }
1✔
774
        case types.EventTypeChildWorkflowExecutionCompleted:
1✔
775
                res += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result)
1✔
776
        case types.EventTypeChildWorkflowExecutionFailed:
1✔
777
                res += len(event.ChildWorkflowExecutionFailedEventAttributes.Details)
1✔
778
        case types.EventTypeChildWorkflowExecutionCanceled:
1✔
779
                res += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details)
1✔
780
        case types.EventTypeChildWorkflowExecutionTimedOut:
1✔
781
        case types.EventTypeChildWorkflowExecutionTerminated:
1✔
782
        case types.EventTypeSignalExternalWorkflowExecutionInitiated:
1✔
783
                res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input)
1✔
784
                res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control)
1✔
785
        case types.EventTypeSignalExternalWorkflowExecutionFailed:
1✔
786
                res += len(event.SignalExternalWorkflowExecutionFailedEventAttributes.Control)
1✔
787
        case types.EventTypeExternalWorkflowExecutionSignaled:
1✔
788
                res += len(event.ExternalWorkflowExecutionSignaledEventAttributes.Control)
1✔
789
        case types.EventTypeUpsertWorkflowSearchAttributes:
1✔
790
                if event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes != nil {
2✔
791
                        res += GetSizeOfMapStringToByteArray(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields)
1✔
792
                }
1✔
793
        }
794
        return uint64(res)
42✔
795
}
796

797
// IsJustOrderByClause reports whether clause starts with "order by" once
// surrounding whitespace is trimmed and the text is lower-cased.
func IsJustOrderByClause(clause string) bool {
	return strings.HasPrefix(strings.ToLower(strings.TrimSpace(clause)), "order by")
}
336✔
803

804
// ConvertIndexedValueTypeToInternalType takes fieldType as interface{} and convert to IndexedValueType.
805
// Because different implementation of dynamic config client may lead to different types
806
func ConvertIndexedValueTypeToInternalType(fieldType interface{}, logger log.Logger) types.IndexedValueType {
201✔
807
        switch t := fieldType.(type) {
201✔
808
        case float64:
6✔
809
                return types.IndexedValueType(t)
6✔
810
        case int:
6✔
811
                return types.IndexedValueType(t)
6✔
812
        case types.IndexedValueType:
177✔
813
                return t
177✔
814
        case []byte:
6✔
815
                var result types.IndexedValueType
6✔
816
                if err := result.UnmarshalText(t); err != nil {
6✔
817
                        logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
×
818
                        return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
819
                }
×
820
                return result
6✔
821
        case string:
6✔
822
                var result types.IndexedValueType
6✔
823
                if err := result.UnmarshalText([]byte(t)); err != nil {
6✔
824
                        logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err))
×
825
                        return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
826
                }
×
827
                return result
6✔
828
        default:
×
829
                // Unknown fieldType, please make sure dynamic config return correct value type
×
830
                logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t))
×
831
                return fieldType.(types.IndexedValueType) // it will panic and been captured by logger
×
832
        }
833
}
834

835
// DeserializeSearchAttributeValue takes json encoded search attribute value and it's type as input, then
836
// unmarshal the value into a concrete type and return the value
837
func DeserializeSearchAttributeValue(value []byte, valueType types.IndexedValueType) (interface{}, error) {
148✔
838
        switch valueType {
148✔
839
        case types.IndexedValueTypeString, types.IndexedValueTypeKeyword:
84✔
840
                var val string
84✔
841
                if err := json.Unmarshal(value, &val); err != nil {
94✔
842
                        var listVal []string
10✔
843
                        err = json.Unmarshal(value, &listVal)
10✔
844
                        return listVal, err
10✔
845
                }
10✔
846
                return val, nil
74✔
847
        case types.IndexedValueTypeInt:
30✔
848
                var val int64
30✔
849
                if err := json.Unmarshal(value, &val); err != nil {
32✔
850
                        var listVal []int64
2✔
851
                        err = json.Unmarshal(value, &listVal)
2✔
852
                        return listVal, err
2✔
853
                }
2✔
854
                return val, nil
28✔
855
        case types.IndexedValueTypeDouble:
15✔
856
                var val float64
15✔
857
                if err := json.Unmarshal(value, &val); err != nil {
17✔
858
                        var listVal []float64
2✔
859
                        err = json.Unmarshal(value, &listVal)
2✔
860
                        return listVal, err
2✔
861
                }
2✔
862
                return val, nil
13✔
863
        case types.IndexedValueTypeBool:
3✔
864
                var val bool
3✔
865
                if err := json.Unmarshal(value, &val); err != nil {
5✔
866
                        var listVal []bool
2✔
867
                        err = json.Unmarshal(value, &listVal)
2✔
868
                        return listVal, err
2✔
869
                }
2✔
870
                return val, nil
1✔
871
        case types.IndexedValueTypeDatetime:
15✔
872
                var val time.Time
15✔
873
                if err := json.Unmarshal(value, &val); err != nil {
17✔
874
                        var listVal []time.Time
2✔
875
                        err = json.Unmarshal(value, &listVal)
2✔
876
                        return listVal, err
2✔
877
                }
2✔
878
                return val, nil
13✔
879
        default:
1✔
880
                return nil, fmt.Errorf("error: unknown index value type [%v]", valueType)
1✔
881
        }
882
}
883

884
// IsAdvancedVisibilityWritingEnabled returns true if we should write to advanced visibility
885
func IsAdvancedVisibilityWritingEnabled(advancedVisibilityWritingMode string, isAdvancedVisConfigExist bool) bool {
57✔
886
        return advancedVisibilityWritingMode != AdvancedVisibilityWritingModeOff && isAdvancedVisConfigExist
57✔
887
}
57✔
888

889
// IsAdvancedVisibilityReadingEnabled reports whether reads should be served
// from advanced visibility, which requires both the read flag to be enabled
// and the advanced visibility config to exist.
func IsAdvancedVisibilityReadingEnabled(isAdvancedVisReadEnabled, isAdvancedVisConfigExist bool) bool {
	if !isAdvancedVisReadEnabled {
		return false
	}
	return isAdvancedVisConfigExist
}
291✔
893

894
// ConvertIntMapToDynamicConfigMapProperty turns an int-to-int map into the
// map[string]interface{} shape used by dynamic config map properties. Each
// key is rendered in base 10 with strconv.Itoa; values are stored as int.
// The result is always non-nil, even for a nil or empty input.
func ConvertIntMapToDynamicConfigMapProperty(
	intMap map[int]int,
) map[string]interface{} {
	converted := make(map[string]interface{}, len(intMap))
	for k, v := range intMap {
		name := strconv.Itoa(k)
		converted[name] = v
	}
	return converted
}
905

906
// ConvertDynamicConfigMapPropertyToIntMap converts a dynamic config map
// property into a map whose keys and values are both int.
//
// Keys are trimmed of surrounding whitespace and parsed with strconv.Atoi.
// Values may be float64, int, int32 or int64 and are converted with a plain
// int(...) conversion, so a fractional float64 is truncated.
//
// It returns a nil map and an error when a key is not a valid integer (the
// strconv error is wrapped, so errors.Is / errors.As can inspect it) or when
// a value has an unsupported type.
func ConvertDynamicConfigMapPropertyToIntMap(dcValue map[string]interface{}) (map[int]int, error) {
	intMap := make(map[int]int, len(dcValue))
	for key, value := range dcValue {
		intKey, err := strconv.Atoi(strings.TrimSpace(key))
		if err != nil {
			// %w renders the same text as %v but keeps the error chain intact.
			return nil, fmt.Errorf("failed to convert key %v, error: %w", key, err)
		}

		switch typed := value.(type) {
		case float64:
			intMap[intKey] = int(typed)
		case int:
			intMap[intKey] = typed
		case int32:
			intMap[intKey] = int(typed)
		case int64:
			intMap[intKey] = int(typed)
		default:
			return nil, fmt.Errorf("unknown value %v with type %T", value, value)
		}
	}
	return intMap, nil
}
933

934
// IsStickyTaskConditionError is error from matching engine
935
func IsStickyTaskConditionError(err error) bool {
×
936
        if e, ok := err.(*types.InternalServiceError); ok {
×
937
                return e.GetMessage() == StickyTaskConditionFailedErrorMsg
×
938
        }
×
939
        return false
×
940
}
941

942
// DurationToDays returns the number of whole 24 hour days contained in d,
// truncating toward zero.
func DurationToDays(d time.Duration) int32 {
	const day = 24 * time.Hour
	return int32(d / day)
}
3,967✔
946

947
// DurationToSeconds returns the number of whole seconds contained in d,
// truncating toward zero.
func DurationToSeconds(d time.Duration) int64 {
	wholeSeconds := d / time.Second
	return int64(wholeSeconds)
}
52,054✔
951

952
// DaysToDuration returns the time.Duration spanning d days of 24 hours each.
func DaysToDuration(d int32) time.Duration {
	const day = 24 * time.Hour
	return day * time.Duration(d)
}
4,082✔
956

957
// SecondsToDuration returns the time.Duration spanning d seconds.
func SecondsToDuration(d int64) time.Duration {
	return time.Second * time.Duration(d)
}
77,079✔
961

962
// SleepWithMinDuration sleeps for the minimum of desired and available duration
963
// returns the remaining available time duration
964
func SleepWithMinDuration(desired time.Duration, available time.Duration) time.Duration {
48✔
965
        d := MinDuration(desired, available)
48✔
966
        if d > 0 {
48✔
967
                time.Sleep(d)
×
968
        }
×
969
        return available - d
48✔
970
}
971

972
// ConvertErrToGetTaskFailedCause converts error to GetTaskFailedCause
973
func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause {
4✔
974
        if IsContextTimeoutError(err) {
5✔
975
                return types.GetTaskFailedCauseTimeout
1✔
976
        }
1✔
977
        if IsServiceBusyError(err) {
4✔
978
                return types.GetTaskFailedCauseServiceBusy
1✔
979
        }
1✔
980
        if _, ok := err.(*types.ShardOwnershipLostError); ok {
3✔
981
                return types.GetTaskFailedCauseShardOwnershipLost
1✔
982
        }
1✔
983
        return types.GetTaskFailedCauseUncategorized
1✔
984
}
985

986
// ConvertGetTaskFailedCauseToErr converts GetTaskFailedCause to error
987
func ConvertGetTaskFailedCauseToErr(failedCause types.GetTaskFailedCause) error {
4✔
988
        switch failedCause {
4✔
989
        case types.GetTaskFailedCauseServiceBusy:
1✔
990
                return &types.ServiceBusyError{}
1✔
991
        case types.GetTaskFailedCauseTimeout:
1✔
992
                return context.DeadlineExceeded
1✔
993
        case types.GetTaskFailedCauseShardOwnershipLost:
1✔
994
                return &types.ShardOwnershipLostError{}
1✔
995
        default:
1✔
996
                return &types.InternalServiceError{Message: "uncategorized error"}
1✔
997
        }
998
}
999

1000
// GetTaskPriority combines a task's priority class and subclass into a single
// priority value by bitwise OR-ing the two.
func GetTaskPriority(
	class int,
	subClass int,
) int {
	priority := class
	priority |= subClass
	return priority
}
21✔
1007

1008
// IntersectionStringSlice returns the elements of b that also occur in a, in
// the order they appear in b. An element repeated in b is repeated in the
// result. The result is nil when nothing matches.
func IntersectionStringSlice(a, b []string) []string {
	inA := make(map[string]struct{}, len(a))
	for i := range a {
		inA[a[i]] = struct{}{}
	}

	var common []string
	for _, candidate := range b {
		_, found := inA[candidate]
		if !found {
			continue
		}
		common = append(common, candidate)
	}
	return common
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc