uber / cadence — build 018ef31e-0dee-4de8-b6fd-54e4f1f05b43

18 Apr 2024 09:30PM UTC coverage: 67.542% (-0.05%) from 67.587%

push · buildkite · web-flow
Added Executor interface and TimerTaskExecutorBase with stop() method, and improved context management in TimerQueueProcessor #5920

* Added stop() to the Executor interface and created an empty stop() function in the other Executors for future implementations (a simplified sketch of this wiring follows the list).
* Implemented stop() in TimerTaskExecutorBase.
* Added context and cancelFn fields to the TimerTaskExecutorBase struct and replaced context.Background() with the internal context to prevent memory leaks.
* Called executor.stop() in TimerQueueProcessor to prevent context leaks.
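
The wiring described above can be illustrated with a minimal, self-contained sketch. This is not Cadence's actual API: the Execute method, the newTimerTaskExecutorBase constructor, and the simplified Executor shape are hypothetical stand-ins; only the ideas (a Stop method on the executor interface, and an internal context/cancelFn replacing context.Background()) come from the PR description.

package main

import (
	"context"
	"fmt"
)

// Executor is a simplified stand-in for an executor interface that now
// exposes Stop() so owners can release executor-held resources at shutdown.
type Executor interface {
	Execute(taskID int) error
	Stop()
}

// timerTaskExecutorBase keeps a long-lived context and its cancel function,
// so per-task work derives from ctx instead of calling context.Background().
type timerTaskExecutorBase struct {
	ctx      context.Context
	cancelFn context.CancelFunc
}

func newTimerTaskExecutorBase() *timerTaskExecutorBase {
	ctx, cancel := context.WithCancel(context.Background())
	return &timerTaskExecutorBase{ctx: ctx, cancelFn: cancel}
}

func (e *timerTaskExecutorBase) Execute(taskID int) error {
	select {
	case <-e.ctx.Done():
		return e.ctx.Err() // executor stopped; refuse new work
	default:
		fmt.Println("executing timer task", taskID)
		return nil
	}
}

// Stop cancels the internal context so outstanding work unwinds and nothing
// keeps referencing the executor after shutdown.
func (e *timerTaskExecutorBase) Stop() {
	e.cancelFn()
}

func main() {
	var executor Executor = newTimerTaskExecutorBase()
	_ = executor.Execute(1)
	// The owning processor (TimerQueueProcessor in the PR) calls Stop()
	// after it has stopped dispatching tasks to the executor.
	executor.Stop()
	fmt.Println(executor.Execute(2)) // context.Canceled
}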

45 of 49 new or added lines in 8 files covered. (91.84%)

99 existing lines in 18 files now uncovered.

98907 of 146438 relevant lines covered (67.54%)

2383.35 hits per line

Source File: /service/history/queue/timer_queue_processor.go — 61.11% covered
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/ndc"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/reconciliation/invariant"
	"github.com/uber/cadence/common/types"
	hcommon "github.com/uber/cadence/service/history/common"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/engine"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/history/task"
	"github.com/uber/cadence/service/worker/archiver"
)

type timerQueueProcessor struct {
	shard         shard.Context
	historyEngine engine.Engine
	taskProcessor task.Processor

	config             *config.Config
	currentClusterName string

	metricsClient metrics.Client
	logger        log.Logger

	status       int32
	shutdownChan chan struct{}
	shutdownWG   sync.WaitGroup

	ackLevel               time.Time
	taskAllocator          TaskAllocator
	activeTaskExecutor     task.Executor
	activeQueueProcessor   *timerQueueProcessorBase
	standbyQueueProcessors map[string]*timerQueueProcessorBase
	standbyTaskExecutors   []task.Executor
	standbyQueueTimerGates map[string]RemoteTimerGate
}

// NewTimerQueueProcessor creates a new timer QueueProcessor
func NewTimerQueueProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	taskProcessor task.Processor,
	executionCache *execution.Cache,
	archivalClient archiver.Client,
	executionCheck invariant.Invariant,
) Processor {
	logger := shard.GetLogger().WithTags(tag.ComponentTimerQueue)
	currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()
	config := shard.GetConfig()
	taskAllocator := NewTaskAllocator(shard)

	activeTaskExecutor := task.NewTimerActiveTaskExecutor(
		shard,
		archivalClient,
		executionCache,
		logger,
		shard.GetMetricsClient(),
		config,
	)

	activeQueueProcessor := newTimerQueueActiveProcessor(
		currentClusterName,
		shard,
		historyEngine,
		taskProcessor,
		taskAllocator,
		activeTaskExecutor,
		logger,
	)

	standbyTaskExecutors := make([]task.Executor, 0, len(shard.GetClusterMetadata().GetRemoteClusterInfo()))
	standbyQueueProcessors := make(map[string]*timerQueueProcessorBase)
	standbyQueueTimerGates := make(map[string]RemoteTimerGate)
	for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() {
		historyResender := ndc.NewHistoryResender(
			shard.GetDomainCache(),
			shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
			func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
				return historyEngine.ReplicateEventsV2(ctx, request)
			},
			config.StandbyTaskReReplicationContextTimeout,
			executionCheck,
			shard.GetLogger(),
		)
		standbyTaskExecutor := task.NewTimerStandbyTaskExecutor(
			shard,
			archivalClient,
			executionCache,
			historyResender,
			logger,
			shard.GetMetricsClient(),
			clusterName,
			config,
		)
		standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor)
		standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(
			clusterName,
			shard,
			historyEngine,
			taskProcessor,
			taskAllocator,
			standbyTaskExecutor,
			logger,
		)
	}

	return &timerQueueProcessor{
		shard:         shard,
		historyEngine: historyEngine,
		taskProcessor: taskProcessor,

		config:             config,
		currentClusterName: currentClusterName,

		metricsClient: shard.GetMetricsClient(),
		logger:        logger,

		status:       common.DaemonStatusInitialized,
		shutdownChan: make(chan struct{}),

		ackLevel:               shard.GetTimerAckLevel(),
		taskAllocator:          taskAllocator,
		activeTaskExecutor:     activeTaskExecutor,
		activeQueueProcessor:   activeQueueProcessor,
		standbyQueueProcessors: standbyQueueProcessors,
		standbyQueueTimerGates: standbyQueueTimerGates,
		standbyTaskExecutors:   standbyTaskExecutors,
	}
}

func (t *timerQueueProcessor) Start() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	t.activeQueueProcessor.Start()
	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
		standbyQueueProcessor.Start()
	}

	t.shutdownWG.Add(1)
	go t.completeTimerLoop()
}

func (t *timerQueueProcessor) Stop() {
	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
		t.activeQueueProcessor.Stop()
		// stop active executor after queue processor
		t.activeTaskExecutor.Stop()
		for _, standbyQueueProcessor := range t.standbyQueueProcessors {
			standbyQueueProcessor.Stop()
		}

		// stop standby executors after queue processors
		for _, standbyTaskExecutor := range t.standbyTaskExecutors {
			standbyTaskExecutor.Stop()
		}

		close(t.shutdownChan)
		common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
		return
	}

	// close the shutdown channel first so processor pumps drains tasks
	// and then stop the processors
	close(t.shutdownChan)
	if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) {
		t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
	}
	t.activeQueueProcessor.Stop()
	t.activeTaskExecutor.Stop()
	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
		standbyQueueProcessor.Stop()
	}

	// stop standby executors after queue processors
	for _, standbyTaskExecutor := range t.standbyTaskExecutors {
		standbyTaskExecutor.Stop()
	}
}

func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
	if clusterName == t.currentClusterName {
		t.activeQueueProcessor.notifyNewTimers(info.Tasks)
		return
	}

	standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName]
	if !ok {
		panic(fmt.Sprintf("Cannot find standby timer processor for %s.", clusterName))
	}

	standbyQueueTimerGate, ok := t.standbyQueueTimerGates[clusterName]
	if !ok {
		panic(fmt.Sprintf("Cannot find standby timer gate for %s.", clusterName))
	}

	curTime := t.shard.GetCurrentTime(clusterName)
	standbyQueueTimerGate.SetCurrentTime(curTime)
	t.logger.Debug("Current time for standby queue timergate is updated", tag.ClusterName(clusterName), tag.Timestamp(curTime))
	standbyQueueProcessor.notifyNewTimers(info.Tasks)
}

func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
	// Failover queue is used to scan all inflight tasks, if queue processor is not
	// started, there's no inflight task and we don't need to create a failover processor.
	// Also the HandleAction will be blocked if queue processor processing loop is not running.
	if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
		return
	}

	minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
	standbyClusterName := t.currentClusterName
	for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() {
		ackLevel := t.shard.GetTimerClusterAckLevel(clusterName)
		if ackLevel.Before(minLevel) {
			minLevel = ackLevel
			standbyClusterName = clusterName
		}
	}

	if standbyClusterName != t.currentClusterName {
		t.logger.Debugf("Timer queue failover will use minLevel: %v from standbyClusterName: %s", minLevel, standbyClusterName)
	} else {
		t.logger.Debugf("Timer queue failover will use minLevel: %v from current cluster: %s", minLevel, t.currentClusterName)
	}

	maxReadLevel := time.Time{}
	actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction())
	if err != nil {
		t.logger.Error("Timer failover failed while getting queue states", tag.WorkflowDomainIDs(domainIDs), tag.Error(err))
		if err == errProcessorShutdown {
			// processor/shard already shutdown, we don't need to create failover queue processor
			return
		}
		// other errors should never be returned for GetStateAction
		panic(fmt.Sprintf("unknown error for GetStateAction: %v", err))
	}

	var maxReadLevelQueueLevel int
	for _, queueState := range actionResult.GetStateActionResult.States {
		queueReadLevel := queueState.ReadLevel().(timerTaskKey).visibilityTimestamp
		if maxReadLevel.Before(queueReadLevel) {
			maxReadLevel = queueReadLevel
			maxReadLevelQueueLevel = queueState.Level()
		}
	}

	if !maxReadLevel.IsZero() {
		t.logger.Debugf("Timer queue failover will use maxReadLevel: %v from queue at level: %v", maxReadLevel, maxReadLevelQueueLevel)
	}

	// TODO: Below Add call has no effect, understand the underlying intent and fix it.
	maxReadLevel.Add(1 * time.Millisecond)

	t.logger.Info("Timer Failover Triggered",
		tag.WorkflowDomainIDs(domainIDs),
		tag.MinLevel(minLevel.UnixNano()),
		tag.MaxLevel(maxReadLevel.UnixNano()),
	)

	updateClusterAckLevelFn, failoverQueueProcessor := newTimerQueueFailoverProcessor(
		standbyClusterName,
		t.shard,
		t.historyEngine,
		t.taskProcessor,
		t.taskAllocator,
		t.activeTaskExecutor,
		t.logger,
		minLevel,
		maxReadLevel,
		domainIDs,
	)

	// NOTE: READ REF BEFORE MODIFICATION
	// ref: historyEngine.go registerDomainFailoverCallback function
	err = updateClusterAckLevelFn(newTimerTaskKey(minLevel, 0))
	if err != nil {
		t.logger.Error("Error update shard ack level", tag.Error(err))
	}
	failoverQueueProcessor.Start()
}

func (t *timerQueueProcessor) HandleAction(ctx context.Context, clusterName string, action *Action) (*ActionResult, error) {
	var resultNotificationCh chan actionResultNotification
	var added bool
	if clusterName == t.currentClusterName {
		resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action)
	} else {
		found := false
		for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors {
			if clusterName == standbyClusterName {
				resultNotificationCh, added = standbyProcessor.addAction(ctx, action)
				found = true
				break
			}
		}

		if !found {
			return nil, fmt.Errorf("unknown cluster name: %v", clusterName)
		}
	}

	if !added {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, errProcessorShutdown
	}

	select {
	case resultNotification := <-resultNotificationCh:
		return resultNotification.result, resultNotification.err
	case <-t.shutdownChan:
		return nil, errProcessorShutdown
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (t *timerQueueProcessor) LockTaskProcessing() {
	t.taskAllocator.Lock()
}

func (t *timerQueueProcessor) UnlockTaskProcessing() {
	t.taskAllocator.Unlock()
}

func (t *timerQueueProcessor) drain() {
	if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
		if err := t.completeTimer(context.Background()); err != nil {
			t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
		}
		return
	}

	// when graceful shutdown is enabled for queue processor, use a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
	defer cancel()
	if err := t.completeTimer(ctx); err != nil {
		t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
	}
}

func (t *timerQueueProcessor) completeTimerLoop() {
	defer t.shutdownWG.Done()

	completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
	defer completeTimer.Stop()

	for {
		select {
		case <-t.shutdownChan:
			t.drain()
			return
		case <-completeTimer.C:
			for attempt := 0; attempt < t.config.TimerProcessorCompleteTimerFailureRetryCount(); attempt++ {
				err := t.completeTimer(context.Background())
				if err == nil {
					break
				}

				t.logger.Error("Failed to complete timer task", tag.Error(err))
				if err == shard.ErrShardClosed {
					if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
						go t.Stop()
						return
					}

					t.Stop()
					return
				}

				select {
				case <-t.shutdownChan:
					t.drain()
					return
				case <-time.After(time.Duration(attempt*100) * time.Millisecond):
					// do nothing. retry loop will continue
				}
			}

			completeTimer.Reset(t.config.TimerProcessorCompleteTimerInterval())
		}
	}
}

func (t *timerQueueProcessor) completeTimer(ctx context.Context) error {
	newAckLevel := maximumTimerTaskKey
	actionResult, err := t.HandleAction(ctx, t.currentClusterName, NewGetStateAction())
	if err != nil {
		return err
	}
	for _, queueState := range actionResult.GetStateActionResult.States {
		newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
	}

	for standbyClusterName := range t.standbyQueueProcessors {
		actionResult, err := t.HandleAction(ctx, standbyClusterName, NewGetStateAction())
		if err != nil {
			return err
		}
		for _, queueState := range actionResult.GetStateActionResult.States {
			newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel())
		}
	}

	for _, failoverInfo := range t.shard.GetAllTimerFailoverLevels() {
		failoverLevel := newTimerTaskKey(failoverInfo.MinLevel, 0)
		newAckLevel = minTaskKey(newAckLevel, failoverLevel)
	}

	if newAckLevel == maximumTimerTaskKey {
		panic("Unable to get timer queue processor ack level")
	}

	newAckLevelTimestamp := newAckLevel.(timerTaskKey).visibilityTimestamp
	if !t.ackLevel.Before(newAckLevelTimestamp) {
		t.logger.Debugf("Skipping timer task completion because new ack level %v is not before ack level %v", newAckLevelTimestamp, t.ackLevel)
		return nil
	}

	t.logger.Debugf("Start completing timer task from: %v, to %v", t.ackLevel, newAckLevelTimestamp)
	t.metricsClient.Scope(metrics.TimerQueueProcessorScope).
		Tagged(metrics.ShardIDTag(t.shard.GetShardID())).
		IncCounter(metrics.TaskBatchCompleteCounter)

	totalDeleted := 0
	for {
		pageSize := t.config.TimerTaskDeleteBatchSize()
		resp, err := t.shard.GetExecutionManager().RangeCompleteTimerTask(ctx, &persistence.RangeCompleteTimerTaskRequest{
			InclusiveBeginTimestamp: t.ackLevel,
			ExclusiveEndTimestamp:   newAckLevelTimestamp,
			PageSize:                pageSize, // pageSize may or may not be honored
		})
		if err != nil {
			return err
		}

		totalDeleted += resp.TasksCompleted
		t.logger.Debug("Timer task batch deletion", tag.Dynamic("page-size", pageSize), tag.Dynamic("total-deleted", totalDeleted))
		if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) {
			break
		}
	}

	t.ackLevel = newAckLevelTimestamp

	return t.shard.UpdateTimerAckLevel(t.ackLevel)
}

func loadTimerProcessingQueueStates(
	clusterName string,
	shard shard.Context,
	options *queueProcessorOptions,
	logger log.Logger,
) []ProcessingQueueState {
	ackLevel := shard.GetTimerClusterAckLevel(clusterName)
	if options.EnableLoadQueueStates() {
		pStates := shard.GetTimerProcessingQueueStates(clusterName)
		if validateProcessingQueueStates(pStates, ackLevel) {
			return convertFromPersistenceTimerProcessingQueueStates(pStates)
		}

		logger.Error("Incompatible processing queue states and ackLevel",
			tag.Value(pStates),
			tag.ShardTimerAcks(ackLevel),
		)
	}

	return []ProcessingQueueState{
		NewProcessingQueueState(
			defaultProcessingQueueLevel,
			newTimerTaskKey(ackLevel, 0),
			maximumTimerTaskKey,
			NewDomainFilter(nil, true),
		),
	}
}