• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kelindar / async / 16089549196

05 Jul 2025 03:31PM UTC coverage: 97.561%. First build
16089549196

push

github

kelindar
Use atomic for chaining

13 of 14 new or added lines in 1 file covered. (92.86%)

200 of 205 relevant lines covered (97.56%)

47.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.48
/task.go
1
// Copyright 2019 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
2
// Copyright (c) 2021-2025 Roman Atachiants
3
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
4

5
package async
6

7
import (
8
        "context"
9
        "errors"
10
        "fmt"
11
        "runtime/debug"
12
        "sync"
13
        "sync/atomic"
14
        "time"
15
)
16

17
var (
18
        errCancelled = errors.New("context canceled")
19
        ErrPanic     = errors.New("panic in async task")
20
)
21

22
var now = time.Now
23

24
// Work represents a handler to execute
25
type Work[T any] func(context.Context) (T, error)
26

27
// State represents the state enumeration for a task.
28
type State byte
29

30
// Various task states
31
const (
32
        IsCreated   State = iota // IsCreated represents a newly created task
33
        IsRunning                // IsRunning represents a task which is currently running
34
        IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
35
        IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
36
)
37

38
// Outcome of the task contains a result and an error
39
type outcome[T any] struct {
40
        result T     // The result of the work
41
        err    error // The error
42
}
43

44
// task represents a unit of work to be done
45
type task[T any] struct {
46
        state    int32                                   // This indicates whether the task is started or not
47
        duration int64                                   // The duration of the task, in nanoseconds
48
        wg       sync.WaitGroup                          // Used to wait for completion instead of channel
49
        action   Work[T]                                 // The work to do
50
        outcome  outcome[T]                              // This is used to store the result
51
        next     atomic.Pointer[[]func(context.Context)] // Continuation functions
52
}
53

54
// Awaiter is an interface that can be used to wait for a task to complete.
55
type Awaiter interface {
56
        Wait() error
57
        Cancel()
58
        State() State
59
}
60

61
// Task represents a unit of work to be done
62
type Task[T any] interface {
63
        Awaiter
64
        Run(ctx context.Context) Task[T]
65
        Outcome() (T, error)
66
        Duration() time.Duration
67
}
68

69
// NewTask creates a new task.
70
func NewTask[T any](action Work[T]) Task[T] {
107✔
71
        t := &task[T]{
107✔
72
                action: action,
107✔
73
        }
107✔
74
        t.wg.Add(1)
107✔
75
        return t
107✔
76
}
107✔
77

78
// NewTasks creates a set of new tasks.
79
func NewTasks[T any](actions ...Work[T]) []Task[T] {
4✔
80
        tasks := make([]Task[T], 0, len(actions))
4✔
81
        for _, action := range actions {
22✔
82
                tasks = append(tasks, NewTask(action))
18✔
83
        }
18✔
84
        return tasks
4✔
85
}
86

87
// Outcome waits until the task is done and returns the final result and error.
88
func (t *task[T]) Outcome() (T, error) {
178✔
89
        t.wg.Wait()
178✔
90
        return t.outcome.result, t.outcome.err
178✔
91
}
178✔
92

93
// Wait waits for the task to complete and returns the error.
94
func (t *task[T]) Wait() error {
28✔
95
        t.wg.Wait()
28✔
96
        return t.outcome.err
28✔
97
}
28✔
98

99
// State returns the current state of the task. This operation is non-blocking.
100
func (t *task[T]) State() State {
281✔
101
        v := atomic.LoadInt32(&t.state)
281✔
102
        return State(v)
281✔
103
}
281✔
104

105
// Duration returns the duration of the task.
106
func (t *task[T]) Duration() time.Duration {
4✔
107
        return time.Duration(atomic.LoadInt64(&t.duration))
4✔
108
}
4✔
109

110
// Run starts the task asynchronously.
111
func (t *task[T]) Run(ctx context.Context) Task[T] {
98✔
112
        go t.run(ctx)
98✔
113
        return t
98✔
114
}
98✔
115

116
// Cancel cancels a running task.
117
func (t *task[T]) Cancel() {
111✔
118
        switch {
111✔
119
        case t.changeState(IsCreated, IsCancelled):
6✔
120
                var zero T
6✔
121
                t.outcome = outcome[T]{result: zero, err: errCancelled}
6✔
122
                t.wg.Done()
6✔
123
                return
6✔
124
        case t.changeState(IsRunning, IsCancelled):
6✔
125
                // already running, do nothing
126
        }
127
}
128

129
// run starts the task synchronously.
130
func (t *task[T]) run(ctx context.Context) {
103✔
131
        if !t.changeState(IsCreated, IsRunning) {
109✔
132
                return // Prevent from running the same task twice or if already cancelled
6✔
133
        }
6✔
134

135
        // Notify everyone of the completion/error state
136
        defer t.wg.Done()
97✔
137

97✔
138
        // Check for cancellation before starting work
97✔
139
        if t.State() == IsCancelled {
97✔
140
                var zero T
×
141
                t.outcome = outcome[T]{result: zero, err: errCancelled}
×
142
                return
×
143
        }
×
144

145
        select {
97✔
146
        case <-ctx.Done():
2✔
147
                var zero T
2✔
148
                t.outcome = outcome[T]{result: zero, err: ctx.Err()}
2✔
149
                t.changeState(IsRunning, IsCancelled)
2✔
150
                return
2✔
151
        default:
95✔
152
                // Continue to work execution
153
        }
154

155
        // Execute the task directly with panic recovery
156
        startedAt := now().UnixNano()
95✔
157

95✔
158
        func() {
190✔
159
                defer func() {
186✔
160
                        if out := recover(); out != nil {
92✔
161
                                var zero T
1✔
162
                                t.outcome = outcome[T]{result: zero, err: fmt.Errorf("%w: %s\n%s",
1✔
163
                                        ErrPanic, out, debug.Stack())}
1✔
164
                                return
1✔
165
                        }
1✔
166
                }()
167

168
                // Execute the task
169
                r, e := t.action(ctx)
95✔
170
                t.outcome = outcome[T]{result: r, err: e}
95✔
171

95✔
172
                // Run next tasks with the same context
95✔
173
                if cont := t.next.Load(); cont != nil {
100✔
174
                        for _, next := range *cont {
10✔
175
                                next(ctx)
5✔
176
                        }
5✔
177
                }
178
        }()
179

180
        atomic.StoreInt64(&t.duration, now().UnixNano()-startedAt)
91✔
181

91✔
182
        // Check if we were cancelled during execution
91✔
183
        switch {
91✔
184
        case t.State() == IsCancelled:
2✔
185
                var zero T
2✔
186
                t.outcome = outcome[T]{result: zero, err: errCancelled}
2✔
187
        case t.State() == IsRunning:
89✔
188
                select {
89✔
189
                case <-ctx.Done():
7✔
190
                        var zero T
7✔
191
                        t.outcome = outcome[T]{result: zero, err: ctx.Err()}
7✔
192
                        t.changeState(IsRunning, IsCancelled)
7✔
193
                default:
82✔
194
                        t.changeState(IsRunning, IsCompleted)
82✔
195
                }
196
        }
197
}
198

199
// Cancel cancels a running task.
200
func (t *task[T]) changeState(from, to State) bool {
410✔
201
        return atomic.CompareAndSwapInt32(&t.state, int32(from), int32(to))
410✔
202
}
410✔
203

204
// Invoke creates a new tasks and runs it asynchronously.
205
func Invoke[T any](ctx context.Context, action Work[T]) Task[T] {
20✔
206
        return NewTask(action).Run(ctx)
20✔
207
}
20✔
208

209
// -------------------------------- Completed Task --------------------------------
210

211
// completedTask represents a task that is already completed
212
type completedTask[T any] struct {
213
        out T
214
        err error
215
}
216

217
// Run returns the task itself since it's already completed
218
func (t *completedTask[T]) Run(ctx context.Context) Task[T] {
1✔
219
        return t
1✔
220
}
1✔
221

222
// Cancel does nothing since the task is already completed
223
func (t *completedTask[T]) Cancel() {}
2✔
224

225
// State always returns IsCompleted
226
func (t *completedTask[T]) State() State {
3✔
227
        return IsCompleted
3✔
228
}
3✔
229

230
// Outcome immediately returns the result
231
func (t *completedTask[T]) Outcome() (T, error) {
9✔
232
        return t.out, t.err
9✔
233
}
9✔
234

235
// Wait waits for the task to complete and returns the error.
236
func (t *completedTask[T]) Wait() error {
1✔
237
        return t.err
1✔
238
}
1✔
239

240
// Duration returns zero since no work was performed
241
func (t *completedTask[T]) Duration() time.Duration {
2✔
242
        return 0
2✔
243
}
2✔
244

245
// -------------------------------- Factory Functions --------------------------------
246

247
// Completed creates a completed task with the given result.
248
func Completed[T any](result T) Task[T] {
6✔
249
        return &completedTask[T]{out: result}
6✔
250
}
6✔
251

252
// Failed creates a failed task with the given error.
253
func Failed[T any](err error) Task[T] {
5✔
254
        return &completedTask[T]{err: err}
5✔
255
}
5✔
256

257
// -------------------------------- Continuation Task --------------------------------
258

259
// After creates a continuation task that automatically runs when the predecessor completes
260
func After[T, U any](predecessor Task[T], work func(context.Context, T) (U, error)) Task[U] {
6✔
261
        prev, ok := predecessor.(*task[T])
6✔
262
        if !ok {
7✔
263
                return Failed[U](fmt.Errorf("predecessor does not support chaining"))
1✔
264
        }
1✔
265

266
        // Since this function is only called after predecessor completes,
267
        // we can directly access its outcome without waiting
268
        next := NewTask(func(ctx context.Context) (U, error) {
10✔
269
                if prev.outcome.err != nil {
6✔
270
                        var zero U
1✔
271
                        return zero, prev.outcome.err
1✔
272
                }
1✔
273

274
                return work(ctx, prev.outcome.result) //nolint:scopelint
4✔
275
        }).(*task[U])
276

277
        // Add continuation function using atomic operations
278
        for {
10✔
279
                curr := prev.next.Load()
5✔
280
                cont := withNext(curr, next.run)
5✔
281
                if prev.next.CompareAndSwap(curr, &cont) {
10✔
282
                        break
5✔
283
                }
284
        }
285
        return next
5✔
286
}
287

288
// withNext adds a new continuation function to the list of continuations
289
func withNext(current *[]func(context.Context), next func(context.Context)) []func(context.Context) {
5✔
290
        if current == nil {
10✔
291
                return []func(context.Context){next}
5✔
292
        }
5✔
293

NEW
294
        return append((*current), next)
×
295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc