
uber / cadence / 01905fb8-bb8f-4e40-b7b8-0b7dc522c2e4

28 Jun 2024 04:40PM UTC coverage: 71.438% (+0.006%) from 71.432%

push · buildkite · web-flow

Bugfix: replication messages dropped during host shutdown (#6143)

What changed?

Internal details: CDNC-9597

A user reported a problem during a failover: a workflow going through a continue-as-new event was silently dropped during replication, without any corresponding DLQ message. We were able to track the likely (and expected) cause down to a shard movement during that window, which triggers several unpleasant edge conditions through the interaction of the following (a simplified sketch follows the list):

1. A silent dropping of errors due to the shard closing.
2. The in-memory offsets being advanced incorrectly during shard-closed events, due to some missing control-flow handling.
3. The GetReplication API call actually being a write call that tracks offsets (perhaps somewhat surprisingly; I really don't like this API).
4. The GetReplication call being made with these now-invalid parameters during a shard-closing event.
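
To make the first failure mode concrete, here is a minimal Go sketch of how swallowing a shard-closed error while still advancing an in-memory offset silently loses a message. It is illustrative only, not the actual Cadence replication code: errShardClosed, replicator, process, handleBuggy and handleFixed are hypothetical names standing in for the real shard and replication-ack machinery.

    package main

    import (
            "errors"
            "fmt"
    )

    // errShardClosed stands in for the error a shard returns while it is being
    // moved to another host during shutdown or failover.
    var errShardClosed = errors.New("shard closed")

    type replicator struct {
            ackedOffset int64 // in-memory offset of the last message considered handled
    }

    // process applies one replication message; it fails if the shard is closing.
    func (r *replicator) process(offset int64, shardClosing bool) error {
            if shardClosing {
                    return errShardClosed
            }
            // ... apply the replication message ...
            return nil
    }

    // handleBuggy mirrors the reported failure mode: the shard-closed error is
    // swallowed and the offset is advanced anyway, so the message is never
    // retried and never reaches the DLQ.
    func (r *replicator) handleBuggy(offset int64, shardClosing bool) {
            _ = r.process(offset, shardClosing) // error silently dropped
            r.ackedOffset = offset              // offset advanced even though nothing was applied
    }

    // handleFixed surfaces the error and leaves the offset untouched, so the
    // message is redelivered after the shard has moved.
    func (r *replicator) handleFixed(offset int64, shardClosing bool) error {
            if err := r.process(offset, shardClosing); err != nil {
                    return err // caller retries or routes to the DLQ
            }
            r.ackedOffset = offset
            return nil
    }

    func main() {
            r := &replicator{}
            r.handleBuggy(42, true)
            fmt.Println("buggy path acked offset:", r.ackedOffset) // 42: the message is lost

            r2 := &replicator{}
            if err := r2.handleFixed(42, true); err != nil {
                    fmt.Println("fixed path surfaced error:", err, "acked offset:", r2.ackedOffset) // 0: will be retried
            }
    }

Per the description above, the change appears to move toward the handleFixed shape: shard-closed errors are no longer dropped and the offsets are not advanced past work that was never applied.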
How did you test it?

Tested locally and with unit tests; was able to reproduce the sequence of events mostly with unit tests.

14 of 15 new or added lines in 1 file covered. (93.33%)

34 existing lines in 11 files now uncovered.

104697 of 146557 relevant lines covered (71.44%)

2650.06 hits per line

Source File

/service/matching/tasklist/task_reader.go: 75.86% of lines covered (the lines flagged as newly uncovered in this file are the error-logging branches taken when persistAckLevel fails in Stop and getTasksPump)
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package tasklist

import (
        "context"
        "errors"
        "runtime"
        "sync"
        "sync/atomic"
        "time"

        "github.com/uber/cadence/common/backoff"
        "github.com/uber/cadence/common/cache"
        "github.com/uber/cadence/common/clock"
        "github.com/uber/cadence/common/cluster"
        "github.com/uber/cadence/common/log"
        "github.com/uber/cadence/common/log/tag"
        "github.com/uber/cadence/common/messaging"
        "github.com/uber/cadence/common/metrics"
        "github.com/uber/cadence/common/persistence"
        "github.com/uber/cadence/common/types"
        "github.com/uber/cadence/service/matching/config"
)

var epochStartTime = time.Unix(0, 0)

const (
        defaultTaskBufferIsolationGroup = "" // a task buffer which is not using an isolation group
)

type (
        taskReader struct {

                // taskBuffers: This is the in-memory queue of tasks for dispatch
                // that are enqueued for pollers to pickup. It's written to by
                // - getTasksPump - the primary means of loading async matching tasks
                // - task dispatch redirection - when a task is redirected from another isolation group
                taskBuffers     map[string]chan *persistence.TaskInfo
                notifyC         chan struct{} // Used as signal to notify pump of new tasks
                tlMgr           *taskListManagerImpl
                taskListID      *Identifier
                config          *config.TaskListConfig
                db              *taskListDB
                taskWriter      *taskWriter
                taskGC          *taskGC
                taskAckManager  messaging.AckManager
                domainCache     cache.DomainCache
                clusterMetadata cluster.Metadata
                timeSource      clock.TimeSource
                // The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
                // approach is to use request-scoped contexts and use a unique one for each call to Wait. However
                // in order to cancel it on shutdown, we need a new goroutine for each call that would wait on
                // the shutdown channel. To optimize on efficiency, we instead create one and tag it on the struct
                // so the cancel can be called directly on shutdown.
                cancelCtx                context.Context
                cancelFunc               context.CancelFunc
                stopped                  int64 // set to 1 if the reader is stopped or is shutting down
                logger                   log.Logger
                scope                    metrics.Scope
                throttleRetry            *backoff.ThrottleRetry
                handleErr                func(error) error
                onFatalErr               func()
                dispatchTask             func(context.Context, *InternalTask) error
                getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error)
                ratePerSecond            func() float64

                // stopWg is used to wait for all dispatchers to stop.
                stopWg sync.WaitGroup
        }
)

func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskReader {
        ctx, cancel := context.WithCancel(context.Background())
        taskBuffers := make(map[string]chan *persistence.TaskInfo)
        taskBuffers[defaultTaskBufferIsolationGroup] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
        for _, g := range isolationGroups {
                taskBuffers[g] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
        }
        return &taskReader{
                tlMgr:          tlMgr,
                taskListID:     tlMgr.taskListID,
                config:         tlMgr.config,
                db:             tlMgr.db,
                taskWriter:     tlMgr.taskWriter,
                taskGC:         tlMgr.taskGC,
                taskAckManager: tlMgr.taskAckManager,
                cancelCtx:      ctx,
                cancelFunc:     cancel,
                notifyC:        make(chan struct{}, 1),
                // we always dequeue the head of the buffer and try to dispatch it to a poller
                // so allocate one less than desired target buffer size
                taskBuffers:              taskBuffers,
                domainCache:              tlMgr.domainCache,
                clusterMetadata:          tlMgr.clusterMetadata,
                timeSource:               tlMgr.timeSource,
                logger:                   tlMgr.logger,
                scope:                    tlMgr.scope,
                handleErr:                tlMgr.handleErr,
                onFatalErr:               tlMgr.Stop,
                dispatchTask:             tlMgr.DispatchTask,
                getIsolationGroupForTask: tlMgr.getIsolationGroupForTask,
                ratePerSecond:            tlMgr.matcher.Rate,
                throttleRetry: backoff.NewThrottleRetry(
                        backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
                        backoff.WithRetryableError(persistence.IsTransientError),
                ),
        }
}

func (tr *taskReader) Start() {
        tr.Signal()
        for g := range tr.taskBuffers {
                g := g
                tr.stopWg.Add(1)
                go func() {
                        defer tr.stopWg.Done()
                        tr.dispatchBufferedTasks(g)
                }()
        }
        tr.stopWg.Add(1)
        go func() {
                defer tr.stopWg.Done()
                tr.getTasksPump()
        }()
}

func (tr *taskReader) Stop() {
        if atomic.CompareAndSwapInt64(&tr.stopped, 0, 1) {
                tr.cancelFunc()
                if err := tr.persistAckLevel(); err != nil {
                        tr.logger.Error("Persistent store operation failure",
                                tag.StoreOperationUpdateTaskList,
                                tag.Error(err))
                }
                tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
                tr.stopWg.Wait()
        }
}

func (tr *taskReader) Signal() {
        var event struct{}
        select {
        case tr.notifyC <- event:
        default: // channel already has an event, don't block
        }
}

func (tr *taskReader) dispatchBufferedTasks(isolationGroup string) {
dispatchLoop:
        for {
                select {
                case taskInfo, ok := <-tr.taskBuffers[isolationGroup]:
                        if !ok { // Task list getTasks pump is shutdown
                                break dispatchLoop
                        }
                        breakDispatchLoop := tr.dispatchSingleTaskFromBufferWithRetries(isolationGroup, taskInfo)
                        if breakDispatchLoop {
                                // shutting down
                                break dispatchLoop
                        }
                case <-tr.cancelCtx.Done():
                        break dispatchLoop
                }
        }
}

func (tr *taskReader) getTasksPump() {
        updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
        defer updateAckTimer.Stop()
getTasksPumpLoop:
        for {
                select {
                case <-tr.cancelCtx.Done():
                        break getTasksPumpLoop
                case <-tr.notifyC:
                        {
                                tasks, readLevel, isReadBatchDone, err := tr.getTaskBatch()
                                if err != nil {
                                        tr.Signal() // re-enqueue the event
                                        // TODO: Should we ever stop retrying on db errors?
                                        continue getTasksPumpLoop
                                }

                                if len(tasks) == 0 {
                                        tr.taskAckManager.SetReadLevel(readLevel)
                                        if !isReadBatchDone {
                                                tr.Signal()
                                        }
                                        continue getTasksPumpLoop
                                }

                                if !tr.addTasksToBuffer(tasks) {
                                        break getTasksPumpLoop
                                }
                                // There maybe more tasks. We yield now, but signal pump to check again later.
                                tr.Signal()
                        }
                case <-updateAckTimer.C:
                        {
                                ackLevel := tr.taskAckManager.GetAckLevel()
                                if size, err := tr.db.GetTaskListSize(ackLevel); err == nil {
                                        tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType())).
                                                UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size))
                                }
                                if err := tr.handleErr(tr.persistAckLevel()); err != nil {
                                        tr.logger.Error("Persistent store operation failure",
                                                tag.StoreOperationUpdateTaskList,
                                                tag.Error(err))
                                        // keep going as saving ack is not critical
                                }
                                tr.Signal() // periodically signal pump to check persistence for tasks
                                updateAckTimer = time.NewTimer(tr.config.UpdateAckInterval())
                        }
                }
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType()))
                scope.UpdateGauge(metrics.TaskBacklogPerTaskListGauge, float64(tr.taskAckManager.GetBacklogCount()))
        }
}

func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) {
        var response *persistence.GetTasksResponse
        op := func() (err error) {
                response, err = tr.db.GetTasks(readLevel, maxReadLevel, tr.config.GetTasksBatchSize())
                return
        }
        err := tr.throttleRetry.Do(context.Background(), op)
        if err != nil {
                tr.logger.Error("Persistent store operation failure",
                        tag.StoreOperationGetTasks,
                        tag.Error(err),
                        tag.WorkflowTaskListName(tr.taskListID.GetName()),
                        tag.WorkflowTaskListType(tr.taskListID.GetType()))
                return nil, err
        }
        return response.Tasks, nil
}

// Returns a batch of tasks from persistence starting form current read level.
// Also return a number that can be used to update readLevel
// Also return a bool to indicate whether read is finished
func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) {
        var tasks []*persistence.TaskInfo
        readLevel := tr.taskAckManager.GetReadLevel()
        maxReadLevel := tr.taskWriter.GetMaxReadLevel()

        // counter i is used to break and let caller check whether tasklist is still alive and need resume read.
        for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
                upper := readLevel + tr.config.RangeSize
                if upper > maxReadLevel {
                        upper = maxReadLevel
                }
                tasks, err := tr.getTaskBatchWithRange(readLevel, upper)
                if err != nil {
                        return nil, readLevel, true, err
                }
                // return as long as it grabs any tasks
                if len(tasks) > 0 {
                        return tasks, upper, true, nil
                }
                readLevel = upper
        }
        return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
}

func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool {
        return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
}

func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
        for _, t := range tasks {
                if !tr.addSingleTaskToBuffer(t) {
                        return false // we are shutting down the task list
                }
        }
        return true
}

func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
        if tr.isTaskExpired(task) {
                tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
                // Also increment readLevel for expired tasks otherwise it could result in
                // looping over the same tasks if all tasks read in the batch are expired
                tr.taskAckManager.SetReadLevel(task.TaskID)
                return true
        }
        err := tr.taskAckManager.ReadItem(task.TaskID)
        if err != nil {
                tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
        }
        isolationGroup, err := tr.getIsolationGroupForTask(tr.cancelCtx, task)
        if err != nil {
                // it only errors when the tasklist is a sticky tasklist and
                // the sticky pollers are not available, in this case, we just complete the task
                // and let the decision get timed out and rescheduled to non-sticky tasklist
                if err == _stickyPollerUnavailableError {
                        tr.completeTask(task, nil)
                } else {
                        // it should never happen, unless there is a bug in 'getIsolationGroupForTask' method
                        tr.logger.Error("taskReader: unexpected error getting isolation group", tag.Error(err))
                        tr.completeTask(task, err)
                }
                return true
        }
        select {
        case tr.taskBuffers[isolationGroup] <- task:
                return true
        case <-tr.cancelCtx.Done():
                return false
        }
}

func (tr *taskReader) persistAckLevel() error {
        ackLevel := tr.taskAckManager.GetAckLevel()
        if ackLevel >= 0 {
                maxReadLevel := tr.taskWriter.GetMaxReadLevel()
                scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.GetType()))
                // note: this metrics is only an estimation for the lag. taskID in DB may not be continuous,
                // especially when task list ownership changes.
                scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel))

                return tr.db.UpdateState(ackLevel)
        }
        return nil
}

// completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach
// here. As part of completion:
//   - task is deleted from the database when err is nil
//   - new task is created and current task is deleted when err is not nil
func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) {
        if err != nil {
                // failed to start the task.
                // We cannot just remove it from persistence because then it will be lost.
                // We handle this by writing the task back to persistence with a higher taskID.
                // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up
                // again the underlying reason for failing to start will be resolved.
                // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
                // re-written to persistence frequently.
                op := func() error {
                        _, err := tr.taskWriter.appendTask(task)
                        return err
                }
                err = tr.throttleRetry.Do(context.Background(), op)
                if err != nil {
                        // OK, we also failed to write to persistence.
                        // This should only happen in very extreme cases where persistence is completely down.
                        // We still can't lose the old task so we just unload the entire task list
                        tr.logger.Error("Failed to complete task", tag.Error(err))
                        tr.onFatalErr()
                        return
                }
                tr.Signal()
        }
        ackLevel := tr.taskAckManager.AckItem(task.TaskID)
        tr.taskGC.Run(ackLevel)
}

func (tr *taskReader) newDispatchContext(isolationGroup string) (context.Context, context.CancelFunc) {
        rps := tr.ratePerSecond()
        if isolationGroup != "" || rps > 1e-7 { // 1e-7 is a random number chosen to avoid overflow, normally user don't set such a low rps
                // this is the minimum timeout required to dispatch a task, if the timeout value is smaller than this
                // async task dispatch can be completely throttled, which could happen when ratePerSecond is pretty low
                minTimeout := time.Duration(float64(len(tr.taskBuffers))/rps) * time.Second
                timeout := tr.config.AsyncTaskDispatchTimeout()
                if timeout < minTimeout {
                        timeout = minTimeout
                }
                domainEntry, err := tr.domainCache.GetDomainByID(tr.taskListID.GetDomainID())
                if err != nil {
                        // we don't know if the domain is active in the current cluster, assume it is active and set the timeout
                        return context.WithTimeout(tr.cancelCtx, timeout)
                }
                if _, err := domainEntry.IsActiveIn(tr.clusterMetadata.GetCurrentClusterName()); err == nil {
                        // if the domain is active in the current cluster, set the timeout
                        return context.WithTimeout(tr.cancelCtx, timeout)
                }
        }
        return tr.cancelCtx, func() {}
}

func (tr *taskReader) dispatchSingleTaskFromBufferWithRetries(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool) {
        // retry loop for dispatching a single task
        for {
                breakDispatchLoop, breakRetryLoop := tr.dispatchSingleTaskFromBuffer(isolationGroup, taskInfo)
                if breakRetryLoop {
                        return breakDispatchLoop
                }
        }
}

func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool, breakRetries bool) {
        task := newInternalTask(taskInfo, tr.completeTask, types.TaskSourceDbBacklog, "", false, nil, isolationGroup)
        dispatchCtx, cancel := tr.newDispatchContext(isolationGroup)
        timerScope := tr.scope.StartTimer(metrics.AsyncMatchLatencyPerTaskList)
        err := tr.dispatchTask(dispatchCtx, task)
        timerScope.Stop()
        cancel()

        if err == nil {
                return false, true
        }

        if errors.Is(err, context.Canceled) {
                tr.logger.Info("Tasklist manager context is cancelled, shutting down")
                return true, true
        }

        if errors.Is(err, context.DeadlineExceeded) {
                // it only happens when isolation is enabled and there is no pollers from the given isolation group
                // if this happens, we don't want to block the task dispatching, because there might be pollers from
                // other isolation groups, we just simply continue and dispatch the task to a new isolation group which
                // has pollers
                tr.logger.Warn("Async task dispatch timed out",
                        tag.IsolationGroup(isolationGroup),
                        tag.WorkflowRunID(taskInfo.RunID),
                        tag.WorkflowID(taskInfo.WorkflowID),
                        tag.TaskID(taskInfo.TaskID),
                        tag.Error(err),
                        tag.WorkflowDomainID(taskInfo.DomainID),
                )
                tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList)

                // the idea here is that by re-fetching the isolation-groups, if something has shifted
                // it will get a new isolation group to be placed. If it needs re-routing, then
                // this will be the new routing destination.
                group, err := tr.getIsolationGroupForTask(tr.cancelCtx, taskInfo)
                if err != nil {
                        // it only errors when the tasklist is a sticky tasklist and
                        // the sticky pollers are not available, in this case, we just complete the task
                        // and let the decision get timed out and rescheduled to non-sticky tasklist
                        if err == _stickyPollerUnavailableError {
                                tr.completeTask(taskInfo, nil)
                                return false, true
                        }
                        // it should never happen, unless there is a bug in 'getIsolationGroupForTask' method
                        tr.logger.Error("taskReader: unexpected error getting isolation group",
                                tag.Error(err),
                                tag.IsolationGroup(group))
                        tr.completeTask(taskInfo, err)
                        return false, true
                }

                if group == isolationGroup {
                        // no change, retry to dispatch the task again
                        return false, false
                }

                // ensure the isolation group is configured and available
                _, taskGroupReaderIsPresent := tr.taskBuffers[group]
                if !taskGroupReaderIsPresent {
                        // there's a programmatic error. Something has gone wrong with tasklist instantiation
                        // don't block and redirect to the default group
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
                        tr.logger.Error("An isolation group buffer was misconfigured and couldn't be found. Redirecting to default",
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.IsolationGroup(group),
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )

                        select {
                        case <-tr.cancelCtx.Done():
                                // the task reader is shutting down
                                return true, true
                        case tr.taskBuffers[defaultTaskBufferIsolationGroup] <- taskInfo:
                                // task successfully rerouted to default tasklist
                                return false, true
                        default:
                                // couldn't redirect, loop and try again
                                return false, false
                        }
                }

                // if there is no poller in the isolation group or the isolation group is drained,
                // we want to redistribute the tasks to other isolation groups in this case to drain
                // the backlog.
                select {
                case <-tr.cancelCtx.Done():
                        // the task reader is shutting down
                        return true, true
                case tr.taskBuffers[group] <- taskInfo:
                        // successful redirect
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectCounter)
                        tr.logger.Warn("some tasks were redirected to another isolation group.",
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )
                        return false, true
                default:
                        tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
                        tr.logger.Error("some tasks could not be redirected to another isolation group as the buffer's already full",
                                tag.WorkflowRunID(taskInfo.RunID),
                                tag.Dynamic("redirection-from-isolation-group", isolationGroup),
                                tag.Dynamic("redirection-to-isolation-group", group),
                                tag.WorkflowID(taskInfo.WorkflowID),
                                tag.TaskID(taskInfo.TaskID),
                                tag.WorkflowDomainID(taskInfo.DomainID),
                        )
                        // the task async buffers on the other isolation-group are already full, wait and retry
                        return false, false
                }
        }

        if errors.Is(err, ErrTasklistThrottled) {
                tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter)
                runtime.Gosched()
                return false, false
        }

        tr.scope.IncCounter(metrics.BufferUnknownTaskDispatchError)
        tr.logger.Error("unknown error while dispatching task",
                tag.Error(err),
                tag.IsolationGroup(isolationGroup),
                tag.WorkflowRunID(taskInfo.RunID),
                tag.WorkflowID(taskInfo.WorkflowID),
                tag.TaskID(taskInfo.TaskID),
                tag.WorkflowDomainID(taskInfo.DomainID),
        )
        return false, false
}