• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 018d9fa8-75f8-405b-9b1e-38f93e6b0a11

12 Feb 2024 11:30PM UTC coverage: 62.748% (+0.05%) from 62.701%
018d9fa8-75f8-405b-9b1e-38f93e6b0a11

Pull #5657

buildkite

Shaddoll
Implement SignalWithStartWorkflowExecutionAsync API
Pull Request #5657: Implement SignalWithStartWorkflowExecutionAsync API

96 of 142 new or added lines in 5 files covered. (67.61%)

60 existing lines in 8 files now uncovered.

92596 of 147569 relevant lines covered (62.75%)

2318.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.57
/common/task/fifo_task_scheduler.go
1
// Copyright (c) 2020 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package task
22

23
import (
24
        "sync"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/log"
30
        "github.com/uber/cadence/common/log/tag"
31
        "github.com/uber/cadence/common/metrics"
32
)
33

34
type fifoTaskSchedulerImpl struct {
35
        status       int32
36
        logger       log.Logger
37
        metricsScope metrics.Scope
38
        options      *FIFOTaskSchedulerOptions
39
        dispatcherWG sync.WaitGroup
40
        taskCh       chan PriorityTask
41
        shutdownCh   chan struct{}
42

43
        processor Processor
44
}
45

46
// NewFIFOTaskScheduler creates a new FIFO task scheduler
47
// it's an no-op implementation as it simply copy tasks from
48
// one task channel to another task channel.
49
// This scheduler is only for development purpose.
50
func NewFIFOTaskScheduler(
51
        logger log.Logger,
52
        metricsClient metrics.Client,
53
        options *FIFOTaskSchedulerOptions,
54
) Scheduler {
3✔
55
        return &fifoTaskSchedulerImpl{
3✔
56
                status:       common.DaemonStatusInitialized,
3✔
57
                logger:       logger,
3✔
58
                metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
3✔
59
                options:      options,
3✔
60
                taskCh:       make(chan PriorityTask, options.QueueSize),
3✔
61
                shutdownCh:   make(chan struct{}),
3✔
62
                processor: NewParallelTaskProcessor(
3✔
63
                        logger,
3✔
64
                        metricsClient,
3✔
65
                        &ParallelTaskProcessorOptions{
3✔
66
                                QueueSize:   options.QueueSize,
3✔
67
                                WorkerCount: options.WorkerCount,
3✔
68
                                RetryPolicy: options.RetryPolicy,
3✔
69
                        },
3✔
70
                ),
3✔
71
        }
3✔
72
}
3✔
73

74
func (f *fifoTaskSchedulerImpl) Start() {
2✔
75
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
2✔
76
                return
×
77
        }
×
78

79
        f.processor.Start()
2✔
80

2✔
81
        f.dispatcherWG.Add(f.options.DispatcherCount)
2✔
82
        for i := 0; i != f.options.DispatcherCount; i++ {
4✔
83
                go f.dispatcher()
2✔
84
        }
2✔
85

86
        f.logger.Info("FIFO task scheduler started.")
2✔
87
}
88

89
func (f *fifoTaskSchedulerImpl) Stop() {
2✔
90
        if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
2✔
91
                return
×
92
        }
×
93

94
        close(f.shutdownCh)
2✔
95

2✔
96
        f.processor.Stop()
2✔
97

2✔
98
        f.drainAndNackTasks()
2✔
99

2✔
100
        if success := common.AwaitWaitGroup(&f.dispatcherWG, time.Minute); !success {
2✔
101
                f.logger.Warn("FIFO task scheduler timedout on shutdown.")
×
102
        }
×
103

104
        f.logger.Info("FIFO task scheduler shutdown.")
2✔
105
}
106

107
func (f *fifoTaskSchedulerImpl) Submit(task PriorityTask) error {
5,109✔
108
        f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
5,109✔
109
        sw := f.metricsScope.StartTimer(metrics.ParallelTaskSubmitLatency)
5,109✔
110
        defer sw.Stop()
5,109✔
111

5,109✔
112
        if f.isStopped() {
9,371✔
113
                return ErrTaskSchedulerClosed
4,262✔
114
        }
4,262✔
115

116
        select {
847✔
117
        case f.taskCh <- task:
847✔
118
                if f.isStopped() {
847✔
UNCOV
119
                        f.drainAndNackTasks()
×
UNCOV
120
                }
×
121
                return nil
847✔
122
        case <-f.shutdownCh:
×
123
                return ErrTaskSchedulerClosed
×
124
        }
125
}
126

127
func (f *fifoTaskSchedulerImpl) TrySubmit(task PriorityTask) (bool, error) {
4,899✔
128
        if f.isStopped() {
8,971✔
129
                return false, ErrTaskSchedulerClosed
4,072✔
130
        }
4,072✔
131

132
        select {
827✔
133
        case f.taskCh <- task:
427✔
134
                f.metricsScope.IncCounter(metrics.ParallelTaskSubmitRequest)
427✔
135
                if f.isStopped() {
427✔
136
                        f.drainAndNackTasks()
×
137
                }
×
138
                return true, nil
427✔
139
        case <-f.shutdownCh:
×
140
                return false, ErrTaskSchedulerClosed
×
141
        default:
400✔
142
                return false, nil
400✔
143
        }
144
}
145

146
func (f *fifoTaskSchedulerImpl) dispatcher() {
2✔
147
        defer f.dispatcherWG.Done()
2✔
148

2✔
149
        for {
1,275✔
150
                select {
1,273✔
151
                case task := <-f.taskCh:
1,271✔
152
                        if err := f.processor.Submit(task); err != nil {
1,273✔
153
                                f.logger.Error("failed to submit task to processor", tag.Error(err))
2✔
154
                                task.Nack()
2✔
155
                        }
2✔
156
                case <-f.shutdownCh:
2✔
157
                        return
2✔
158
                }
159
        }
160
}
161

162
func (f *fifoTaskSchedulerImpl) isStopped() bool {
11,282✔
163
        return atomic.LoadInt32(&f.status) == common.DaemonStatusStopped
11,282✔
164
}
11,282✔
165

166
func (f *fifoTaskSchedulerImpl) drainAndNackTasks() {
2✔
167
        for {
5✔
168
                select {
3✔
169
                case task := <-f.taskCh:
1✔
170
                        task.Nack()
1✔
171
                default:
2✔
172
                        return
2✔
173
                }
174
        }
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc