• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

looplab / eventhorizon / 12622512440

05 Jan 2025 07:58PM UTC coverage: 27.495% (-39.9%) from 67.361%
12622512440

Pull #419

github

web-flow
Merge b3c17d928 into ac3a97277
Pull Request #419: fix(ci): update to up/download-artifact v4

1769 of 6434 relevant lines covered (27.49%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.67
/middleware/commandhandler/scheduler/middleware.go
1
// Copyright (c) 2017 - The Event Horizon authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package scheduler
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "sync"
22
        "time"
23

24
        eh "github.com/looplab/eventhorizon"
25
        "github.com/looplab/eventhorizon/uuid"
26
)
27

28
// ScheduledCommandsQueueSize is the default size of the queue of scheduled
// commands. It is used as the buffer size of the command channel of a
// Scheduler.
var ScheduledCommandsQueueSize = 100
30

31
// ErrCanceled is the error reported for a scheduled command that was canceled
// before it was executed.
var ErrCanceled = errors.New("canceled")
33

34
// Command is a scheduled command with an execution time.
35
type Command interface {
36
        eh.Command
37

38
        // ExecuteAt returns the time when the command will execute.
39
        ExecuteAt() time.Time
40
}
41

42
// CommandWithExecuteTime returns a wrapped command with a execution time set.
43
func CommandWithExecuteTime(cmd eh.Command, t time.Time) Command {
1✔
44
        return &command{Command: cmd, t: t}
1✔
45
}
1✔
46

47
// private implementation to wrap ordinary commands and add a execution time.
48
type command struct {
49
        eh.Command
50
        t time.Time
51
}
52

53
// ExecuteAt implements the ExecuteAt method of the Command interface.
54
func (c *command) ExecuteAt() time.Time {
2✔
55
        return c.t
2✔
56
}
2✔
57

58
// NewMiddleware returns a new command handler middleware and a scheduler helper.
59
func NewMiddleware(repo eh.ReadWriteRepo, codec eh.CommandCodec) (eh.CommandHandlerMiddleware, *Scheduler) {
1✔
60
        s := &Scheduler{
1✔
61
                repo:             repo,
1✔
62
                cmdCh:            make(chan *scheduledCommand, ScheduledCommandsQueueSize),
1✔
63
                cancelScheduling: map[uuid.UUID]chan struct{}{},
1✔
64
                errCh:            make(chan error, 100),
1✔
65
                codec:            codec,
1✔
66
        }
1✔
67

1✔
68
        return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler {
2✔
69
                s.setHandler(h)
1✔
70

1✔
71
                return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
2✔
72
                        // Delayed command execution if there is time set.
1✔
73
                        if c, ok := cmd.(Command); ok && !c.ExecuteAt().IsZero() {
2✔
74
                                // Use the wrapped command when created by the helper func.
1✔
75
                                innerCmd, ok := c.(*command)
1✔
76
                                if ok {
2✔
77
                                        cmd = innerCmd.Command
1✔
78
                                }
1✔
79

80
                                // Ignore the persisted command ID in this case.
81
                                _, err := s.ScheduleCommand(ctx, cmd, c.ExecuteAt())
1✔
82

1✔
83
                                return err
1✔
84
                        }
85

86
                        // Immediate command execution.
87
                        return h.HandleCommand(ctx, cmd)
×
88
                })
89
        }), s
90
}
91

92
// PersistedCommand is a persisted command.
93
type PersistedCommand struct {
94
        ID         uuid.UUID       `json:"_"                 bson:"_id"`
95
        IDStr      string          `json:"id"                bson:"_"`
96
        RawCommand []byte          `json:"command"           bson:"command"`
97
        ExecuteAt  time.Time       `json:"timestamp"         bson:"timestamp"`
98
        Command    eh.Command      `json:"-"                 bson:"-"`
99
        Context    context.Context `json:"-"                 bson:"-"`
100
}
101

102
// EntityID implements the EntityID method of the eventhorizon.Entity interface.
103
func (c *PersistedCommand) EntityID() uuid.UUID {
1✔
104
        return c.ID
1✔
105
}
1✔
106

107
// Scheduler is a scheduled of commands.
108
type Scheduler struct {
109
        h                  eh.CommandHandler
110
        hMu                sync.Mutex
111
        repo               eh.ReadWriteRepo
112
        cmdCh              chan *scheduledCommand
113
        cancelScheduling   map[uuid.UUID]chan struct{}
114
        cancelSchedulingMu sync.Mutex
115
        errCh              chan error
116
        cctx               context.Context
117
        cancel             context.CancelFunc
118
        done               chan struct{}
119
        codec              eh.CommandCodec
120
}
121

122
func (s *Scheduler) setHandler(h eh.CommandHandler) {
1✔
123
        s.hMu.Lock()
1✔
124
        defer s.hMu.Unlock()
1✔
125

1✔
126
        if s.h != nil {
1✔
127
                panic("eventhorizon: handler already set for outbox")
×
128
        }
129

130
        s.h = h
1✔
131
}
132

133
// Load loads all persisted scheduled commands. It will be limited
134
// by ScheduledCommandsQueueSize if Start() has not yet been called.
135
func (s *Scheduler) Load(ctx context.Context) error {
2✔
136
        commands, err := s.Commands(ctx)
2✔
137
        if err != nil {
2✔
138
                return fmt.Errorf("could not load scheduled commands: %w", err)
×
139
        }
×
140

141
        for _, pc := range commands {
3✔
142
                sc := &scheduledCommand{
1✔
143
                        id:        pc.ID,
1✔
144
                        ctx:       pc.Context,
1✔
145
                        cmd:       pc.Command,
1✔
146
                        executeAt: pc.ExecuteAt,
1✔
147
                }
1✔
148

1✔
149
                select {
1✔
150
                case s.cmdCh <- sc:
1✔
151
                default:
×
152
                        return fmt.Errorf("could not schedule command, command queue full")
×
153
                }
154
        }
155

156
        return nil
2✔
157
}
158

159
// Start starts the scheduler.
160
func (s *Scheduler) Start() error {
2✔
161
        if s.h == nil {
2✔
162
                return fmt.Errorf("command handler not set")
×
163
        }
×
164

165
        s.cctx, s.cancel = context.WithCancel(context.Background())
2✔
166
        s.done = make(chan struct{})
2✔
167

2✔
168
        go s.run()
2✔
169

2✔
170
        return nil
2✔
171
}
172

173
// Stop stops all scheduled commands.
174
func (s *Scheduler) Stop() error {
2✔
175
        s.cancel()
2✔
176

2✔
177
        <-s.done
2✔
178

2✔
179
        return nil
2✔
180
}
2✔
181

182
// Errors returns an error channel that will receive errors from handling of
183
// scheduled commands.
184
func (s *Scheduler) Errors() <-chan error {
1✔
185
        return s.errCh
1✔
186
}
1✔
187

188
type scheduledCommand struct {
189
        id        uuid.UUID
190
        ctx       context.Context
191
        cmd       eh.Command
192
        executeAt time.Time
193
}
194

195
// ScheduleCommand schedules a command to be executed at `executeAt`. It is persisted
196
// to the repo.
197
func (s *Scheduler) ScheduleCommand(ctx context.Context, cmd eh.Command, executeAt time.Time) (uuid.UUID, error) {
1✔
198
        b, err := s.codec.MarshalCommand(ctx, cmd)
1✔
199
        if err != nil {
1✔
200
                return uuid.Nil, &Error{
×
201
                        Err:     fmt.Errorf("could not marshal command: %w", err),
×
202
                        Ctx:     ctx,
×
203
                        Command: cmd,
×
204
                }
×
205
        }
×
206

207
        // Use the command ID as persisted ID if available.
208
        var id uuid.UUID
1✔
209
        if cmd, ok := cmd.(eh.CommandIDer); ok {
1✔
210
                id = cmd.CommandID()
×
211
        } else {
1✔
212
                id = uuid.New()
1✔
213
        }
1✔
214

215
        pc := &PersistedCommand{
1✔
216
                ID:         id,
1✔
217
                IDStr:      id.String(),
1✔
218
                RawCommand: b,
1✔
219
                ExecuteAt:  executeAt,
1✔
220
        }
1✔
221

1✔
222
        if err := s.repo.Save(ctx, pc); err != nil {
1✔
223
                return uuid.Nil, &Error{
×
224
                        Err:     fmt.Errorf("could not persist command: %w", err),
×
225
                        Ctx:     ctx,
×
226
                        Command: cmd,
×
227
                }
×
228
        }
×
229

230
        select {
1✔
231
        case s.cmdCh <- &scheduledCommand{id, ctx, cmd, executeAt}:
1✔
232
        default:
×
233
                return uuid.Nil, &Error{
×
234
                        Err:     fmt.Errorf("command queue full"),
×
235
                        Ctx:     ctx,
×
236
                        Command: cmd,
×
237
                }
×
238
        }
239

240
        return pc.ID, nil
1✔
241
}
242

243
// Commands returns all scheduled commands.
244
func (s *Scheduler) Commands(ctx context.Context) ([]*PersistedCommand, error) {
2✔
245
        entities, err := s.repo.FindAll(ctx)
2✔
246
        if err != nil {
2✔
247
                return nil, fmt.Errorf("could not load scheduled commands: %w", err)
×
248
        }
×
249

250
        commands := make([]*PersistedCommand, len(entities))
2✔
251

2✔
252
        for i, entity := range entities {
3✔
253
                c, ok := entity.(*PersistedCommand)
1✔
254
                if !ok {
1✔
255
                        return nil, fmt.Errorf("command is not schedulable: %T", entity)
×
256
                }
×
257

258
                if c.Command, c.Context, err = s.codec.UnmarshalCommand(ctx, c.RawCommand); err != nil {
1✔
259
                        return nil, fmt.Errorf("could not unmarshal command: %w", err)
×
260
                }
×
261

262
                c.RawCommand = nil
1✔
263

1✔
264
                if c.IDStr != "" {
2✔
265
                        id, err := uuid.Parse(c.IDStr)
1✔
266
                        if err != nil {
1✔
267
                                return nil, fmt.Errorf("could not parse command ID: %w", err)
×
268
                        }
×
269

270
                        c.ID = id
1✔
271
                }
272

273
                commands[i] = c
1✔
274
        }
275

276
        return commands, nil
2✔
277
}
278

279
// CancelCommand cancels a scheduled command.
280
func (s *Scheduler) CancelCommand(ctx context.Context, id uuid.UUID) error {
×
281
        s.cancelSchedulingMu.Lock()
×
282
        defer s.cancelSchedulingMu.Unlock()
×
283

×
284
        cancel, ok := s.cancelScheduling[id]
×
285
        if !ok {
×
286
                return fmt.Errorf("command %s not scheduled", id)
×
287
        }
×
288

289
        close(cancel)
×
290

×
291
        return nil
×
292
}
293

294
func (s *Scheduler) run() {
2✔
295
        var wg sync.WaitGroup
2✔
296

2✔
297
loop:
2✔
298
        for {
6✔
299
                select {
4✔
300
                case <-s.cctx.Done():
2✔
301
                        break loop
2✔
302
                case sc := <-s.cmdCh:
2✔
303
                        wg.Add(1)
2✔
304

2✔
305
                        s.cancelSchedulingMu.Lock()
2✔
306
                        cancel := make(chan struct{})
2✔
307
                        s.cancelScheduling[sc.id] = cancel
2✔
308
                        s.cancelSchedulingMu.Unlock()
2✔
309

2✔
310
                        go func(cancel chan struct{}) {
4✔
311
                                defer wg.Done()
2✔
312

2✔
313
                                t := time.NewTimer(time.Until(sc.executeAt))
2✔
314
                                defer t.Stop()
2✔
315

2✔
316
                                select {
2✔
317
                                case <-s.cctx.Done():
1✔
318
                                        // Stop without removing persisted cmd.
319
                                case <-t.C:
1✔
320
                                        if err := s.h.HandleCommand(sc.ctx, sc.cmd); err != nil {
1✔
321
                                                // Always try to deliver errors.
×
322
                                                s.errCh <- &Error{
×
323
                                                        Err:     err,
×
324
                                                        Ctx:     sc.ctx,
×
325
                                                        Command: sc.cmd,
×
326
                                                }
×
327
                                        }
×
328

329
                                        if err := s.repo.Remove(sc.ctx, sc.id); err != nil {
1✔
330
                                                s.errCh <- &Error{
×
331
                                                        Err:     fmt.Errorf("could not remove persisted command: %w", err),
×
332
                                                        Ctx:     sc.ctx,
×
333
                                                        Command: sc.cmd,
×
334
                                                }
×
335
                                        }
×
336
                                case <-cancel:
×
337
                                        if err := s.repo.Remove(sc.ctx, sc.id); err != nil {
×
338
                                                s.errCh <- &Error{
×
339
                                                        Err:     fmt.Errorf("could not remove persisted command: %w", err),
×
340
                                                        Ctx:     sc.ctx,
×
341
                                                        Command: sc.cmd,
×
342
                                                }
×
343
                                        }
×
344

345
                                        s.errCh <- &Error{
×
346
                                                Err:     ErrCanceled,
×
347
                                                Ctx:     sc.ctx,
×
348
                                                Command: sc.cmd,
×
349
                                        }
×
350
                                }
351
                        }(cancel)
352
                }
353
        }
354

355
        wg.Wait()
2✔
356

2✔
357
        close(s.done)
2✔
358
}
359

360
// Error is an async error containing the error and the command.
361
type Error struct {
362
        // Err is the error that happened when handling the command.
363
        Err error
364
        // Ctx is the context used when the error happened.
365
        Ctx context.Context
366
        // Command is the command handeled when the error happened.
367
        Command eh.Command
368
}
369

370
// Error implements the Error method of the error interface.
371
func (e *Error) Error() string {
×
372
        return fmt.Sprintf("%s (%s): %s", e.Command.CommandType(), e.Command.AggregateID(), e.Err.Error())
×
373
}
×
374

375
// Unwrap implements the errors.Unwrap method.
376
func (e *Error) Unwrap() error {
×
377
        return e.Err
×
378
}
×
379

380
// Cause implements the github.com/pkg/errors Unwrap method.
381
func (e *Error) Cause() error {
×
382
        return e.Unwrap()
×
383
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc