• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kelindar / async / 15765567801

19 Jun 2025 08:08PM UTC coverage: 97.661%. First build
15765567801

Pull #3

github

kelindar
Refactor repeat_test.go to use atomic operations for thread-safe counter updates in tests. This change enhances concurrency safety in the TestRepeatType, TestRepeatWithError, and TestRepeatNormalExecution functions.
Pull Request #3: Simplify, make this thing type safe and optimize

120 of 124 new or added lines in 4 files covered. (96.77%)

167 of 171 relevant lines covered (97.66%)

55.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.26
/task.go
1
// Copyright 2019 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
2
// Copyright (c) 2021-2025 Roman Atachiants
3
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
4

5
package async
6

7
import (
8
        "context"
9
        "errors"
10
        "fmt"
11
        "runtime/debug"
12
        "sync"
13
        "sync/atomic"
14
        "time"
15
)
16

17
var (
	// errCancelled is the error stored as a task's outcome when the task is
	// cancelled through Cancel. It is a distinct value from context.Canceled
	// even though the message text is the same, so errors.Is(err,
	// context.Canceled) does not match it.
	errCancelled = errors.New("context canceled")
	// ErrPanic is wrapped by the error stored as a task's outcome when the
	// task's work panics; callers can detect it with errors.Is.
	ErrPanic     = errors.New("panic in async task")
)
21

22
// now returns the current time and is used to measure how long a task's work
// took. NOTE(review): presumably a package variable so tests can substitute a
// deterministic clock — confirm against the test files.
var now = time.Now
23

24
// Work represents a handler to execute. It receives the context the task was
// run with and returns a result of type T together with an error.
type Work[T any] func(context.Context) (T, error)
26

27
// State represents the state enumeration for a task. The possible values are
// IsCreated, IsRunning, IsCompleted and IsCancelled.
type State byte
29

30
// Various task states. IsCreated is the zero value, so a freshly allocated
// task starts in that state.
const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)
37

38
// outcome holds what a task produced: the result returned by the work and the
// error, if any. On cancellation or panic the result is the zero value of T.
type outcome[T any] struct {
	result T     // The result of the work
	err    error // The error
}
43

44
// task is the Task implementation that actually executes a Work function. It
// must be used through a pointer since it embeds a sync.WaitGroup.
type task[T any] struct {
	state    int32          // The current State, stored as int32 so it can be read and swapped atomically
	duration int64          // The duration of the task, in nanoseconds
	wg       sync.WaitGroup // Used to wait for completion instead of channel
	action   Work[T]        // The work to do
	outcome  outcome[T]     // This is used to store the result

}
53

54
// Task represents a unit of work to be done
type Task[T any] interface {
	// Run starts the task with the given context and returns the task itself.
	Run(ctx context.Context) Task[T]
	// Cancel requests cancellation of the task.
	Cancel()
	// State reports the current state of the task.
	State() State
	// Outcome returns the result and error produced by the task.
	Outcome() (T, error)
	// Duration reports how long the task's work took.
	Duration() time.Duration
}
62

63
// NewTask creates a new task.
64
func NewTask[T any](action Work[T]) Task[T] {
108✔
65
        t := &task[T]{
108✔
66
                action: action,
108✔
67
        }
108✔
68
        t.wg.Add(1) // Will be Done() when task completes
108✔
69
        return t
108✔
70
}
108✔
71

72
// NewTasks creates a set of new tasks.
73
func NewTasks[T any](actions ...Work[T]) []Task[T] {
5✔
74
        tasks := make([]Task[T], 0, len(actions))
5✔
75
        for _, action := range actions {
29✔
76
                tasks = append(tasks, NewTask(action))
24✔
77
        }
24✔
78
        return tasks
5✔
79
}
80

81
// Outcome waits until the task is done and returns the final result and error.
82
func (t *task[T]) Outcome() (T, error) {
213✔
83
        t.wg.Wait()
213✔
84
        return t.outcome.result, t.outcome.err
213✔
85
}
213✔
86

87
// State returns the current state of the task. This operation is non-blocking.
88
func (t *task[T]) State() State {
285✔
89
        v := atomic.LoadInt32(&t.state)
285✔
90
        return State(v)
285✔
91
}
285✔
92

93
// Duration returns the duration of the task.
94
func (t *task[T]) Duration() time.Duration {
2✔
95
        return time.Duration(atomic.LoadInt64(&t.duration))
2✔
96
}
2✔
97

98
// Run starts the task asynchronously.
99
func (t *task[T]) Run(ctx context.Context) Task[T] {
104✔
100
        go t.run(ctx)
104✔
101
        return t
104✔
102
}
104✔
103

104
// Cancel cancels a running task.
105
func (t *task[T]) Cancel() {
110✔
106
        switch {
110✔
107
        case t.changeState(IsCreated, IsCancelled):
6✔
108
                var zero T
6✔
109
                t.outcome = outcome[T]{result: zero, err: errCancelled}
6✔
110
                t.wg.Done()
6✔
111
                return
6✔
112
        case t.changeState(IsRunning, IsCancelled):
5✔
113
                // already running, do nothing
114
        }
115
}
116

117
// run starts the task synchronously.
118
func (t *task[T]) run(ctx context.Context) {
104✔
119
        if !t.changeState(IsCreated, IsRunning) {
110✔
120
                return // Prevent from running the same task twice or if already cancelled
6✔
121
        }
6✔
122

123
        // Notify everyone of the completion/error state
124
        defer t.wg.Done()
98✔
125

98✔
126
        // Check for cancellation before starting work
98✔
127
        if t.State() == IsCancelled {
98✔
NEW
128
                var zero T
×
NEW
129
                t.outcome = outcome[T]{result: zero, err: errCancelled}
×
NEW
130
                return
×
NEW
131
        }
×
132

133
        select {
98✔
134
        case <-ctx.Done():
2✔
135
                var zero T
2✔
136
                t.outcome = outcome[T]{result: zero, err: ctx.Err()}
2✔
137
                t.changeState(IsRunning, IsCancelled)
2✔
138
                return
2✔
139
        default:
96✔
140
                // Continue to work execution
141
        }
142

143
        // Execute the task directly with panic recovery
144
        startedAt := now().UnixNano()
96✔
145

96✔
146
        func() {
192✔
147
                defer func() {
188✔
148
                        if out := recover(); out != nil {
93✔
149
                                var zero T
1✔
150
                                t.outcome = outcome[T]{result: zero, err: fmt.Errorf("%w: %s\n%s",
1✔
151
                                        ErrPanic, out, debug.Stack())}
1✔
152
                                return
1✔
153
                        }
1✔
154
                }()
155

156
                r, e := t.action(ctx)
96✔
157
                t.outcome = outcome[T]{result: r, err: e}
96✔
158
        }()
159

160
        atomic.StoreInt64(&t.duration, now().UnixNano()-startedAt)
92✔
161

92✔
162
        // Check if we were cancelled during execution
92✔
163
        switch {
92✔
164
        case t.State() == IsCancelled:
1✔
165
                var zero T
1✔
166
                t.outcome = outcome[T]{result: zero, err: errCancelled}
1✔
167
        case t.State() == IsRunning:
91✔
168
                select {
91✔
169
                case <-ctx.Done():
7✔
170
                        var zero T
7✔
171
                        t.outcome = outcome[T]{result: zero, err: ctx.Err()}
7✔
172
                        t.changeState(IsRunning, IsCancelled)
7✔
173
                default:
84✔
174
                        t.changeState(IsRunning, IsCompleted)
84✔
175
                }
176
        }
177
}
178

179
// Cancel cancels a running task.
180
func (t *task[T]) changeState(from, to State) bool {
411✔
181
        return atomic.CompareAndSwapInt32(&t.state, int32(from), int32(to))
411✔
182
}
411✔
183

184
// Invoke creates a new tasks and runs it asynchronously.
185
func Invoke[T any](ctx context.Context, action Work[T]) Task[T] {
22✔
186
        return NewTask(action).Run(ctx)
22✔
187
}
22✔
188

189
// -------------------------------- Completed Task --------------------------------
190

191
// completedTask represents a task that is already completed. It carries a
// fixed result and error and never executes any work.
type completedTask[T any] struct {
	out T     // The result handed back by Outcome
	err error // The error handed back by Outcome
}
196

197
// Run returns the task itself since it's already completed
198
func (t *completedTask[T]) Run(ctx context.Context) Task[T] {
1✔
199
        return t
1✔
200
}
1✔
201

202
// Cancel does nothing since the task is already completed
203
func (t *completedTask[T]) Cancel() {}
2✔
204

205
// State always returns IsCompleted
206
func (t *completedTask[T]) State() State {
3✔
207
        return IsCompleted
3✔
208
}
3✔
209

210
// Outcome immediately returns the result
211
func (t *completedTask[T]) Outcome() (T, error) {
7✔
212
        return t.out, t.err
7✔
213
}
7✔
214

215
// Duration returns zero since no work was performed
216
func (t *completedTask[T]) Duration() time.Duration {
2✔
217
        return 0
2✔
218
}
2✔
219

220
// -------------------------------- Factory Functions --------------------------------
221

222
// Completed creates a completed task with the given result.
223
func Completed[T any](result T) Task[T] {
5✔
224
        return &completedTask[T]{out: result}
5✔
225
}
5✔
226

227
// Failed creates a failed task with the given error.
228
func Failed[T any](err error) Task[T] {
4✔
229
        return &completedTask[T]{err: err}
4✔
230
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc