• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018daedd-c4d7-4e21-b456-76c002abd778

15 Feb 2024 10:23PM UTC coverage: 62.721% (-0.03%) from 62.749%
018daedd-c4d7-4e21-b456-76c002abd778

Pull #5666

buildkite

neil-xie
Update in submodule
Pull Request #5666: Upgrade pinot client version

92590 of 147622 relevant lines covered (62.72%)

2306.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.46
/service/matching/taskListManager.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2

3
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4

5
// Permission is hereby granted, free of charge, to any person obtaining a copy
6
// of this software and associated documentation files (the "Software"), to deal
7
// in the Software without restriction, including without limitation the rights
8
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
// copies of the Software, and to permit persons to whom the Software is
10
// furnished to do so, subject to the following conditions:
11
//
12
// The above copyright notice and this permission notice shall be included in all
13
// copies or substantial portions of the Software.
14
//
15
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
// SOFTWARE.
22

23
package matching
24

25
import (
26
        "bytes"
27
        "context"
28
        "errors"
29
        "fmt"
30
        "sort"
31
        "sync"
32
        "sync/atomic"
33
        "time"
34

35
        "github.com/uber/cadence/common"
36
        "github.com/uber/cadence/common/backoff"
37
        "github.com/uber/cadence/common/cache"
38
        "github.com/uber/cadence/common/clock"
39
        "github.com/uber/cadence/common/cluster"
40
        "github.com/uber/cadence/common/log"
41
        "github.com/uber/cadence/common/log/tag"
42
        "github.com/uber/cadence/common/messaging"
43
        "github.com/uber/cadence/common/metrics"
44
        "github.com/uber/cadence/common/partition"
45
        "github.com/uber/cadence/common/persistence"
46
        "github.com/uber/cadence/common/types"
47
)
48

49
const (
50
        // Time budget for empty task to propagate through the function stack and be returned to
51
        // pollForActivityTask or pollForDecisionTask handler.
52
        returnEmptyTaskTimeBudget time.Duration = time.Second
53
)
54

55
var (
56
        taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
57
        taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
58
)
59

60
type (
61
        addTaskParams struct {
62
                execution                *types.WorkflowExecution
63
                taskInfo                 *persistence.TaskInfo
64
                source                   types.TaskSource
65
                forwardedFrom            string
66
                activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
67
        }
68

69
        taskListManager interface {
70
                Start() error
71
                Stop()
72
                // AddTask adds a task to the task list. This method will first attempt a synchronous
73
                // match with a poller. When that fails, task will be written to database and later
74
                // asynchronously matched with a poller
75
                AddTask(ctx context.Context, params addTaskParams) (syncMatch bool, err error)
76
                // GetTask blocks waiting for a task Returns error when context deadline is exceeded
77
                // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched
78
                // from this task list to pollers
79
                GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error)
80
                // DispatchTask dispatches a task to a poller. When there are no pollers to pick
81
                // up the task, this method will return error. Task will not be persisted to db
82
                DispatchTask(ctx context.Context, task *InternalTask) error
83
                // DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
84
                // if dispatched to local poller then nil and nil is returned.
85
                DispatchQueryTask(ctx context.Context, taskID string, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error)
86
                CancelPoller(pollerID string)
87
                GetAllPollerInfo() []*types.PollerInfo
88
                HasPollerAfter(accessTime time.Time) bool
89
                // DescribeTaskList returns information about the target tasklist
90
                DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse
91
                String() string
92
                GetTaskListKind() types.TaskListKind
93
                TaskListID() *taskListID
94
        }
95

96
        outstandingPollerInfo struct {
97
                isolationGroup string
98
                cancel         context.CancelFunc
99
        }
100

101
        // Single task list in memory state
102
        taskListManagerImpl struct {
103
                createTime      time.Time
104
                enableIsolation bool
105
                taskListID      *taskListID
106
                taskListKind    types.TaskListKind // sticky taskList has different process in persistence
107
                config          *taskListConfig
108
                db              *taskListDB
109
                taskWriter      *taskWriter
110
                taskReader      *taskReader // reads tasks from db and async matches it with poller
111
                liveness        *liveness
112
                taskGC          *taskGC
113
                taskAckManager  messaging.AckManager // tracks ackLevel for delivered messages
114
                matcher         *TaskMatcher         // for matching a task producer with a poller
115
                clusterMetadata cluster.Metadata
116
                domainCache     cache.DomainCache
117
                partitioner     partition.Partitioner
118
                logger          log.Logger
119
                scope           metrics.Scope
120
                domainName      string
121
                // pollerHistory stores poller which poll from this tasklist in last few minutes
122
                pollerHistory *pollerHistory
123
                // outstandingPollsMap is needed to keep track of all outstanding pollers for a
124
                // particular tasklist.  PollerID generated by frontend is used as the key and
125
                // CancelFunc is the value.  This is used to cancel the context to unblock any
126
                // outstanding poller when the frontend detects client connection is closed to
127
                // prevent tasks being dispatched to zombie pollers.
128
                outstandingPollsLock sync.Mutex
129
                outstandingPollsMap  map[string]outstandingPollerInfo
130
                startWG              sync.WaitGroup // ensures that background processes do not start until setup is ready
131
                stopped              int32
132
                closeCallback        func(taskListManager)
133
        }
134
)
135

136
const (
137
        // maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
138
        maxSyncMatchWaitTime = 200 * time.Millisecond
139
)
140

141
var _ taskListManager = (*taskListManagerImpl)(nil)
142

143
var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
144

145
func newTaskListManager(
146
        e *matchingEngineImpl,
147
        taskList *taskListID,
148
        taskListKind *types.TaskListKind,
149
        config *Config,
150
        createTime time.Time,
151
) (taskListManager, error) {
1,383✔
152

1,383✔
153
        taskListConfig, err := newTaskListConfig(taskList, config, e.domainCache)
1,383✔
154
        if err != nil {
1,383✔
155
                return nil, err
×
156
        }
×
157

158
        if taskListKind == nil {
2,341✔
159
                normalTaskListKind := types.TaskListKindNormal
958✔
160
                taskListKind = &normalTaskListKind
958✔
161
        }
958✔
162
        domainName, err := e.domainCache.GetDomainName(taskList.domainID)
1,383✔
163
        if err != nil {
1,383✔
164
                return nil, err
×
165
        }
×
166
        scope := newPerTaskListScope(domainName, taskList.name, *taskListKind, e.metricsClient, metrics.MatchingTaskListMgrScope)
1,383✔
167
        db := newTaskListDB(e.taskManager, taskList.domainID, domainName, taskList.name, taskList.taskType, int(*taskListKind), e.logger)
1,383✔
168

1,383✔
169
        tlMgr := &taskListManagerImpl{
1,383✔
170
                createTime:          createTime,
1,383✔
171
                enableIsolation:     taskListConfig.EnableTasklistIsolation(),
1,383✔
172
                domainCache:         e.domainCache,
1,383✔
173
                clusterMetadata:     e.clusterMetadata,
1,383✔
174
                partitioner:         e.partitioner,
1,383✔
175
                taskListID:          taskList,
1,383✔
176
                taskListKind:        *taskListKind,
1,383✔
177
                logger:              e.logger.WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)),
1,383✔
178
                db:                  db,
1,383✔
179
                taskAckManager:      messaging.NewAckManager(e.logger),
1,383✔
180
                taskGC:              newTaskGC(db, taskListConfig),
1,383✔
181
                config:              taskListConfig,
1,383✔
182
                outstandingPollsMap: make(map[string]outstandingPollerInfo),
1,383✔
183
                domainName:          domainName,
1,383✔
184
                scope:               scope,
1,383✔
185
                closeCallback:       e.removeTaskListManager,
1,383✔
186
        }
1,383✔
187

1,383✔
188
        taskListTypeMetricScope := tlMgr.scope.Tagged(
1,383✔
189
                getTaskListTypeTag(taskList.taskType),
1,383✔
190
        )
1,383✔
191
        tlMgr.pollerHistory = newPollerHistory(func() {
49,677✔
192
                taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
48,294✔
193
                        float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
48,294✔
194
        })
48,294✔
195
        tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
1,383✔
196
        var isolationGroups []string
1,383✔
197
        if tlMgr.isIsolationMatcherEnabled() {
1,397✔
198
                isolationGroups = config.AllIsolationGroups
14✔
199
        }
14✔
200
        var fwdr *Forwarder
1,383✔
201
        if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
2,129✔
202
                fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient, isolationGroups)
746✔
203
        }
746✔
204
        tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger)
1,383✔
205
        tlMgr.taskWriter = newTaskWriter(tlMgr)
1,383✔
206
        tlMgr.taskReader = newTaskReader(tlMgr, isolationGroups)
1,383✔
207
        tlMgr.startWG.Add(1)
1,383✔
208
        return tlMgr, nil
1,383✔
209
}
210

211
// Starts reading pump for the given task list.
212
// The pump fills up taskBuffer from persistence.
213
func (c *taskListManagerImpl) Start() error {
1,372✔
214
        defer c.startWG.Done()
1,372✔
215

1,372✔
216
        c.liveness.Start()
1,372✔
217
        if err := c.taskWriter.Start(); err != nil {
1,372✔
218
                c.Stop()
×
219
                return err
×
220
        }
×
221
        c.taskReader.Start()
1,372✔
222

1,372✔
223
        return nil
1,372✔
224
}
225

226
// Stops pump that fills up taskBuffer from persistence.
227
func (c *taskListManagerImpl) Stop() {
2,711✔
228
        if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
4,058✔
229
                return
1,347✔
230
        }
1,347✔
231
        c.closeCallback(c)
1,364✔
232
        c.liveness.Stop()
1,364✔
233
        c.taskWriter.Stop()
1,364✔
234
        c.taskReader.Stop()
1,364✔
235
        c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
1,364✔
236
}
237

238
func (c *taskListManagerImpl) handleErr(err error) error {
52,543✔
239
        var e *persistence.ConditionFailedError
52,543✔
240
        if errors.As(err, &e) {
52,543✔
241
                // This indicates the task list may have moved to another host.
×
242
                c.scope.IncCounter(metrics.ConditionFailedErrorPerTaskListCounter)
×
243
                c.logger.Debug("Stopping task list due to persistence condition failure.", tag.Error(err))
×
244
                c.Stop()
×
245
                if c.taskListKind == types.TaskListKindSticky {
×
246
                        // TODO: we don't see this error in our logs, we might be able to remove this error
×
247
                        err = &types.InternalServiceError{Message: common.StickyTaskConditionFailedErrorMsg}
×
248
                }
×
249
        }
250
        return err
52,543✔
251
}
252

253
// AddTask adds a task to the task list. This method will first attempt a synchronous
254
// match with a poller. When there are no pollers or if rate limit is exceeded, task will
255
// be written to database and later asynchronously matched with a poller
256
func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) {
37,120✔
257
        c.startWG.Wait()
37,120✔
258
        if c.shouldReload() {
37,122✔
259
                c.Stop()
2✔
260
                return false, errShutdown
2✔
261
        }
2✔
262
        if params.forwardedFrom == "" {
61,600✔
263
                // request sent by history service
24,482✔
264
                c.liveness.markAlive(time.Now())
24,482✔
265
        }
24,482✔
266
        var syncMatch bool
37,118✔
267
        _, err := c.executeWithRetry(func() (interface{}, error) {
74,236✔
268
                if err := ctx.Err(); err != nil {
37,118✔
269
                        return nil, err
×
270
                }
×
271

272
                domainEntry, err := c.domainCache.GetDomainByID(params.taskInfo.DomainID)
37,118✔
273
                if err != nil {
37,118✔
274
                        return nil, err
×
275
                }
×
276

277
                isForwarded := params.forwardedFrom != ""
37,118✔
278

37,118✔
279
                if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
37,118✔
280
                        // standby task, only persist when task is not forwarded from child partition
×
281
                        syncMatch = false
×
282
                        if isForwarded {
×
283
                                return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
×
284
                        }
×
285

286
                        r, err := c.taskWriter.appendTask(params.execution, params.taskInfo)
×
287
                        return r, err
×
288
                }
289

290
                isolationGroup, err := c.getIsolationGroupForTask(ctx, params.taskInfo)
37,118✔
291
                if err != nil {
37,118✔
292
                        return false, err
×
293
                }
×
294
                // active task, try sync match first
295
                syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
37,118✔
296
                if syncMatch {
49,456✔
297
                        return &persistence.CreateTasksResponse{}, err
12,338✔
298
                }
12,338✔
299
                if params.activityTaskDispatchInfo != nil {
24,780✔
300
                        return false, errRemoteSyncMatchFailed
×
301
                }
×
302

303
                if isForwarded {
36,496✔
304
                        // forwarded from child partition - only do sync match
11,716✔
305
                        // child partition will persist the task when sync match fails
11,716✔
306
                        return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
11,716✔
307
                }
11,716✔
308

309
                return c.taskWriter.appendTask(params.execution, params.taskInfo)
13,064✔
310
        })
311

312
        if err == nil && !syncMatch {
50,179✔
313
                c.taskReader.Signal()
13,061✔
314
        }
13,061✔
315

316
        return syncMatch, err
37,118✔
317
}
318

319
// DispatchTask dispatches a task to a poller. When there are no pollers to pick
320
// up the task or if rate limit is exceeded, this method will return error. Task
321
// *will not* be persisted to db
322
func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *InternalTask) error {
27,414✔
323
        return c.matcher.MustOffer(ctx, task)
27,414✔
324
}
27,414✔
325

326
// DispatchQueryTask will dispatch query to local or remote poller. If forwarded then result or error is returned,
327
// if dispatched to local poller then nil and nil is returned.
328
func (c *taskListManagerImpl) DispatchQueryTask(
329
        ctx context.Context,
330
        taskID string,
331
        request *types.MatchingQueryWorkflowRequest,
332
) (*types.QueryWorkflowResponse, error) {
48✔
333
        c.startWG.Wait()
48✔
334
        task := newInternalQueryTask(taskID, request)
48✔
335
        return c.matcher.OfferQuery(ctx, task)
48✔
336
}
48✔
337

338
// GetTask blocks waiting for a task.
339
// Returns error when context deadline is exceeded
340
// maxDispatchPerSecond is the max rate at which tasks are allowed
341
// to be dispatched from this task list to pollers
342
func (c *taskListManagerImpl) GetTask(
343
        ctx context.Context,
344
        maxDispatchPerSecond *float64,
345
) (*InternalTask, error) {
24,151✔
346
        if c.shouldReload() {
24,153✔
347
                c.Stop()
2✔
348
                return nil, ErrNoTasks
2✔
349
        }
2✔
350
        c.liveness.markAlive(time.Now())
24,149✔
351
        task, err := c.getTask(ctx, maxDispatchPerSecond)
24,149✔
352
        if err != nil {
24,484✔
353
                return nil, fmt.Errorf("couldn't get task: %w", err)
335✔
354
        }
335✔
355
        task.domainName = c.domainName
23,814✔
356
        task.backlogCountHint = c.taskAckManager.GetBacklogCount()
23,814✔
357
        return task, nil
23,814✔
358
}
359

360
func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error) {
24,149✔
361
        // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
24,149✔
362
        // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
24,149✔
363
        // which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
24,149✔
364
        // returned to the handler before a context timeout error is generated.
24,149✔
365
        childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
24,149✔
366
        defer cancel()
24,149✔
367

24,149✔
368
        isolationGroup, _ := ctx.Value(_isolationGroupKey).(string)
24,149✔
369
        pollerID, ok := ctx.Value(pollerIDKey).(string)
24,149✔
370
        if ok && pollerID != "" {
26,845✔
371
                // Found pollerID on context, add it to the map to allow it to be canceled in
2,696✔
372
                // response to CancelPoller call
2,696✔
373
                c.outstandingPollsLock.Lock()
2,696✔
374
                c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
2,696✔
375
                c.outstandingPollsLock.Unlock()
2,696✔
376
                defer func() {
5,392✔
377
                        c.outstandingPollsLock.Lock()
2,696✔
378
                        delete(c.outstandingPollsMap, pollerID)
2,696✔
379
                        c.outstandingPollsLock.Unlock()
2,696✔
380
                }()
2,696✔
381
        }
382

383
        identity, ok := ctx.Value(identityKey).(string)
24,149✔
384
        if ok && identity != "" {
48,295✔
385
                c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
24,146✔
386
                defer func() {
48,292✔
387
                        // to update timestamp of this poller when long poll ends
24,146✔
388
                        c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
24,146✔
389
                }()
24,146✔
390
        }
391

392
        domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID)
24,149✔
393
        if err != nil {
24,149✔
394
                return nil, fmt.Errorf("unable to fetch domain from cache: %w", err)
×
395
        }
×
396

397
        // the desired global rate limit for the task list comes from the
398
        // poller, which lives inside the client side worker. There is
399
        // one rateLimiter for this entire task list and as we get polls,
400
        // we update the ratelimiter rps if it has changed from the last
401
        // value. Last poller wins if different pollers provide different values
402
        c.matcher.UpdateRatelimit(maxDispatchPerSecond)
24,149✔
403

24,149✔
404
        if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
24,149✔
405
                return c.matcher.PollForQuery(childCtx)
×
406
        }
×
407

408
        if c.isIsolationMatcherEnabled() {
34,177✔
409
                return c.matcher.Poll(childCtx, isolationGroup)
10,028✔
410
        }
10,028✔
411
        return c.matcher.Poll(childCtx, "")
14,121✔
412
}
413

414
// GetAllPollerInfo returns all pollers that polled from this tasklist in last few minutes
415
func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {
66✔
416
        return c.pollerHistory.getPollerInfo(time.Time{})
66✔
417
}
66✔
418

419
// HasPollerAfter checks if there is any poller after a timestamp
420
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
68✔
421
        inflightPollerCount := 0
68✔
422
        c.outstandingPollsLock.Lock()
68✔
423
        inflightPollerCount = len(c.outstandingPollsMap)
68✔
424
        c.outstandingPollsLock.Unlock()
68✔
425
        if inflightPollerCount > 0 {
100✔
426
                return true
32✔
427
        }
32✔
428
        recentPollers := c.pollerHistory.getPollerInfo(accessTime)
36✔
429
        return len(recentPollers) > 0
36✔
430
}
431

432
func (c *taskListManagerImpl) CancelPoller(pollerID string) {
132✔
433
        c.outstandingPollsLock.Lock()
132✔
434
        info, ok := c.outstandingPollsMap[pollerID]
132✔
435
        c.outstandingPollsLock.Unlock()
132✔
436

132✔
437
        if ok && info.cancel != nil {
133✔
438
                info.cancel()
1✔
439
                c.logger.Info("canceled outstanding poller", tag.WorkflowDomainName(c.domainName))
1✔
440
        }
1✔
441
}
442

443
// DescribeTaskList returns information about the target tasklist, right now this API returns the
444
// pollers which polled this tasklist in last few minutes and status of tasklist's ackManager
445
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
446
func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse {
66✔
447
        response := &types.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()}
66✔
448
        if !includeTaskListStatus {
125✔
449
                return response
59✔
450
        }
59✔
451

452
        taskIDBlock := rangeIDToTaskIDBlock(c.db.RangeID(), c.config.RangeSize)
7✔
453
        backlogCount, err := c.db.GetTaskListSize(c.taskAckManager.GetAckLevel())
7✔
454
        if err != nil {
7✔
455
                // fallback to im-memory backlog, if failed to get count from db
×
456
                backlogCount = c.taskAckManager.GetBacklogCount()
×
457
        }
×
458
        response.TaskListStatus = &types.TaskListStatus{
7✔
459
                ReadLevel:        c.taskAckManager.GetReadLevel(),
7✔
460
                AckLevel:         c.taskAckManager.GetAckLevel(),
7✔
461
                BacklogCountHint: backlogCount,
7✔
462
                RatePerSecond:    c.matcher.Rate(),
7✔
463
                TaskIDBlock: &types.TaskIDBlock{
7✔
464
                        StartID: taskIDBlock.start,
7✔
465
                        EndID:   taskIDBlock.end,
7✔
466
                },
7✔
467
        }
7✔
468

7✔
469
        return response
7✔
470
}
471

472
func (c *taskListManagerImpl) String() string {
×
473
        buf := new(bytes.Buffer)
×
474
        if c.taskListID.taskType == persistence.TaskListTypeActivity {
×
475
                buf.WriteString("Activity")
×
476
        } else {
×
477
                buf.WriteString("Decision")
×
478
        }
×
479
        rangeID := c.db.RangeID()
×
480
        fmt.Fprintf(buf, " task list %v\n", c.taskListID.name)
×
481
        fmt.Fprintf(buf, "RangeID=%v\n", rangeID)
×
482
        fmt.Fprintf(buf, "TaskIDBlock=%+v\n", rangeIDToTaskIDBlock(rangeID, c.config.RangeSize))
×
483
        fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.GetAckLevel())
×
484
        fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.GetReadLevel())
×
485

×
486
        return buf.String()
×
487
}
488

489
func (c *taskListManagerImpl) GetTaskListKind() types.TaskListKind {
×
490
        return c.taskListKind
×
491
}
×
492

493
func (c *taskListManagerImpl) TaskListID() *taskListID {
1,366✔
494
        return c.taskListID
1,366✔
495
}
1,366✔
496

497
// Retry operation on transient error. On rangeID update by another process calls c.Stop().
498
func (c *taskListManagerImpl) executeWithRetry(
499
        operation func() (interface{}, error),
500
) (result interface{}, err error) {
37,118✔
501

37,118✔
502
        op := func() error {
74,236✔
503
                result, err = operation()
37,118✔
504
                return err
37,118✔
505
        }
37,118✔
506

507
        throttleRetry := backoff.NewThrottleRetry(
37,118✔
508
                backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
37,118✔
509
                backoff.WithRetryableError(persistence.IsTransientError),
37,118✔
510
        )
37,118✔
511
        err = c.handleErr(throttleRetry.Do(context.Background(), op))
37,118✔
512
        return
37,118✔
513
}
514

515
func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams, isolationGroup string) (bool, error) {
37,118✔
516
        task := newInternalTask(params.taskInfo, nil, params.source, params.forwardedFrom, true, params.activityTaskDispatchInfo, isolationGroup)
37,118✔
517
        childCtx := ctx
37,118✔
518
        cancel := func() {}
49,754✔
519
        waitTime := maxSyncMatchWaitTime
37,118✔
520
        if params.activityTaskDispatchInfo != nil {
37,118✔
521
                waitTime = c.config.ActivityTaskSyncMatchWaitTime(params.activityTaskDispatchInfo.WorkflowDomain)
×
522
        }
×
523
        if !task.isForwarded() {
61,600✔
524
                // when task is forwarded from another matching host, we trust the context as is
24,482✔
525
                // otherwise, we override to limit the amount of time we can block on sync match
24,482✔
526
                childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
24,482✔
527
        }
24,482✔
528
        var matched bool
37,118✔
529
        var err error
37,118✔
530
        if params.activityTaskDispatchInfo != nil {
37,118✔
531
                matched, err = c.matcher.offerOrTimeout(childCtx, task)
×
532
        } else {
37,118✔
533
                matched, err = c.matcher.Offer(childCtx, task)
37,118✔
534
        }
37,118✔
535
        cancel()
37,118✔
536
        return matched, err
37,118✔
537
}
538

539
// newChildContext creates a child context with desired timeout.
540
// if tailroom is non-zero, then child context timeout will be
541
// the minOf(parentCtx.Deadline()-tailroom, timeout). Use this
542
// method to create child context when childContext cannot use
543
// all of parent's deadline but instead there is a need to leave
544
// some time for parent to do some post-work
545
func (c *taskListManagerImpl) newChildContext(
546
        parent context.Context,
547
        timeout time.Duration,
548
        tailroom time.Duration,
549
) (context.Context, context.CancelFunc) {
48,631✔
550
        select {
48,631✔
551
        case <-parent.Done():
×
552
                return parent, func() {}
×
553
        default:
48,631✔
554
        }
555
        deadline, ok := parent.Deadline()
48,631✔
556
        if !ok {
92,834✔
557
                return context.WithTimeout(parent, timeout)
44,203✔
558
        }
44,203✔
559
        remaining := time.Until(deadline) - tailroom
4,428✔
560
        if remaining < timeout {
5,462✔
561
                timeout = time.Duration(common.MaxInt64(0, int64(remaining)))
1,034✔
562
        }
1,034✔
563
        return context.WithTimeout(parent, timeout)
4,428✔
564
}
565

566
func (c *taskListManagerImpl) isFowardingAllowed(taskList *taskListID, kind types.TaskListKind) bool {
1,383✔
567
        return !taskList.IsRoot() && kind != types.TaskListKindSticky
1,383✔
568
}
1,383✔
569

570
func (c *taskListManagerImpl) isIsolationMatcherEnabled() bool {
25,532✔
571
        return c.taskListKind != types.TaskListKindSticky && c.enableIsolation
25,532✔
572
}
25,532✔
573

574
func (c *taskListManagerImpl) shouldReload() bool {
61,271✔
575
        return c.config.EnableTasklistIsolation() != c.enableIsolation
61,271✔
576
}
61,271✔
577

578
func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, taskInfo *persistence.TaskInfo) (string, error) {
58,027✔
579
        if c.enableIsolation && len(taskInfo.PartitionConfig) > 0 && c.taskListKind != types.TaskListKindSticky {
73,018✔
580
                partitionConfig := make(map[string]string)
14,991✔
581
                for k, v := range taskInfo.PartitionConfig {
29,982✔
582
                        partitionConfig[k] = v
14,991✔
583
                }
14,991✔
584
                partitionConfig[partition.WorkflowIDKey] = taskInfo.WorkflowID
14,991✔
585
                pollerIsolationGroups := c.config.AllIsolationGroups
14,991✔
586
                // Not all poller information are available at the time of task list manager creation,
14,991✔
587
                // because we don't persist poller information in database, so in the first minute, we always assume
14,991✔
588
                // pollers are available in all isolation groups to avoid the risk of leaking a task to another isolation group.
14,991✔
589
                // Besides, for sticky and scalable tasklists, not all poller information are available, we also use all isolation group.
14,991✔
590
                if time.Now().Sub(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky && c.taskListID.IsRoot() {
19,171✔
591
                        pollerIsolationGroups = c.getPollerIsolationGroups()
4,180✔
592
                        if len(pollerIsolationGroups) == 0 {
6,356✔
593
                                // we don't have any pollers, use all isolation groups and wait for pollers' arriving
2,176✔
594
                                pollerIsolationGroups = c.config.AllIsolationGroups
2,176✔
595
                        }
2,176✔
596
                }
597
                group, err := c.partitioner.GetIsolationGroupByDomainID(ctx, taskInfo.DomainID, partitionConfig, pollerIsolationGroups)
14,991✔
598
                if err != nil {
14,991✔
599
                        // For a sticky tasklist, return StickyUnavailableError to let it be added to the non-sticky tasklist.
×
600
                        if err == partition.ErrNoIsolationGroupsAvailable && c.taskListKind == types.TaskListKindSticky {
×
601
                                return "", _stickyPollerUnavailableError
×
602
                        }
×
603
                        // if we're unable to get the isolation group, log the error and fallback to no isolation
604
                        c.logger.Error("Failed to get isolation group from partition library", tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err))
×
605
                        return defaultTaskBufferIsolationGroup, nil
×
606
                }
607
                c.logger.Debug("get isolation group", tag.PollerGroups(pollerIsolationGroups), tag.IsolationGroup(group), tag.PartitionConfig(partitionConfig))
14,991✔
608
                // For a sticky tasklist, it is possible that when an isolation group is undrained, the tasks from one workflow is reassigned
14,991✔
609
                // to the isolation group undrained. If there is no poller from the isolation group, we should return StickyUnavailableError
14,991✔
610
                // to let the task to be re-enqueued to the non-sticky tasklist. If there is poller, just return an empty isolation group, because
14,991✔
611
                // there is at most one isolation group for sticky tasklist and we could just use empty isolation group for matching.
14,991✔
612
                if c.taskListKind == types.TaskListKindSticky {
14,991✔
613
                        pollerIsolationGroups = c.getPollerIsolationGroups()
×
614
                        for _, pollerGroup := range pollerIsolationGroups {
×
615
                                if group == pollerGroup {
×
616
                                        return "", nil
×
617
                                }
×
618
                        }
619
                        return "", _stickyPollerUnavailableError
×
620
                }
621
                return group, nil
14,991✔
622
        }
623
        return defaultTaskBufferIsolationGroup, nil
43,036✔
624
}
625

626
func (c *taskListManagerImpl) getPollerIsolationGroups() []string {
4,183✔
627
        groupSet := c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-10 * time.Second))
4,183✔
628
        c.outstandingPollsLock.Lock()
4,183✔
629
        for _, poller := range c.outstandingPollsMap {
4,184✔
630
                groupSet[poller.isolationGroup] = struct{}{}
1✔
631
        }
1✔
632
        c.outstandingPollsLock.Unlock()
4,183✔
633
        result := make([]string, 0, len(groupSet))
4,183✔
634
        for k := range groupSet {
6,189✔
635
                result = append(result, k)
2,006✔
636
        }
2,006✔
637
        sort.Strings(result)
4,183✔
638
        return result
4,183✔
639
}
640

641
func getTaskListTypeTag(taskListType int) metrics.Tag {
14,154✔
642
        switch taskListType {
14,154✔
643
        case persistence.TaskListTypeActivity:
5,034✔
644
                return taskListActivityTypeTag
5,034✔
645
        case persistence.TaskListTypeDecision:
9,120✔
646
                return taskListDecisionTypeTag
9,120✔
647
        default:
×
648
                return metrics.TaskListTypeTag("")
×
649
        }
650
}
651

652
func createServiceBusyError(msg string) *types.ServiceBusyError {
×
653
        return &types.ServiceBusyError{Message: msg}
×
654
}
×
655

656
func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
3,705✔
657
        return taskIDBlock{
3,705✔
658
                start: (rangeID-1)*rangeSize + 1,
3,705✔
659
                end:   rangeID * rangeSize,
3,705✔
660
        }
3,705✔
661
}
3,705✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc