• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

08 May 2023 05:10PM UTC coverage: 57.153% (-0.07%) from 57.225%
0187fc45-1fa7-4f9b-9c10-2aef2ebb54e0

push

buildkite

GitHub
Update persistence layer to adopt idl update for isolation (#5254)

204 of 204 new or added lines in 15 files covered. (100.0%)

85781 of 150089 relevant lines covered (57.15%)

2419.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.15
/service/matching/taskListManager.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package matching
24

25
import (
26
        "bytes"
27
        "context"
28
        "errors"
29
        "fmt"
30
        "sync"
31
        "sync/atomic"
32
        "time"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/backoff"
36
        "github.com/uber/cadence/common/cache"
37
        "github.com/uber/cadence/common/clock"
38
        "github.com/uber/cadence/common/cluster"
39
        "github.com/uber/cadence/common/log"
40
        "github.com/uber/cadence/common/log/tag"
41
        "github.com/uber/cadence/common/messaging"
42
        "github.com/uber/cadence/common/metrics"
43
        "github.com/uber/cadence/common/persistence"
44
        "github.com/uber/cadence/common/types"
45
)
46

47
// returnEmptyTaskTimeBudget is the time budget for an empty task to propagate
// through the function stack and be returned to the pollForActivityTask or
// pollForDecisionTask handler.
const returnEmptyTaskTimeBudget time.Duration = time.Second
52

53
var (
54
        taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
55
        taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
56
)
57

58
type (
59
        addTaskParams struct {
60
                execution                *types.WorkflowExecution
61
                taskInfo                 *persistence.TaskInfo
62
                source                   types.TaskSource
63
                forwardedFrom            string
64
                activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
65
        }
66

67
        taskListManager interface {
68
                Start() error
69
                Stop()
70
                // AddTask adds a task to the task list. This method will first attempt a synchronous
71
                // match with a poller. When that fails, task will be written to database and later
72
                // asynchronously matched with a poller
73
                AddTask(ctx context.Context, params addTaskParams) (syncMatch bool, err error)
74
                // GetTask blocks waiting for a task Returns error when context deadline is exceeded
75
                // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched
76
                // from this task list to pollers
77
                GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error)
78
                // DispatchTask dispatches a task to a poller. When there are no pollers to pick
79
                // up the task, this method will return error. Task will not be persisted to db
80
                DispatchTask(ctx context.Context, task *InternalTask) error
81
                // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
82
                // if dispatched to local poller then nil and nil is returned.
83
                DispatchQueryTask(ctx context.Context, taskID string, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error)
84
                CancelPoller(pollerID string)
85
                GetAllPollerInfo() []*types.PollerInfo
86
                HasPollerAfter(accessTime time.Time) bool
87
                // DescribeTaskList returns information about the target tasklist
88
                DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse
89
                String() string
90
                GetTaskListKind() types.TaskListKind
91
                TaskListID() *taskListID
92
        }
93

94
        // Single task list in memory state
95
        taskListManagerImpl struct {
96
                taskListID      *taskListID
97
                taskListKind    types.TaskListKind // sticky taskList has different process in persistence
98
                config          *taskListConfig
99
                db              *taskListDB
100
                taskWriter      *taskWriter
101
                taskReader      *taskReader // reads tasks from db and async matches it with poller
102
                liveness        *liveness
103
                taskGC          *taskGC
104
                taskAckManager  messaging.AckManager // tracks ackLevel for delivered messages
105
                matcher         *TaskMatcher         // for matching a task producer with a poller
106
                clusterMetadata cluster.Metadata
107
                domainCache     cache.DomainCache
108
                logger          log.Logger
109
                scope           metrics.Scope
110
                domainName      string
111
                // pollerHistory stores poller which poll from this tasklist in last few minutes
112
                pollerHistory *pollerHistory
113
                // outstandingPollsMap is needed to keep track of all outstanding pollers for a
114
                // particular tasklist.  PollerID generated by frontend is used as the key and
115
                // CancelFunc is the value.  This is used to cancel the context to unblock any
116
                // outstanding poller when the frontend detects client connection is closed to
117
                // prevent tasks being dispatched to zombie pollers.
118
                outstandingPollsLock sync.Mutex
119
                outstandingPollsMap  map[string]context.CancelFunc
120
                startWG              sync.WaitGroup // ensures that background processes do not start until setup is ready
121
                stopped              int32
122
                closeCallback        func(taskListManager)
123
        }
124
)
125

126
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait
// for a sync match to happen.
const maxSyncMatchWaitTime = 200 * time.Millisecond
130

131
var _ taskListManager = (*taskListManagerImpl)(nil)
132

133
var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
134

135
func newTaskListManager(
136
        e *matchingEngineImpl,
137
        taskList *taskListID,
138
        taskListKind *types.TaskListKind,
139
        config *Config,
140
) (taskListManager, error) {
1,358✔
141

1,358✔
142
        taskListConfig, err := newTaskListConfig(taskList, config, e.domainCache)
1,358✔
143
        if err != nil {
1,358✔
144
                return nil, err
×
145
        }
×
146

147
        if taskListKind == nil {
2,307✔
148
                normalTaskListKind := types.TaskListKindNormal
949✔
149
                taskListKind = &normalTaskListKind
949✔
150
        }
949✔
151
        domainName, err := e.domainCache.GetDomainName(taskList.domainID)
1,358✔
152
        if err != nil {
1,358✔
153
                return nil, err
×
154
        }
×
155
        scope := newPerTaskListScope(domainName, taskList.name, *taskListKind, e.metricsClient, metrics.MatchingTaskListMgrScope)
1,358✔
156
        db := newTaskListDB(e.taskManager, taskList.domainID, domainName, taskList.name, taskList.taskType, int(*taskListKind), e.logger)
1,358✔
157

1,358✔
158
        tlMgr := &taskListManagerImpl{
1,358✔
159
                domainCache:         e.domainCache,
1,358✔
160
                clusterMetadata:     e.clusterMetadata,
1,358✔
161
                taskListID:          taskList,
1,358✔
162
                taskListKind:        *taskListKind,
1,358✔
163
                logger:              e.logger.WithTags(tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)),
1,358✔
164
                db:                  db,
1,358✔
165
                taskAckManager:      messaging.NewAckManager(e.logger),
1,358✔
166
                taskGC:              newTaskGC(db, taskListConfig),
1,358✔
167
                config:              taskListConfig,
1,358✔
168
                outstandingPollsMap: make(map[string]context.CancelFunc),
1,358✔
169
                domainName:          domainName,
1,358✔
170
                scope:               scope,
1,358✔
171
                closeCallback:       e.removeTaskListManager,
1,358✔
172
        }
1,358✔
173

1,358✔
174
        taskListTypeMetricScope := tlMgr.scope.Tagged(
1,358✔
175
                getTaskListTypeTag(taskList.taskType),
1,358✔
176
        )
1,358✔
177
        tlMgr.pollerHistory = newPollerHistory(func() {
34,232✔
178
                taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
32,874✔
179
                        float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
32,874✔
180
        })
32,874✔
181
        tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
1,358✔
182
        tlMgr.taskWriter = newTaskWriter(tlMgr)
1,358✔
183
        tlMgr.taskReader = newTaskReader(tlMgr)
1,358✔
184
        var fwdr *Forwarder
1,358✔
185
        if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
2,089✔
186
                fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient)
731✔
187
        }
731✔
188
        tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope)
1,358✔
189
        tlMgr.startWG.Add(1)
1,358✔
190
        return tlMgr, nil
1,358✔
191
}
192

193
// Starts reading pump for the given task list.
194
// The pump fills up taskBuffer from persistence.
195
func (c *taskListManagerImpl) Start() error {
1,346✔
196
        defer c.startWG.Done()
1,346✔
197

1,346✔
198
        c.liveness.Start()
1,346✔
199
        if err := c.taskWriter.Start(); err != nil {
1,346✔
200
                c.Stop()
×
201
                return err
×
202
        }
×
203
        c.taskReader.Start()
1,346✔
204

1,346✔
205
        return nil
1,346✔
206
}
207

208
// Stops pump that fills up taskBuffer from persistence.
209
func (c *taskListManagerImpl) Stop() {
2,661✔
210
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
3,986✔
211
                return
1,325✔
212
        }
1,325✔
213
        c.closeCallback(c)
1,336✔
214
        c.liveness.Stop()
1,336✔
215
        c.taskWriter.Stop()
1,336✔
216
        c.taskReader.Stop()
1,336✔
217
        c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
1,336✔
218
}
219

220
func (c *taskListManagerImpl) handleErr(err error) error {
40,549✔
221
        var e *persistence.ConditionFailedError
40,549✔
222
        if errors.As(err, &e) {
40,549✔
223
                // This indicates the task list may have moved to another host.
×
224
                c.scope.IncCounter(metrics.ConditionFailedErrorPerTaskListCounter)
×
225
                c.logger.Debug("Stopping task list due to persistence condition failure.", tag.Error(err))
×
226
                c.Stop()
×
227
                if c.taskListKind == types.TaskListKindSticky {
×
228
                        // TODO: we don't see this error in our logs, we might be able to remove this error
×
229
                        err = &types.InternalServiceError{Message: common.StickyTaskConditionFailedErrorMsg}
×
230
                }
×
231
        }
232
        return err
40,549✔
233
}
234

235
// AddTask adds a task to the task list. This method will first attempt a synchronous
236
// match with a poller. When there are no pollers or if rate limit is exceeded, task will
237
// be written to database and later asynchronously matched with a poller
238
func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) {
28,358✔
239
        c.startWG.Wait()
28,358✔
240
        if params.forwardedFrom == "" {
42,820✔
241
                // request sent by history service
14,462✔
242
                c.liveness.markAlive(time.Now())
14,462✔
243
        }
14,462✔
244
        var syncMatch bool
28,358✔
245
        _, err := c.executeWithRetry(func() (interface{}, error) {
56,716✔
246
                if err := ctx.Err(); err != nil {
28,359✔
247
                        return nil, err
1✔
248
                }
1✔
249

250
                domainEntry, err := c.domainCache.GetDomainByID(params.taskInfo.DomainID)
28,357✔
251
                if err != nil {
28,357✔
252
                        return nil, err
×
253
                }
×
254

255
                isForwarded := params.forwardedFrom != ""
28,357✔
256

28,357✔
257
                if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
28,357✔
258
                        // standby task, only persist when task is not forwarded from child partition
×
259
                        syncMatch = false
×
260
                        if isForwarded {
×
261
                                return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
×
262
                        }
×
263

264
                        r, err := c.taskWriter.appendTask(params.execution, params.taskInfo)
×
265
                        return r, err
×
266
                }
267

268
                // active task, try sync match first
269
                syncMatch, err = c.trySyncMatch(ctx, params)
28,357✔
270
                if syncMatch {
36,006✔
271
                        return &persistence.CreateTasksResponse{}, err
7,649✔
272
                }
7,649✔
273
                if params.activityTaskDispatchInfo != nil {
20,708✔
274
                        return false, errRemoteSyncMatchFailed
×
275
                }
×
276

277
                if isForwarded {
33,724✔
278
                        // forwarded from child partition - only do sync match
13,016✔
279
                        // child partition will persist the task when sync match fails
13,016✔
280
                        return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
13,016✔
281
                }
13,016✔
282

283
                return c.taskWriter.appendTask(params.execution, params.taskInfo)
7,692✔
284
        })
285

286
        if err != nil {
41,378✔
287
                c.logger.Error("Failed to add task",
13,020✔
288
                        tag.Error(err),
13,020✔
289
                        tag.WorkflowTaskListName(c.taskListID.name),
13,020✔
290
                        tag.WorkflowTaskListType(c.taskListID.taskType),
13,020✔
291
                )
13,020✔
292
        } else {
28,358✔
293
                c.taskReader.Signal()
15,338✔
294
        }
15,338✔
295

296
        return syncMatch, err
28,358✔
297
}
298

299
// DispatchTask dispatches a task to a poller. When there are no pollers to pick
300
// up the task or if rate limit is exceeded, this method will return error. Task
301
// *will not* be persisted to db
302
func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *InternalTask) error {
9,613✔
303
        return c.matcher.MustOffer(ctx, task)
9,613✔
304
}
9,613✔
305

306
// DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
307
// if dispatched to local poller then nil and nil is returned.
308
func (c *taskListManagerImpl) DispatchQueryTask(
309
        ctx context.Context,
310
        taskID string,
311
        request *types.MatchingQueryWorkflowRequest,
312
) (*types.QueryWorkflowResponse, error) {
45✔
313
        c.startWG.Wait()
45✔
314
        task := newInternalQueryTask(taskID, request)
45✔
315
        return c.matcher.OfferQuery(ctx, task)
45✔
316
}
45✔
317

318
// GetTask blocks waiting for a task.
319
// Returns error when context deadline is exceeded
320
// maxDispatchPerSecond is the max rate at which tasks are allowed
321
// to be dispatched from this task list to pollers
322
func (c *taskListManagerImpl) GetTask(
323
        ctx context.Context,
324
        maxDispatchPerSecond *float64,
325
) (*InternalTask, error) {
16,439✔
326
        c.liveness.markAlive(time.Now())
16,439✔
327
        task, err := c.getTask(ctx, maxDispatchPerSecond)
16,439✔
328
        if err != nil {
16,794✔
329
                return nil, err
355✔
330
        }
355✔
331
        task.domainName = c.domainName
16,084✔
332
        task.backlogCountHint = c.taskAckManager.GetBacklogCount()
16,084✔
333
        return task, nil
16,084✔
334
}
335

336
func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error) {
16,439✔
337
        // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
16,439✔
338
        // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
16,439✔
339
        // which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
16,439✔
340
        // returned to the handler before a context timeout error is generated.
16,439✔
341
        childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
16,439✔
342
        defer cancel()
16,439✔
343

16,439✔
344
        pollerID, ok := ctx.Value(pollerIDKey).(string)
16,439✔
345
        if ok && pollerID != "" {
19,155✔
346
                // Found pollerID on context, add it to the map to allow it to be canceled in
2,716✔
347
                // response to CancelPoller call
2,716✔
348
                c.outstandingPollsLock.Lock()
2,716✔
349
                c.outstandingPollsMap[pollerID] = cancel
2,716✔
350
                c.outstandingPollsLock.Unlock()
2,716✔
351
                defer func() {
5,432✔
352
                        c.outstandingPollsLock.Lock()
2,716✔
353
                        delete(c.outstandingPollsMap, pollerID)
2,716✔
354
                        c.outstandingPollsLock.Unlock()
2,716✔
355
                }()
2,716✔
356
        }
357

358
        identity, ok := ctx.Value(identityKey).(string)
16,439✔
359
        if ok && identity != "" {
32,875✔
360
                c.pollerHistory.updatePollerInfo(pollerIdentity(identity), maxDispatchPerSecond)
16,436✔
361
                defer func() {
32,872✔
362
                        // to update timestamp of this poller when long poll ends
16,436✔
363
                        c.pollerHistory.updatePollerInfo(pollerIdentity(identity), maxDispatchPerSecond)
16,436✔
364
                }()
16,436✔
365
        }
366

367
        domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID)
16,439✔
368
        if err != nil {
16,439✔
369
                return nil, err
×
370
        }
×
371

372
        // the desired global rate limit for the task list comes from the
373
        // poller, which lives inside the client side worker. There is
374
        // one rateLimiter for this entire task list and as we get polls,
375
        // we update the ratelimiter rps if it has changed from the last
376
        // value. Last poller wins if different pollers provide different values
377
        c.matcher.UpdateRatelimit(maxDispatchPerSecond)
16,439✔
378

16,439✔
379
        if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
16,439✔
380
                return c.matcher.PollForQuery(childCtx)
×
381
        }
×
382

383
        return c.matcher.Poll(childCtx)
16,439✔
384
}
385

386
// GetAllPollerInfo returns all pollers that polled from this tasklist in last few minutes
387
func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {
64✔
388
        return c.pollerHistory.getPollerInfo(time.Time{})
64✔
389
}
64✔
390

391
// HasPollerAfter checks if there is any poller after a timestamp
392
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
68✔
393
        inflightPollerCount := 0
68✔
394
        c.outstandingPollsLock.Lock()
68✔
395
        inflightPollerCount = len(c.outstandingPollsMap)
68✔
396
        c.outstandingPollsLock.Unlock()
68✔
397
        if inflightPollerCount > 0 {
98✔
398
                return true
30✔
399
        }
30✔
400
        recentPollers := c.pollerHistory.getPollerInfo(accessTime)
38✔
401
        return len(recentPollers) > 0
38✔
402
}
403

404
func (c *taskListManagerImpl) CancelPoller(pollerID string) {
132✔
405
        c.outstandingPollsLock.Lock()
132✔
406
        cancel, ok := c.outstandingPollsMap[pollerID]
132✔
407
        c.outstandingPollsLock.Unlock()
132✔
408

132✔
409
        if ok && cancel != nil {
132✔
410
                cancel()
×
411
                c.logger.Info("canceled outstanding poller", tag.WorkflowDomainName(c.domainName))
×
412
        }
×
413
}
414

415
// DescribeTaskList returns information about the target tasklist, right now this API returns the
416
// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager
417
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
418
func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse {
64✔
419
        response := &types.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()}
64✔
420
        if !includeTaskListStatus {
123✔
421
                return response
59✔
422
        }
59✔
423

424
        taskIDBlock := rangeIDToTaskIDBlock(c.db.RangeID(), c.config.RangeSize)
5✔
425
        response.TaskListStatus = &types.TaskListStatus{
5✔
426
                ReadLevel:        c.taskAckManager.GetReadLevel(),
5✔
427
                AckLevel:         c.taskAckManager.GetAckLevel(),
5✔
428
                BacklogCountHint: c.taskAckManager.GetBacklogCount(),
5✔
429
                RatePerSecond:    c.matcher.Rate(),
5✔
430
                TaskIDBlock: &types.TaskIDBlock{
5✔
431
                        StartID: taskIDBlock.start,
5✔
432
                        EndID:   taskIDBlock.end,
5✔
433
                },
5✔
434
        }
5✔
435

5✔
436
        return response
5✔
437
}
438

439
func (c *taskListManagerImpl) String() string {
×
440
        buf := new(bytes.Buffer)
×
441
        if c.taskListID.taskType == persistence.TaskListTypeActivity {
×
442
                buf.WriteString("Activity")
×
443
        } else {
×
444
                buf.WriteString("Decision")
×
445
        }
×
446
        rangeID := c.db.RangeID()
×
447
        fmt.Fprintf(buf, " task list %v\n", c.taskListID.name)
×
448
        fmt.Fprintf(buf, "RangeID=%v\n", rangeID)
×
449
        fmt.Fprintf(buf, "TaskIDBlock=%+v\n", rangeIDToTaskIDBlock(rangeID, c.config.RangeSize))
×
450
        fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.GetAckLevel())
×
451
        fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.GetReadLevel())
×
452

×
453
        return buf.String()
×
454
}
455

456
func (c *taskListManagerImpl) GetTaskListKind() types.TaskListKind {
×
457
        return c.taskListKind
×
458
}
×
459

460
func (c *taskListManagerImpl) TaskListID() *taskListID {
1,338✔
461
        return c.taskListID
1,338✔
462
}
1,338✔
463

464
// Retry operation on transient error. On rangeID update by another process calls c.Stop().
465
func (c *taskListManagerImpl) executeWithRetry(
466
        operation func() (interface{}, error),
467
) (result interface{}, err error) {
28,358✔
468

28,358✔
469
        op := func() error {
56,716✔
470
                result, err = operation()
28,358✔
471
                return err
28,358✔
472
        }
28,358✔
473

474
        throttleRetry := backoff.NewThrottleRetry(
28,358✔
475
                backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
28,358✔
476
                backoff.WithRetryableError(persistence.IsTransientError),
28,358✔
477
        )
28,358✔
478
        err = c.handleErr(throttleRetry.Do(context.Background(), op))
28,358✔
479
        return
28,358✔
480
}
481

482
func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams) (bool, error) {
28,357✔
483
        task := newInternalTask(params.taskInfo, nil, params.source, params.forwardedFrom, true, params.activityTaskDispatchInfo)
28,357✔
484
        childCtx := ctx
28,357✔
485
        cancel := func() {}
42,253✔
486
        waitTime := maxSyncMatchWaitTime
28,357✔
487
        if params.activityTaskDispatchInfo != nil {
28,357✔
488
                waitTime = c.config.ActivityTaskSyncMatchWaitTime(params.activityTaskDispatchInfo.WorkflowDomain)
×
489
        }
×
490
        if !task.isForwarded() {
42,818✔
491
                // when task is forwarded from another matching host, we trust the context as is
14,461✔
492
                // otherwise, we override to limit the amount of time we can block on sync match
14,461✔
493
                childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
14,461✔
494
        }
14,461✔
495
        var matched bool
28,357✔
496
        var err error
28,357✔
497
        if params.activityTaskDispatchInfo != nil {
28,357✔
498
                matched, err = c.matcher.offerOrTimeout(childCtx, task)
×
499
        } else {
28,357✔
500
                matched, err = c.matcher.Offer(childCtx, task)
28,357✔
501
        }
28,357✔
502
        cancel()
28,357✔
503
        return matched, err
28,357✔
504
}
505

506
// newChildContext creates a child context with desired timeout.
507
// if tailroom is non-zero, then child context timeout will be
508
// the minOf(parentCtx.Deadline()-tailroom, timeout). Use this
509
// method to create child context when childContext cannot use
510
// all of parent's deadline but instead there is a need to leave
511
// some time for parent to do some post-work
512
func (c *taskListManagerImpl) newChildContext(
513
        parent context.Context,
514
        timeout time.Duration,
515
        tailroom time.Duration,
516
) (context.Context, context.CancelFunc) {
30,900✔
517
        select {
30,900✔
518
        case <-parent.Done():
2✔
519
                return parent, func() {}
4✔
520
        default:
30,898✔
521
        }
522
        deadline, ok := parent.Deadline()
30,898✔
523
        if !ok {
57,346✔
524
                return context.WithTimeout(parent, timeout)
26,448✔
525
        }
26,448✔
526
        remaining := time.Until(deadline) - tailroom
4,450✔
527
        if remaining < timeout {
5,484✔
528
                timeout = time.Duration(common.MaxInt64(0, int64(remaining)))
1,034✔
529
        }
1,034✔
530
        return context.WithTimeout(parent, timeout)
4,450✔
531
}
532

533
func (c *taskListManagerImpl) isFowardingAllowed(taskList *taskListID, kind types.TaskListKind) bool {
1,358✔
534
        return !taskList.IsRoot() && kind != types.TaskListKindSticky
1,358✔
535
}
1,358✔
536

537
func getTaskListTypeTag(taskListType int) metrics.Tag {
18,218✔
538
        switch taskListType {
18,218✔
539
        case persistence.TaskListTypeActivity:
6,387✔
540
                return taskListActivityTypeTag
6,387✔
541
        case persistence.TaskListTypeDecision:
11,831✔
542
                return taskListDecisionTypeTag
11,831✔
543
        default:
×
544
                return metrics.TaskListTypeTag("")
×
545
        }
546
}
547

548
func createServiceBusyError(msg string) *types.ServiceBusyError {
×
549
        return &types.ServiceBusyError{Message: msg}
×
550
}
×
551

552
func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
2,415✔
553
        return taskIDBlock{
2,415✔
554
                start: (rangeID-1)*rangeSize + 1,
2,415✔
555
                end:   rangeID * rangeSize,
2,415✔
556
        }
2,415✔
557
}
2,415✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc