• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kelindar / async / 15765595396

19 Jun 2025 08:09PM UTC coverage: 97.661%. First build
15765595396

push

github

web-flow
Simplify, make this thing type safe and optimize (#3)

* Remove batch and partition implementations along with their associated tests. Introduce benchmark tests for task execution performance.

* Enhance task execution with cancellation checks and panic recovery. Ensure proper state transitions on cancellation and context errors, while simplifying outcome handling.

* Refactor task execution in Consume and InvokeAll functions to use goroutines for better concurrency management. Remove ContinueWith method from Task interface and simplify task outcome handling. Update tests to reflect these changes.

* Refactor task cancellation handling by removing the cancellation channel and simplifying state transitions. Update .gitignore to exclude profiling and test files.

* Refactor task and work types to use generics for improved type safety and flexibility. Update benchmarks and tests to accommodate changes in task handling. Remove unused mock partitioner file.

* Refactor task handling to utilize generics across Consume, InvokeAll, and Repeat functions for enhanced type safety. Introduce Completed and Failed task constructors for better error handling. Update benchmarks and tests to reflect these changes.

* Update benchmarks in bench_test.go for improved performance metrics and refactor InvokeAll to utilize Consume for task processing. Adjusted task handling to enhance concurrency control and maintain consistency in outcome management.

* Refactor task management in task.go to replace the cancellation channel with a WaitGroup for improved concurrency control. Update benchmarks in bench_test.go to reflect performance changes and ensure consistency in outcome management.

* Refactor task duration handling in task.go to use atomic operations for thread-safe updates. Replace the duration field with an atomic int64 to ensure accurate duration retrieval and storage during task execution.

* Enhance test coverage for Repeat function in repeat_test.go by adding test... (continued)

120 of 124 new or added lines in 4 files covered. (96.77%)

167 of 171 relevant lines covered (97.66%)

56.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.26
/task.go
1
// Copyright 2019 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
2
// Copyright (c) 2021-2025 Roman Atachiants
3
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
4

5
package async
6

7
import (
8
        "context"
9
        "errors"
10
        "fmt"
11
        "runtime/debug"
12
        "sync"
13
        "sync/atomic"
14
        "time"
15
)
16

17
var (
18
        errCancelled = errors.New("context canceled")
19
        ErrPanic     = errors.New("panic in async task")
20
)
21

22
var now = time.Now
23

24
// Work represents a handler to execute
25
type Work[T any] func(context.Context) (T, error)
26

27
// State represents the state enumeration for a task.
28
type State byte
29

30
// Various task states
31
const (
32
        IsCreated   State = iota // IsCreated represents a newly created task
33
        IsRunning                // IsRunning represents a task which is currently running
34
        IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
35
        IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
36
)
37

38
// Outcome of the task contains a result and an error
39
type outcome[T any] struct {
40
        result T     // The result of the work
41
        err    error // The error
42
}
43

44
// Task represents a unit of work to be done
45
type task[T any] struct {
46
        state    int32          // This indicates whether the task is started or not
47
        duration int64          // The duration of the task, in nanoseconds
48
        wg       sync.WaitGroup // Used to wait for completion instead of channel
49
        action   Work[T]        // The work to do
50
        outcome  outcome[T]     // This is used to store the result
51

52
}
53

54
// Task represents a unit of work to be done
55
type Task[T any] interface {
56
        Run(ctx context.Context) Task[T]
57
        Cancel()
58
        State() State
59
        Outcome() (T, error)
60
        Duration() time.Duration
61
}
62

63
// NewTask creates a new task.
64
func NewTask[T any](action Work[T]) Task[T] {
112✔
65
        t := &task[T]{
112✔
66
                action: action,
112✔
67
        }
112✔
68
        t.wg.Add(1) // Will be Done() when task completes
112✔
69
        return t
112✔
70
}
112✔
71

72
// NewTasks creates a set of new tasks.
73
func NewTasks[T any](actions ...Work[T]) []Task[T] {
5✔
74
        tasks := make([]Task[T], 0, len(actions))
5✔
75
        for _, action := range actions {
29✔
76
                tasks = append(tasks, NewTask(action))
24✔
77
        }
24✔
78
        return tasks
5✔
79
}
80

81
// Outcome waits until the task is done and returns the final result and error.
82
func (t *task[T]) Outcome() (T, error) {
217✔
83
        t.wg.Wait()
217✔
84
        return t.outcome.result, t.outcome.err
217✔
85
}
217✔
86

87
// State returns the current state of the task. This operation is non-blocking.
88
func (t *task[T]) State() State {
293✔
89
        v := atomic.LoadInt32(&t.state)
293✔
90
        return State(v)
293✔
91
}
293✔
92

93
// Duration returns the duration of the task.
94
func (t *task[T]) Duration() time.Duration {
2✔
95
        return time.Duration(atomic.LoadInt64(&t.duration))
2✔
96
}
2✔
97

98
// Run starts the task asynchronously.
99
func (t *task[T]) Run(ctx context.Context) Task[T] {
108✔
100
        go t.run(ctx)
108✔
101
        return t
108✔
102
}
108✔
103

104
// Cancel cancels a running task.
105
func (t *task[T]) Cancel() {
110✔
106
        switch {
110✔
107
        case t.changeState(IsCreated, IsCancelled):
6✔
108
                var zero T
6✔
109
                t.outcome = outcome[T]{result: zero, err: errCancelled}
6✔
110
                t.wg.Done()
6✔
111
                return
6✔
112
        case t.changeState(IsRunning, IsCancelled):
5✔
113
                // already running, do nothing
114
        }
115
}
116

117
// run starts the task synchronously.
118
func (t *task[T]) run(ctx context.Context) {
108✔
119
        if !t.changeState(IsCreated, IsRunning) {
114✔
120
                return // Prevent from running the same task twice or if already cancelled
6✔
121
        }
6✔
122

123
        // Notify everyone of the completion/error state
124
        defer t.wg.Done()
102✔
125

102✔
126
        // Check for cancellation before starting work
102✔
127
        if t.State() == IsCancelled {
102✔
NEW
128
                var zero T
×
NEW
129
                t.outcome = outcome[T]{result: zero, err: errCancelled}
×
NEW
130
                return
×
NEW
131
        }
×
132

133
        select {
102✔
134
        case <-ctx.Done():
4✔
135
                var zero T
4✔
136
                t.outcome = outcome[T]{result: zero, err: ctx.Err()}
4✔
137
                t.changeState(IsRunning, IsCancelled)
4✔
138
                return
4✔
139
        default:
98✔
140
                // Continue to work execution
141
        }
142

143
        // Execute the task directly with panic recovery
144
        startedAt := now().UnixNano()
98✔
145

98✔
146
        func() {
196✔
147
                defer func() {
192✔
148
                        if out := recover(); out != nil {
95✔
149
                                var zero T
1✔
150
                                t.outcome = outcome[T]{result: zero, err: fmt.Errorf("%w: %s\n%s",
1✔
151
                                        ErrPanic, out, debug.Stack())}
1✔
152
                                return
1✔
153
                        }
1✔
154
                }()
155

156
                r, e := t.action(ctx)
98✔
157
                t.outcome = outcome[T]{result: r, err: e}
98✔
158
        }()
159

160
        atomic.StoreInt64(&t.duration, now().UnixNano()-startedAt)
94✔
161

94✔
162
        // Check if we were cancelled during execution
94✔
163
        switch {
94✔
164
        case t.State() == IsCancelled:
1✔
165
                var zero T
1✔
166
                t.outcome = outcome[T]{result: zero, err: errCancelled}
1✔
167
        case t.State() == IsRunning:
93✔
168
                select {
93✔
169
                case <-ctx.Done():
7✔
170
                        var zero T
7✔
171
                        t.outcome = outcome[T]{result: zero, err: ctx.Err()}
7✔
172
                        t.changeState(IsRunning, IsCancelled)
7✔
173
                default:
86✔
174
                        t.changeState(IsRunning, IsCompleted)
86✔
175
                }
176
        }
177
}
178

179
// Cancel cancels a running task.
180
func (t *task[T]) changeState(from, to State) bool {
419✔
181
        return atomic.CompareAndSwapInt32(&t.state, int32(from), int32(to))
419✔
182
}
419✔
183

184
// Invoke creates a new tasks and runs it asynchronously.
185
func Invoke[T any](ctx context.Context, action Work[T]) Task[T] {
22✔
186
        return NewTask(action).Run(ctx)
22✔
187
}
22✔
188

189
// -------------------------------- Completed Task --------------------------------
190

191
// completedTask represents a task that is already completed
192
type completedTask[T any] struct {
193
        out T
194
        err error
195
}
196

197
// Run returns the task itself since it's already completed
198
func (t *completedTask[T]) Run(ctx context.Context) Task[T] {
1✔
199
        return t
1✔
200
}
1✔
201

202
// Cancel does nothing since the task is already completed
203
func (t *completedTask[T]) Cancel() {}
2✔
204

205
// State always returns IsCompleted
206
func (t *completedTask[T]) State() State {
3✔
207
        return IsCompleted
3✔
208
}
3✔
209

210
// Outcome immediately returns the result
211
func (t *completedTask[T]) Outcome() (T, error) {
7✔
212
        return t.out, t.err
7✔
213
}
7✔
214

215
// Duration returns zero since no work was performed
216
func (t *completedTask[T]) Duration() time.Duration {
2✔
217
        return 0
2✔
218
}
2✔
219

220
// -------------------------------- Factory Functions --------------------------------
221

222
// Completed creates a completed task with the given result.
223
func Completed[T any](result T) Task[T] {
5✔
224
        return &completedTask[T]{out: result}
5✔
225
}
5✔
226

227
// Failed creates a failed task with the given error.
228
func Failed[T any](err error) Task[T] {
4✔
229
        return &completedTask[T]{err: err}
4✔
230
}
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc