
uber / cadence / 01902805-1940-414a-a712-360172dfc37f

17 Jun 2024 09:05PM UTC coverage: 71.431% (-0.02%) from 71.451%
Build: push via buildkite, committed by web-flow
Commit: Bump docker/build-push-action to v5 (#6134)

106521 of 149124 relevant lines covered (71.43%)
2596.53 hits per line

Source File: /service/matching/tasklist/task_reader.go (75.33% of lines covered)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package tasklist

import (
        "context"
        "errors"
        "runtime"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/cache"
        "github.com/uber/cadence/common/clock"
        "github.com/uber/cadence/common/cluster"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/messaging"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/types"
        "github.com/uber/cadence/service/matching/config"
)

var epochStartTime = time.Unix(0, 0)

const (
        defaultTaskBufferIsolationGroup = "" // a task buffer which is not using an isolation group
)

type (
        taskReader struct {

                // taskBuffers: This is the in-memory queue of tasks for dispatch
                // that are enqueued for pollers to pick up. It's written to by
                // - getTasksPump - the primary means of loading async matching tasks
                // - task dispatch redirection - when a task is redirected from another isolation group
                taskBuffers     map[string]chan *persistence.TaskInfo
                notifyC         chan struct{} // Used as signal to notify pump of new tasks
                tlMgr           *taskListManagerImpl
                taskListID      *Identifier
                config          *config.TaskListConfig
                db              *taskListDB
                taskWriter      *taskWriter
                taskGC          *taskGC
                taskAckManager  messaging.AckManager
                domainCache     cache.DomainCache
                clusterMetadata cluster.Metadata
                timeSource      clock.TimeSource
                // The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
                // approach is to use request-scoped contexts and use a unique one for each call to Wait. However,
                // in order to cancel it on shutdown, we would need a new goroutine for each call that waits on
                // the shutdown channel. To optimize for efficiency, we instead create one and tag it on the struct
                // so the cancel can be called directly on shutdown.
                cancelCtx                context.Context
                cancelFunc               context.CancelFunc
                stopped                  int64 // set to 1 if the reader is stopped or is shutting down
                logger                   log.Logger
                scope                    metrics.Scope
                throttleRetry            *backoff.ThrottleRetry
                handleErr                func(error) error
                onFatalErr               func()
                dispatchTask             func(context.Context, *InternalTask) error
                getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error)
                ratePerSecond            func() float64

                // stopWg is used to wait for all dispatchers to stop.
                stopWg sync.WaitGroup
        }
)

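// newTaskReader creates a taskReader for the given task list manager. It allocates one
// buffered task channel per requested isolation group plus the default (empty) group.
// Each buffer is sized GetTasksBatchSize()-1 because the dispatcher always holds the
// dequeued head of the buffer while trying to match it to a poller.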
func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskReader {
        ctx, cancel := context.WithCancel(context.Background())
        taskBuffers := make(map[string]chan *persistence.TaskInfo)
        taskBuffers[defaultTaskBufferIsolationGroup] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
        for _, g := range isolationGroups {
                taskBuffers[g] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
        }
        return &taskReader{
                tlMgr:          tlMgr,
                taskListID:     tlMgr.taskListID,
                config:         tlMgr.config,
                db:             tlMgr.db,
                taskWriter:     tlMgr.taskWriter,
                taskGC:         tlMgr.taskGC,
                taskAckManager: tlMgr.taskAckManager,
                cancelCtx:      ctx,
                cancelFunc:     cancel,
                notifyC:        make(chan struct{}, 1),
                // we always dequeue the head of the buffer and try to dispatch it to a poller
                // so allocate one less than desired target buffer size
                taskBuffers:              taskBuffers,
                domainCache:              tlMgr.domainCache,
                clusterMetadata:          tlMgr.clusterMetadata,
                timeSource:               tlMgr.timeSource,
                logger:                   tlMgr.logger,
                scope:                    tlMgr.scope,
                handleErr:                tlMgr.handleErr,
                onFatalErr:               tlMgr.Stop,
                dispatchTask:             tlMgr.DispatchTask,
                getIsolationGroupForTask: tlMgr.getIsolationGroupForTask,
                ratePerSecond:            tlMgr.matcher.Rate,
                throttleRetry: backoff.NewThrottleRetry(
                        backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
                        backoff.WithRetryableError(persistence.IsTransientError),
                ),
        }
}

func (tr *taskReader) Start() {
        tr.Signal()
        for g := range tr.taskBuffers {
                g := g
                tr.stopWg.Add(1)
                go func() {
                        defer tr.stopWg.Done()
                        tr.dispatchBufferedTasks(g)
                }()
        }
        tr.stopWg.Add(1)
        go func() {
                defer tr.stopWg.Done()
                tr.getTasksPump()
        }()
}

func (tr *taskReader) Stop() {
        if atomic.CompareAndSwapInt64(&tr.stopped, 0, 1) {
                tr.cancelFunc()
                if err := tr.persistAckLevel(); err != nil {
                        tr.logger.Error("Persistent store operation failure",
                                tag.StoreOperationUpdateTaskList,
                                tag.Error(err))
                }
                tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
                tr.stopWg.Wait()
        }
}

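// Signal wakes up getTasksPump. notifyC has a capacity of one, so bursts of signals
// coalesce into a single pending wakeup and Signal never blocks the caller.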
func (tr *taskReader) Signal() {
        var event struct{}
        select {
        case tr.notifyC <- event:
        default: // channel already has an event, don't block
        }
}

func (tr *taskReader) dispatchBufferedTasks(isolationGroup string) {
dispatchLoop:
        for {
                select {
                case taskInfo, ok := <-tr.taskBuffers[isolationGroup]:
                        if !ok { // Task list getTasks pump is shut down
                                break dispatchLoop
                        }
                        breakDispatchLoop := tr.dispatchSingleTaskFromBufferWithRetries(isolationGroup, taskInfo)
                        if breakDispatchLoop {
                                // shutting down
                                break dispatchLoop
                        }
                case <-tr.cancelCtx.Done():
                        break dispatchLoop
                }
        }
}

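// getTasksPump is the reader's main loop. It reads a batch from persistence whenever
// notifyC fires and fans the tasks out to the per-isolation-group buffers; on every
// updateAckTimer tick it reports the task list size, persists the ack level, and
// re-signals itself. It exits once cancelCtx is cancelled.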
func (tr *taskReader) getTasksPump() {
        updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
        defer updateAckTimer.Stop()
getTasksPumpLoop:
        for {
                select {
                case <-tr.cancelCtx.Done():
                        break getTasksPumpLoop
                case <-tr.notifyC:
                        {
                                tasks, readLevel, isReadBatchDone, err := tr.getTaskBatch()
                                if err != nil {
                                        tr.Signal() // re-enqueue the event
                                        // TODO: Should we ever stop retrying on db errors?
                                        continue getTasksPumpLoop
                                }

                                if len(tasks) == 0 {
                                        tr.taskAckManager.SetReadLevel(readLevel)
                                        if !isReadBatchDone {
                                                tr.Signal()
                                        }
                                        continue getTasksPumpLoop
                                }

                                if !tr.addTasksToBuffer(tasks) {
                                        break getTasksPumpLoop
                                }
                                // There may be more tasks. We yield now, but signal the pump to check again later.
                                tr.Signal()
                        }
                case <-updateAckTimer.C:
                        {
                                ackLevel := tr.taskAckManager.GetAckLevel()
                                if size, err := tr.db.GetTaskListSize(ackLevel); err == nil {
                                        tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType())).
                                                UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size))
                                }
                                if err := tr.handleErr(tr.persistAckLevel()); err != nil {
                                        tr.logger.Error("Persistent store operation failure",
                                                tag.StoreOperationUpdateTaskList,
                                                tag.Error(err))
                                        // keep going as saving ack is not critical
                                }
                                tr.Signal() // periodically signal pump to check persistence for tasks
                                updateAckTimer = time.NewTimer(tr.config.UpdateAckInterval())
                        }
                }
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType()))
                scope.UpdateGauge(metrics.TaskBacklogPerTaskListGauge, float64(tr.taskAckManager.GetBacklogCount()))
        }
}

func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) {
        var response *persistence.GetTasksResponse
        op := func() (err error) {
                response, err = tr.db.GetTasks(readLevel, maxReadLevel, tr.config.GetTasksBatchSize())
                return
        }
        err := tr.throttleRetry.Do(context.Background(), op)
        if err != nil {
                tr.logger.Error("Persistent store operation failure",
                        tag.StoreOperationGetTasks,
                        tag.Error(err),
                        tag.WorkflowTaskListName(tr.taskListID.GetName()),
                        tag.WorkflowTaskListType(tr.taskListID.GetType()))
                return nil, err
        }
        return response.Tasks, nil
}

// Returns a batch of tasks from persistence starting from the current read level.
// Also returns a number that can be used to update readLevel
// and a bool indicating whether the read is finished.
func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) {
        var tasks []*persistence.TaskInfo
        readLevel := tr.taskAckManager.GetReadLevel()
        maxReadLevel := tr.taskWriter.GetMaxReadLevel()

        // counter i is used to break out and let the caller check whether the tasklist is still alive and needs to resume reading.
        for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
                upper := readLevel + tr.config.RangeSize
                if upper > maxReadLevel {
                        upper = maxReadLevel
                }
                tasks, err := tr.getTaskBatchWithRange(readLevel, upper)
                if err != nil {
                        return nil, readLevel, true, err
                }
                // return as long as it grabs any tasks
                if len(tasks) > 0 {
                        return tasks, upper, true, nil
                }
                readLevel = upper
        }
        return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
}

func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool {
        return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
}

func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
        for _, t := range tasks {
                if !tr.addSingleTaskToBuffer(t) {
                        return false // we are shutting down the task list
                }
        }
        return true
}

func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
        if tr.isTaskExpired(task) {
                tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
                // Also increment readLevel for expired tasks, otherwise it could result in
                // looping over the same tasks if all tasks read in the batch are expired
                tr.taskAckManager.SetReadLevel(task.TaskID)
                return true
        }
        err := tr.taskAckManager.ReadItem(task.TaskID)
        if err != nil {
                tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
        }
        isolationGroup, err := tr.getIsolationGroupForTask(tr.cancelCtx, task)
        if err != nil {
                // it only errors when the tasklist is a sticky tasklist and
                // the sticky pollers are not available; in this case, we just complete the task
                // and let the decision time out and get rescheduled to the non-sticky tasklist
                if err == _stickyPollerUnavailableError {
                        tr.completeTask(task, nil)
                } else {
                        // it should never happen, unless there is a bug in the 'getIsolationGroupForTask' method
                        tr.logger.Error("taskReader: unexpected error getting isolation group", tag.Error(err))
                        tr.completeTask(task, err)
                }
                return true
        }
        select {
        case tr.taskBuffers[isolationGroup] <- task:
                return true
        case <-tr.cancelCtx.Done():
                return false
        }
}

func (tr *taskReader) persistAckLevel() error {
        ackLevel := tr.taskAckManager.GetAckLevel()
        if ackLevel >= 0 {
                maxReadLevel := tr.taskWriter.GetMaxReadLevel()
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType()))
                // note: this metric is only an estimate of the lag. taskID in DB may not be continuous,
                // especially when task list ownership changes.
                scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel))

                return tr.db.UpdateState(ackLevel)
        }
        return nil
}

// completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach
// here. As part of completion:
//   - task is deleted from the database when err is nil
//   - new task is created and current task is deleted when err is not nil
func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
        if err != nil {
                // failed to start the task.
                // We cannot just remove it from persistence because then it will be lost.
                // We handle this by writing the task back to persistence with a higher taskID.
                // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked up
                // again the underlying reason for failing to start will be resolved.
                // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
                // re-written to persistence frequently.
                op := func() error {
                        _, err := tr.taskWriter.appendTask(task)
                        return err
                }
                err = tr.throttleRetry.Do(context.Background(), op)
                if err != nil {
                        // OK, we also failed to write to persistence.
                        // This should only happen in very extreme cases where persistence is completely down.
                        // We still can't lose the old task, so we just unload the entire task list.
                        tr.logger.Error("Failed to complete task", tag.Error(err))
                        tr.onFatalErr()
                        return
                }
                tr.Signal()
        }
        ackLevel := tr.taskAckManager.AckItem(task.TaskID)
        tr.taskGC.Run(ackLevel)
}

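// newDispatchContext builds the context for a single async dispatch attempt. For tasks in a
// non-default isolation group, or when a positive rate limit is configured, it applies a
// dispatch timeout of at least len(taskBuffers)/ratePerSecond seconds, provided the domain
// is (or may be) active in the current cluster. Presumably, with one dispatcher goroutine
// per buffer waiting on the shared limiter, a shorter timeout could expire before a token
// is ever granted and stall async dispatch completely.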
func (tr *taskReader) newDispatchContext(isolationGroup string) (context.Context, context.CancelFunc) {
        rps := tr.ratePerSecond()
        if isolationGroup != "" || rps > 1e-7 { // 1e-7 is an arbitrary small threshold chosen to avoid overflow; users normally don't set such a low rps
                // this is the minimum timeout required to dispatch a task; if the timeout value is smaller than this,
                // async task dispatch can be completely throttled, which could happen when ratePerSecond is pretty low
                minTimeout := time.Duration(float64(len(tr.taskBuffers))/rps) * time.Second
                timeout := tr.config.AsyncTaskDispatchTimeout()
                if timeout < minTimeout {
                        timeout = minTimeout
                }
                domainEntry, err := tr.domainCache.GetDomainByID(tr.taskListID.GetDomainID())
                if err != nil {
                        // we don't know if the domain is active in the current cluster; assume it is active and set the timeout
                        return context.WithTimeout(tr.cancelCtx, timeout)
                }
                if _, err := domainEntry.IsActiveIn(tr.clusterMetadata.GetCurrentClusterName()); err == nil {
                        // if the domain is active in the current cluster, set the timeout
                        return context.WithTimeout(tr.cancelCtx, timeout)
                }
        }
        return tr.cancelCtx, func() {}
}

func (tr *taskReader) dispatchSingleTaskFromBufferWithRetries(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool) {
        // retry loop for dispatching a single task
        for {
                breakDispatchLoop, breakRetryLoop := tr.dispatchSingleTaskFromBuffer(isolationGroup, taskInfo)
                if breakRetryLoop {
                        return breakDispatchLoop
                }
        }
}

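// dispatchSingleTaskFromBuffer makes one attempt to dispatch the buffered task. The first
// return value tells dispatchBufferedTasks to exit its dispatch loop (the reader is shutting
// down); the second tells dispatchSingleTaskFromBufferWithRetries to stop retrying this task.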
func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool, breakRetries bool) {
        task := newInternalTask(taskInfo, tr.completeTask, types.TaskSourceDbBacklog, "", false, nil, isolationGroup)
        dispatchCtx, cancel := tr.newDispatchContext(isolationGroup)
        timerScope := tr.scope.StartTimer(metrics.AsyncMatchLatencyPerTaskList)
        err := tr.dispatchTask(dispatchCtx, task)
        timerScope.Stop()
        cancel()

        if err == nil {
                return false, true
        }

        if errors.Is(err, context.Canceled) {
                tr.logger.Info("Tasklist manager context is cancelled, shutting down")
                return true, true
        }

        if errors.Is(err, context.DeadlineExceeded) {
                // this only happens when isolation is enabled and there are no pollers from the given isolation group.
                // if this happens, we don't want to block task dispatching, because there might be pollers from
                // other isolation groups; we simply continue and dispatch the task to a new isolation group which
                // has pollers
                tr.logger.Warn("Async task dispatch timed out",
                        tag.IsolationGroup(isolationGroup),
                        tag.WorkflowRunID(taskInfo.RunID),
                        tag.WorkflowID(taskInfo.WorkflowID),
                        tag.TaskID(taskInfo.TaskID),
                        tag.Error(err),
                        tag.WorkflowDomainID(taskInfo.DomainID),
                )
                tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList)

                // the idea here is that by re-fetching the isolation groups, if something has shifted,
                // the task will get a new isolation group placement. If it needs re-routing, then
                // this will be the new routing destination.
                group, err := tr.getIsolationGroupForTask(tr.cancelCtx, taskInfo)
                if err != nil {
                        // it only errors when the tasklist is a sticky tasklist and
                        // the sticky pollers are not available; in this case, we just complete the task
                        // and let the decision time out and get rescheduled to the non-sticky tasklist
                        if err == _stickyPollerUnavailableError {
                                tr.completeTask(taskInfo, nil)
                                return false, true
                        }
                        // it should never happen, unless there is a bug in the 'getIsolationGroupForTask' method
                        tr.logger.Error("taskReader: unexpected error getting isolation group",
                                tag.Error(err),
                                tag.IsolationGroup(group))
                        tr.completeTask(taskInfo, err)
                        return false, true
                }

                if group == isolationGroup {
                        // no change, retry to dispatch the task again
                        return false, false
                }

                // ensure the isolation group is configured and available
                _, taskGroupReaderIsPresent := tr.taskBuffers[group]
                if !taskGroupReaderIsPresent {
                        // there's a programming error. Something has gone wrong with tasklist instantiation.
                        // don't block; redirect to the default group
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
                        tr.logger.Error("An isolation group buffer was misconfigured and couldn't be found. Redirecting to default",
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.IsolationGroup(group),
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )

                        select {
                        case <-tr.cancelCtx.Done():
                                // the task reader is shutting down
                                return true, true
                        case tr.taskBuffers[defaultTaskBufferIsolationGroup] <- taskInfo:
                                // task successfully rerouted to the default task list
                                return false, true
                        default:
                                // couldn't redirect, loop and try again
                                return false, false
                        }
                }

                // if there are no pollers in the isolation group, or the isolation group is drained,
                // we want to redistribute the tasks to other isolation groups in this case to drain
                // the backlog.
                select {
                case <-tr.cancelCtx.Done():
                        // the task reader is shutting down
                        return true, true
                case tr.taskBuffers[group] <- taskInfo:
                        // successful redirect
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectCounter)
                        tr.logger.Warn("some tasks were redirected to another isolation group.",
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )
                        return false, true
                default:
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
                        tr.logger.Error("some tasks could not be redirected to another isolation group as the buffer's already full",
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )
                        // the task async buffers on the other isolation group are already full, wait and retry
                        return false, false
                }
        }

        if errors.Is(err, ErrTasklistThrottled) {
                tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter)
                runtime.Gosched()
                return false, false
        }

        tr.scope.IncCounter(metrics.BufferUnknownTaskDispatchError)
        tr.logger.Error("unknown error while dispatching task",
                tag.Error(err),
                tag.IsolationGroup(isolationGroup),
                tag.WorkflowRunID(taskInfo.RunID),
                tag.WorkflowID(taskInfo.WorkflowID),
                tag.TaskID(taskInfo.TaskID),
                tag.WorkflowDomainID(taskInfo.DomainID),
        )
        return false, false
}
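
The notifyC field above is the coalescing wakeup signal between producers of new work and the single getTasksPump goroutine. The standalone sketch below (not part of task_reader.go; the notifier type and names are purely illustrative) reproduces the same idiom: a one-slot channel whose non-blocking sends collapse any burst of signals into a single pending wakeup.

// signal_example.go: illustrative only; not part of task_reader.go.
package main

import (
        "fmt"
        "time"
)

// notifier mirrors taskReader.Signal/notifyC: a one-slot channel whose sends never
// block, so any burst of signals collapses into at most one pending wakeup.
type notifier struct {
        c chan struct{}
}

func newNotifier() *notifier {
        return &notifier{c: make(chan struct{}, 1)}
}

func (n *notifier) Signal() {
        select {
        case n.c <- struct{}{}:
        default: // a wakeup is already pending; drop this one
        }
}

func main() {
        n := newNotifier()
        done := make(chan struct{})

        // The consumer plays the role of getTasksPump: each wakeup triggers one "batch read".
        go func() {
                defer close(done)
                for i := 0; i < 2; i++ {
                        <-n.c
                        fmt.Println("woke up, reading a batch")
                }
        }()

        // A burst of signals coalesces into at most one pending wakeup...
        for i := 0; i < 5; i++ {
                n.Signal()
        }
        time.Sleep(50 * time.Millisecond)
        // ...and a later signal guarantees the consumer's second wakeup.
        n.Signal()
        <-done
}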