• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.55
/service/matching/taskListManager.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package matching
24

25
import (
26
        "bytes"
27
        "context"
28
        "errors"
29
        "fmt"
30
        "sync"
31
        "sync/atomic"
32
        "time"
33

34
        "github.com/uber/cadence/common"
35
        "github.com/uber/cadence/common/backoff"
36
        "github.com/uber/cadence/common/cache"
37
        "github.com/uber/cadence/common/clock"
38
        "github.com/uber/cadence/common/cluster"
39
        "github.com/uber/cadence/common/log"
40
        "github.com/uber/cadence/common/log/tag"
41
        "github.com/uber/cadence/common/messaging"
42
        "github.com/uber/cadence/common/metrics"
43
        "github.com/uber/cadence/common/partition"
44
        "github.com/uber/cadence/common/persistence"
45
        "github.com/uber/cadence/common/types"
46
)
47

48
const (
49
        // Time budget for empty task to propagate through the function stack and be returned to
50
        // pollForActivityTask or pollForDecisionTask handler.
51
        returnEmptyTaskTimeBudget time.Duration = time.Second
52
)
53

54
var (
55
        taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
56
        taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
57
)
58

59
type (
60
        addTaskParams struct {
61
                execution                *types.WorkflowExecution
62
                taskInfo                 *persistence.TaskInfo
63
                source                   types.TaskSource
64
                forwardedFrom            string
65
                activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
66
        }
67

68
        taskListManager interface {
69
                Start() error
70
                Stop()
71
                // AddTask adds a task to the task list. This method will first attempt a synchronous
72
                // match with a poller. When that fails, task will be written to database and later
73
                // asynchronously matched with a poller
74
                AddTask(ctx context.Context, params addTaskParams) (syncMatch bool, err error)
75
                // GetTask blocks waiting for a task Returns error when context deadline is exceeded
76
                // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched
77
                // from this task list to pollers
78
                GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error)
79
                // DispatchTask dispatches a task to a poller. When there are no pollers to pick
80
                // up the task, this method will return error. Task will not be persisted to db
81
                DispatchTask(ctx context.Context, task *InternalTask) error
82
                // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
83
                // if dispatched to local poller then nil and nil is returned.
84
                DispatchQueryTask(ctx context.Context, taskID string, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error)
85
                CancelPoller(pollerID string)
86
                GetAllPollerInfo() []*types.PollerInfo
87
                HasPollerAfter(accessTime time.Time) bool
88
                // DescribeTaskList returns information about the target tasklist
89
                DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse
90
                String() string
91
                GetTaskListKind() types.TaskListKind
92
                TaskListID() *taskListID
93
        }
94

95
        // Single task list in memory state
96
        taskListManagerImpl struct {
97
                createTime      time.Time
98
                enableIsolation bool
99
                taskListID      *taskListID
100
                taskListKind    types.TaskListKind // sticky taskList has different process in persistence
101
                config          *taskListConfig
102
                db              *taskListDB
103
                taskWriter      *taskWriter
104
                taskReader      *taskReader // reads tasks from db and async matches it with poller
105
                liveness        *liveness
106
                taskGC          *taskGC
107
                taskAckManager  messaging.AckManager // tracks ackLevel for delivered messages
108
                matcher         *TaskMatcher         // for matching a task producer with a poller
109
                clusterMetadata cluster.Metadata
110
                domainCache     cache.DomainCache
111
                partitioner     partition.Partitioner
112
                logger          log.Logger
113
                scope           metrics.Scope
114
                domainName      string
115
                // pollerHistory stores poller which poll from this tasklist in last few minutes
116
                pollerHistory *pollerHistory
117
                // outstandingPollsMap is needed to keep track of all outstanding pollers for a
118
                // particular tasklist.  PollerID generated by frontend is used as the key and
119
                // CancelFunc is the value.  This is used to cancel the context to unblock any
120
                // outstanding poller when the frontend detects client connection is closed to
121
                // prevent tasks being dispatched to zombie pollers.
122
                outstandingPollsLock sync.Mutex
123
                outstandingPollsMap  map[string]context.CancelFunc
124
                startWG              sync.WaitGroup // ensures that background processes do not start until setup is ready
125
                stopped              int32
126
                closeCallback        func(taskListManager)
127
        }
128
)
129

130
const (
131
        // maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
132
        maxSyncMatchWaitTime = 200 * time.Millisecond
133
)
134

135
var _ taskListManager = (*taskListManagerImpl)(nil)
136

137
var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
138

139
func newTaskListManager(
140
        e *matchingEngineImpl,
141
        taskList *taskListID,
142
        taskListKind *types.TaskListKind,
143
        config *Config,
144
        createTime time.Time,
145
) (taskListManager, error) {
1,399✔
146

1,399✔
147
        taskListConfig, err := newTaskListConfig(taskList, config, e.domainCache)
1,399✔
148
        if err != nil {
1,399✔
149
                return nil, err
×
150
        }
×
151

152
        if taskListKind == nil {
2,354✔
153
                normalTaskListKind := types.TaskListKindNormal
955✔
154
                taskListKind = &normalTaskListKind
955✔
155
        }
955✔
156
        domainName, err := e.domainCache.GetDomainName(taskList.domainID)
1,399✔
157
        if err != nil {
1,399✔
158
                return nil, err
×
159
        }
×
160
        scope := newPerTaskListScope(domainName, taskList.name, *taskListKind, e.metricsClient, metrics.MatchingTaskListMgrScope)
1,399✔
161
        db := newTaskListDB(e.taskManager, taskList.domainID, domainName, taskList.name, taskList.taskType, int(*taskListKind), e.logger)
1,399✔
162

1,399✔
163
        tlMgr := &taskListManagerImpl{
1,399✔
164
                createTime:          createTime,
1,399✔
165
                enableIsolation:     taskListConfig.EnableTasklistIsolation(),
1,399✔
166
                domainCache:         e.domainCache,
1,399✔
167
                clusterMetadata:     e.clusterMetadata,
1,399✔
168
                partitioner:         e.partitioner,
1,399✔
169
                taskListID:          taskList,
1,399✔
170
                taskListKind:        *taskListKind,
1,399✔
171
                logger:              e.logger.WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)),
1,399✔
172
                db:                  db,
1,399✔
173
                taskAckManager:      messaging.NewAckManager(e.logger),
1,399✔
174
                taskGC:              newTaskGC(db, taskListConfig),
1,399✔
175
                config:              taskListConfig,
1,399✔
176
                outstandingPollsMap: make(map[string]context.CancelFunc),
1,399✔
177
                domainName:          domainName,
1,399✔
178
                scope:               scope,
1,399✔
179
                closeCallback:       e.removeTaskListManager,
1,399✔
180
        }
1,399✔
181

1,399✔
182
        taskListTypeMetricScope := tlMgr.scope.Tagged(
1,399✔
183
                getTaskListTypeTag(taskList.taskType),
1,399✔
184
        )
1,399✔
185
        tlMgr.pollerHistory = newPollerHistory(func() {
54,467✔
186
                taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
53,068✔
187
                        float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
53,068✔
188
        })
53,068✔
189
        tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
1,399✔
190
        tlMgr.taskWriter = newTaskWriter(tlMgr)
1,399✔
191
        tlMgr.taskReader = newTaskReader(tlMgr)
1,399✔
192
        var fwdr *Forwarder
1,399✔
193
        if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
2,139✔
194
                fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient)
740✔
195
        }
740✔
196
        var isolationGroups []string
1,399✔
197
        if tlMgr.isIsolationMatcherEnabled() {
1,410✔
198
                isolationGroups = config.AllIsolationGroups
11✔
199
        }
11✔
200
        tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups)
1,399✔
201
        tlMgr.startWG.Add(1)
1,399✔
202
        return tlMgr, nil
1,399✔
203
}
204

205
// Starts reading pump for the given task list.
206
// The pump fills up taskBuffer from persistence.
207
func (c *taskListManagerImpl) Start() error {
1,387✔
208
        defer c.startWG.Done()
1,387✔
209

1,387✔
210
        c.liveness.Start()
1,387✔
211
        if err := c.taskWriter.Start(); err != nil {
1,387✔
212
                c.Stop()
×
213
                return err
×
214
        }
×
215
        c.taskReader.Start()
1,387✔
216

1,387✔
217
        return nil
1,387✔
218
}
219

220
// Stops pump that fills up taskBuffer from persistence.
221
func (c *taskListManagerImpl) Stop() {
2,705✔
222
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
4,045✔
223
                return
1,340✔
224
        }
1,340✔
225
        c.closeCallback(c)
1,365✔
226
        c.liveness.Stop()
1,365✔
227
        c.taskWriter.Stop()
1,365✔
228
        c.taskReader.Stop()
1,365✔
229
        c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
1,365✔
230
}
231

232
func (c *taskListManagerImpl) handleErr(err error) error {
56,781✔
233
        var e *persistence.ConditionFailedError
56,781✔
234
        if errors.As(err, &e) {
56,781✔
235
                // This indicates the task list may have moved to another host.
×
236
                c.scope.IncCounter(metrics.ConditionFailedErrorPerTaskListCounter)
×
237
                c.logger.Debug("Stopping task list due to persistence condition failure.", tag.Error(err))
×
238
                c.Stop()
×
239
                if c.taskListKind == types.TaskListKindSticky {
×
240
                        // TODO: we don't see this error in our logs, we might be able to remove this error
×
241
                        err = &types.InternalServiceError{Message: common.StickyTaskConditionFailedErrorMsg}
×
242
                }
×
243
        }
244
        return err
56,781✔
245
}
246

247
// AddTask adds a task to the task list. This method will first attempt a synchronous
248
// match with a poller. When there are no pollers or if rate limit is exceeded, task will
249
// be written to database and later asynchronously matched with a poller
250
func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) {
40,094✔
251
        c.startWG.Wait()
40,094✔
252
        if c.shouldReload() {
40,096✔
253
                c.Stop()
2✔
254
                return false, errShutdown
2✔
255
        }
2✔
256
        if params.forwardedFrom == "" {
64,588✔
257
                // request sent by history service
24,496✔
258
                c.liveness.markAlive(time.Now())
24,496✔
259
        }
24,496✔
260
        var syncMatch bool
40,092✔
261
        _, err := c.executeWithRetry(func() (interface{}, error) {
80,184✔
262
                if err := ctx.Err(); err != nil {
40,093✔
263
                        return nil, err
1✔
264
                }
1✔
265

266
                domainEntry, err := c.domainCache.GetDomainByID(params.taskInfo.DomainID)
40,091✔
267
                if err != nil {
40,091✔
268
                        return nil, err
×
269
                }
×
270

271
                isForwarded := params.forwardedFrom != ""
40,091✔
272

40,091✔
273
                if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
40,091✔
274
                        // standby task, only persist when task is not forwarded from child partition
×
275
                        syncMatch = false
×
276
                        if isForwarded {
×
277
                                return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
×
278
                        }
×
279

280
                        r, err := c.taskWriter.appendTask(params.execution, params.taskInfo)
×
281
                        return r, err
×
282
                }
283

284
                isolationGroup, err := c.getIsolationGroupForTask(ctx, params.taskInfo)
40,091✔
285
                if err != nil {
40,096✔
286
                        return false, err
5✔
287
                }
5✔
288
                // active task, try sync match first
289
                syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
40,086✔
290
                if syncMatch {
53,096✔
291
                        return &persistence.CreateTasksResponse{}, err
13,010✔
292
                }
13,010✔
293
                if params.activityTaskDispatchInfo != nil {
27,076✔
294
                        return false, errRemoteSyncMatchFailed
×
295
                }
×
296

297
                if isForwarded {
41,786✔
298
                        // forwarded from child partition - only do sync match
14,710✔
299
                        // child partition will persist the task when sync match fails
14,710✔
300
                        return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
14,710✔
301
                }
14,710✔
302

303
                return c.taskWriter.appendTask(params.execution, params.taskInfo)
12,366✔
304
        })
305

306
        if err != nil {
54,811✔
307
                c.logger.Error("Failed to add task",
14,719✔
308
                        tag.Error(err),
14,719✔
309
                        tag.WorkflowTaskListName(c.taskListID.name),
14,719✔
310
                        tag.WorkflowTaskListType(c.taskListID.taskType),
14,719✔
311
                )
14,719✔
312
        } else {
40,092✔
313
                c.taskReader.Signal()
25,373✔
314
        }
25,373✔
315

316
        return syncMatch, err
40,092✔
317
}
318

319
// DispatchTask dispatches a task to a poller. When there are no pollers to pick
320
// up the task or if rate limit is exceeded, this method will return error. Task
321
// *will not* be persisted to db
322
func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *InternalTask) error {
14,939✔
323
        return c.matcher.MustOffer(ctx, task)
14,939✔
324
}
14,939✔
325

326
// DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
327
// if dispatched to local poller then nil and nil is returned.
328
func (c *taskListManagerImpl) DispatchQueryTask(
329
        ctx context.Context,
330
        taskID string,
331
        request *types.MatchingQueryWorkflowRequest,
332
) (*types.QueryWorkflowResponse, error) {
47✔
333
        c.startWG.Wait()
47✔
334
        task := newInternalQueryTask(taskID, request)
47✔
335
        return c.matcher.OfferQuery(ctx, task)
47✔
336
}
47✔
337

338
// GetTask blocks waiting for a task.
339
// Returns error when context deadline is exceeded
340
// maxDispatchPerSecond is the max rate at which tasks are allowed
341
// to be dispatched from this task list to pollers
342
func (c *taskListManagerImpl) GetTask(
343
        ctx context.Context,
344
        maxDispatchPerSecond *float64,
345
) (*InternalTask, error) {
26,538✔
346
        if c.shouldReload() {
26,540✔
347
                c.Stop()
2✔
348
                return nil, ErrNoTasks
2✔
349
        }
2✔
350
        c.liveness.markAlive(time.Now())
26,536✔
351
        task, err := c.getTask(ctx, maxDispatchPerSecond)
26,536✔
352
        if err != nil {
26,896✔
353
                return nil, err
360✔
354
        }
360✔
355
        task.domainName = c.domainName
26,176✔
356
        task.backlogCountHint = c.taskAckManager.GetBacklogCount()
26,176✔
357
        return task, nil
26,176✔
358
}
359

360
func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error) {
26,536✔
361
        // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
26,536✔
362
        // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
26,536✔
363
        // which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
26,536✔
364
        // returned to the handler before a context timeout error is generated.
26,536✔
365
        childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
26,536✔
366
        defer cancel()
26,536✔
367

26,536✔
368
        isolationGroup, _ := ctx.Value(_isolationGroupKey).(string)
26,536✔
369
        pollerID, ok := ctx.Value(pollerIDKey).(string)
26,536✔
370
        if ok && pollerID != "" {
29,315✔
371
                // Found pollerID on context, add it to the map to allow it to be canceled in
2,779✔
372
                // response to CancelPoller call
2,779✔
373
                c.outstandingPollsLock.Lock()
2,779✔
374
                c.outstandingPollsMap[pollerID] = cancel
2,779✔
375
                c.outstandingPollsLock.Unlock()
2,779✔
376
                defer func() {
5,558✔
377
                        c.outstandingPollsLock.Lock()
2,779✔
378
                        delete(c.outstandingPollsMap, pollerID)
2,779✔
379
                        c.outstandingPollsLock.Unlock()
2,779✔
380
                }()
2,779✔
381
        }
382

383
        identity, ok := ctx.Value(identityKey).(string)
26,536✔
384
        if ok && identity != "" {
53,069✔
385
                c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
26,533✔
386
                defer func() {
53,066✔
387
                        // to update timestamp of this poller when long poll ends
26,533✔
388
                        c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
26,533✔
389
                }()
26,533✔
390
        }
391

392
        domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID)
26,536✔
393
        if err != nil {
26,536✔
394
                return nil, err
×
395
        }
×
396

397
        // the desired global rate limit for the task list comes from the
398
        // poller, which lives inside the client side worker. There is
399
        // one rateLimiter for this entire task list and as we get polls,
400
        // we update the ratelimiter rps if it has changed from the last
401
        // value. Last poller wins if different pollers provide different values
402
        c.matcher.UpdateRatelimit(maxDispatchPerSecond)
26,536✔
403

26,536✔
404
        if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
26,536✔
405
                return c.matcher.PollForQuery(childCtx)
×
406
        }
×
407

408
        if c.isIsolationMatcherEnabled() {
36,564✔
409
                return c.matcher.Poll(childCtx, isolationGroup)
10,028✔
410
        }
10,028✔
411
        return c.matcher.Poll(childCtx, "")
16,508✔
412
}
413

414
// GetAllPollerInfo returns all pollers that polled from this tasklist in last few minutes
415
func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {
66✔
416
        return c.pollerHistory.getPollerInfo(time.Time{})
66✔
417
}
66✔
418

419
// HasPollerAfter checks if there is any poller after a timestamp
420
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
78✔
421
        inflightPollerCount := 0
78✔
422
        c.outstandingPollsLock.Lock()
78✔
423
        inflightPollerCount = len(c.outstandingPollsMap)
78✔
424
        c.outstandingPollsLock.Unlock()
78✔
425
        if inflightPollerCount > 0 {
108✔
426
                return true
30✔
427
        }
30✔
428
        recentPollers := c.pollerHistory.getPollerInfo(accessTime)
48✔
429
        return len(recentPollers) > 0
48✔
430
}
431

432
func (c *taskListManagerImpl) CancelPoller(pollerID string) {
132✔
433
        c.outstandingPollsLock.Lock()
132✔
434
        cancel, ok := c.outstandingPollsMap[pollerID]
132✔
435
        c.outstandingPollsLock.Unlock()
132✔
436

132✔
437
        if ok && cancel != nil {
135✔
438
                cancel()
3✔
439
                c.logger.Info("canceled outstanding poller", tag.WorkflowDomainName(c.domainName))
3✔
440
        }
3✔
441
}
442

443
// DescribeTaskList returns information about the target tasklist, right now this API returns the
444
// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager
445
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
446
func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse {
66✔
447
        response := &types.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()}
66✔
448
        if !includeTaskListStatus {
125✔
449
                return response
59✔
450
        }
59✔
451

452
        taskIDBlock := rangeIDToTaskIDBlock(c.db.RangeID(), c.config.RangeSize)
7✔
453
        response.TaskListStatus = &types.TaskListStatus{
7✔
454
                ReadLevel:        c.taskAckManager.GetReadLevel(),
7✔
455
                AckLevel:         c.taskAckManager.GetAckLevel(),
7✔
456
                BacklogCountHint: c.taskAckManager.GetBacklogCount(),
7✔
457
                RatePerSecond:    c.matcher.Rate(),
7✔
458
                TaskIDBlock: &types.TaskIDBlock{
7✔
459
                        StartID: taskIDBlock.start,
7✔
460
                        EndID:   taskIDBlock.end,
7✔
461
                },
7✔
462
        }
7✔
463

7✔
464
        return response
7✔
465
}
466

467
func (c *taskListManagerImpl) String() string {
×
468
        buf := new(bytes.Buffer)
×
469
        if c.taskListID.taskType == persistence.TaskListTypeActivity {
×
470
                buf.WriteString("Activity")
×
471
        } else {
×
472
                buf.WriteString("Decision")
×
473
        }
×
474
        rangeID := c.db.RangeID()
×
475
        fmt.Fprintf(buf, " task list %v\n", c.taskListID.name)
×
476
        fmt.Fprintf(buf, "RangeID=%v\n", rangeID)
×
477
        fmt.Fprintf(buf, "TaskIDBlock=%+v\n", rangeIDToTaskIDBlock(rangeID, c.config.RangeSize))
×
478
        fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.GetAckLevel())
×
479
        fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.GetReadLevel())
×
480

×
481
        return buf.String()
×
482
}
483

484
func (c *taskListManagerImpl) GetTaskListKind() types.TaskListKind {
×
485
        return c.taskListKind
×
486
}
×
487

488
func (c *taskListManagerImpl) TaskListID() *taskListID {
1,367✔
489
        return c.taskListID
1,367✔
490
}
1,367✔
491

492
// Retry operation on transient error. On rangeID update by another process calls c.Stop().
493
func (c *taskListManagerImpl) executeWithRetry(
494
        operation func() (interface{}, error),
495
) (result interface{}, err error) {
40,092✔
496

40,092✔
497
        op := func() error {
80,184✔
498
                result, err = operation()
40,092✔
499
                return err
40,092✔
500
        }
40,092✔
501

502
        throttleRetry := backoff.NewThrottleRetry(
40,092✔
503
                backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
40,092✔
504
                backoff.WithRetryableError(persistence.IsTransientError),
40,092✔
505
        )
40,092✔
506
        err = c.handleErr(throttleRetry.Do(context.Background(), op))
40,092✔
507
        return
40,092✔
508
}
509

510
func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams, isolationGroup string) (bool, error) {
40,086✔
511
        task := newInternalTask(params.taskInfo, nil, params.source, params.forwardedFrom, true, params.activityTaskDispatchInfo, isolationGroup)
40,086✔
512
        childCtx := ctx
40,086✔
513
        cancel := func() {}
55,682✔
514
        waitTime := maxSyncMatchWaitTime
40,086✔
515
        if params.activityTaskDispatchInfo != nil {
40,086✔
516
                waitTime = c.config.ActivityTaskSyncMatchWaitTime(params.activityTaskDispatchInfo.WorkflowDomain)
×
517
        }
×
518
        if !task.isForwarded() {
64,576✔
519
                // when task is forwarded from another matching host, we trust the context as is
24,490✔
520
                // otherwise, we override to limit the amount of time we can block on sync match
24,490✔
521
                childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
24,490✔
522
        }
24,490✔
523
        var matched bool
40,086✔
524
        var err error
40,086✔
525
        if params.activityTaskDispatchInfo != nil {
40,086✔
526
                matched, err = c.matcher.offerOrTimeout(childCtx, task)
×
527
        } else {
40,086✔
528
                matched, err = c.matcher.Offer(childCtx, task)
40,086✔
529
        }
40,086✔
530
        cancel()
40,086✔
531
        return matched, err
40,086✔
532
}
533

534
// newChildContext creates a child context with desired timeout.
535
// if tailroom is non-zero, then child context timeout will be
536
// the minOf(parentCtx.Deadline()-tailroom, timeout). Use this
537
// method to create child context when childContext cannot use
538
// all of parent's deadline but instead there is a need to leave
539
// some time for parent to do some post-work
540
func (c *taskListManagerImpl) newChildContext(
541
        parent context.Context,
542
        timeout time.Duration,
543
        tailroom time.Duration,
544
) (context.Context, context.CancelFunc) {
51,026✔
545
        select {
51,026✔
546
        case <-parent.Done():
1✔
547
                return parent, func() {}
2✔
548
        default:
51,025✔
549
        }
550
        deadline, ok := parent.Deadline()
51,025✔
551
        if !ok {
97,537✔
552
                return context.WithTimeout(parent, timeout)
46,512✔
553
        }
46,512✔
554
        remaining := time.Until(deadline) - tailroom
4,513✔
555
        if remaining < timeout {
5,608✔
556
                timeout = time.Duration(common.MaxInt64(0, int64(remaining)))
1,095✔
557
        }
1,095✔
558
        return context.WithTimeout(parent, timeout)
4,513✔
559
}
560

561
func (c *taskListManagerImpl) isFowardingAllowed(taskList *taskListID, kind types.TaskListKind) bool {
1,399✔
562
        return !taskList.IsRoot() && kind != types.TaskListKindSticky
1,399✔
563
}
1,399✔
564

565
func (c *taskListManagerImpl) isIsolationMatcherEnabled() bool {
27,935✔
566
        return c.taskListKind != types.TaskListKindSticky && c.enableIsolation
27,935✔
567
}
27,935✔
568

569
func (c *taskListManagerImpl) shouldReload() bool {
66,632✔
570
        return c.config.EnableTasklistIsolation() != c.enableIsolation && c.taskListKind != types.TaskListKindSticky
66,632✔
571
}
66,632✔
572

573
func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, taskInfo *persistence.TaskInfo) (string, error) {
55,030✔
574
        if c.enableIsolation && len(taskInfo.PartitionConfig) > 0 {
70,424✔
575
                partitionConfig := make(map[string]string)
15,394✔
576
                for k, v := range taskInfo.PartitionConfig {
30,788✔
577
                        partitionConfig[k] = v
15,394✔
578
                }
15,394✔
579
                partitionConfig[partition.WorkflowRunIDKey] = taskInfo.RunID
15,394✔
580
                pollerIsolationGroups := c.config.AllIsolationGroups
15,394✔
581
                // Not all poller information are available at the time of task list manager creation,
15,394✔
582
                // because we don't persist poller information in database, so in the first minute, we always assume
15,394✔
583
                // pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group.
15,394✔
584
                // Besides, for sticky tasklists, not all poller information are available, we also use all isolation group.
15,394✔
585
                if time.Now().Sub(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky {
19,408✔
586
                        pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-time.Minute))
4,014✔
587
                        if len(pollerIsolationGroups) == 0 {
6,030✔
588
                                // we don't have any pollers, use all isolation groups and wait for pollers' arriving
2,016✔
589
                                pollerIsolationGroups = c.config.AllIsolationGroups
2,016✔
590
                        }
2,016✔
591
                }
592
                group, err := c.partitioner.GetIsolationGroupByDomainID(ctx, taskInfo.DomainID, partitionConfig, pollerIsolationGroups)
15,394✔
593
                if err != nil {
15,394✔
594
                        // For a sticky tasklist, return StickyUnavailableError to let it be added to the non-sticky tasklist.
×
595
                        if err == partition.ErrNoIsolationGroupsAvailable && c.taskListKind == types.TaskListKindSticky {
×
596
                                return "", _stickyPollerUnavailableError
×
597
                        }
×
598
                        // if we're unable to get the isolation group, log the error and fallback to no isolation
599
                        c.logger.Error("Failed to get isolation group from partition library", tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err))
×
600
                        return "", nil
×
601
                }
602
                // For a sticky tasklist, it is possible that when an isolation group is undrained, the tasks from one workflow is reassigned
603
                // to the isolation group undrained. If there is no poller from the isolation group, we should return StickyUnavailableError
604
                // to let the task to be re-enqueued to the non-sticky tasklist. If there is poller, just return an empty isolation group, because
605
                // there is at most one isolation group for sticky tasklist and we could just use empty isolation group for matching.
606
                if c.taskListKind == types.TaskListKindSticky {
15,409✔
607
                        pollerIsolationGroups = c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-time.Minute))
15✔
608
                        for _, pollerGroup := range pollerIsolationGroups {
30✔
609
                                if group == pollerGroup {
25✔
610
                                        return "", nil
10✔
611
                                }
10✔
612
                        }
613
                        return "", _stickyPollerUnavailableError
5✔
614
                }
615
                return group, nil
15,379✔
616
        }
617
        return "", nil
39,636✔
618
}
619

620
func getTaskListTypeTag(taskListType int) metrics.Tag {
22,716✔
621
        switch taskListType {
22,716✔
622
        case persistence.TaskListTypeActivity:
7,766✔
623
                return taskListActivityTypeTag
7,766✔
624
        case persistence.TaskListTypeDecision:
14,950✔
625
                return taskListDecisionTypeTag
14,950✔
626
        default:
×
627
                return metrics.TaskListTypeTag("")
×
628
        }
629
}
630

631
func createServiceBusyError(msg string) *types.ServiceBusyError {
×
632
        return &types.ServiceBusyError{Message: msg}
×
633
}
×
634

635
func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
3,136✔
636
        return taskIDBlock{
3,136✔
637
                start: (rangeID-1)*rangeSize + 1,
3,136✔
638
                end:   rangeID * rangeSize,
3,136✔
639
        }
3,136✔
640
}
3,136✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc