• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

welovemedia / ffmate / 18079104130

28 Sep 2025 07:59PM UTC coverage: 64.371% (-0.2%) from 64.608%
18079104130

Pull #23

github

YoSev
fix: enable cancelation of running task during pre, post and ffmpeg processing
Pull Request #23: fix: enable cancelation of running task during ffmpeg processing

0 of 32 new or added lines in 2 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

2206 of 3427 relevant lines covered (64.37%)

13.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.06
/internal/service/task/task.go
1
package task
2

3
import (
4
        "context"
5
        "errors"
6
        "os/exec"
7
        "strings"
8
        "sync"
9
        "time"
10

11
        "github.com/google/uuid"
12
        "github.com/welovemedia/ffmate/v2/internal/cfg"
13
        "github.com/welovemedia/ffmate/v2/internal/database/model"
14
        "github.com/welovemedia/ffmate/v2/internal/debug"
15
        "github.com/welovemedia/ffmate/v2/internal/dto"
16
        "github.com/welovemedia/ffmate/v2/internal/metrics"
17
        "github.com/welovemedia/ffmate/v2/internal/service"
18
        "github.com/welovemedia/ffmate/v2/internal/service/ffmpeg"
19
        "github.com/welovemedia/ffmate/v2/internal/service/preset"
20
        "github.com/welovemedia/ffmate/v2/internal/service/webhook"
21
        "github.com/welovemedia/ffmate/v2/internal/service/websocket"
22
)
23

24
type Repository interface {
25
        List(page int, perPage int) (*[]model.Task, int64, error)
26
        ListByBatch(uuid string, page int, perPage int) (*[]model.Task, int64, error)
27
        Add(task *model.Task) (*model.Task, error)
28
        Update(task *model.Task) (*model.Task, error)
29
        First(uuid string) (*model.Task, error)
30
        Delete(task *model.Task) error
31
        Count() (int64, error)
32
        CountUnfinishedByBatch(uuid string) (int64, error)
33
        CountAllStatus() (int, int, int, int, int, error)
34
        NextQueued(amount int) (*[]model.Task, error)
35
}
36

37
type Service struct {
38
        repository       Repository
39
        presetService    *preset.Service
40
        webhookService   *webhook.Service
41
        websocketService *websocket.Service
42
        ffmpegService    *ffmpeg.Service
43
}
44

45
func NewService(repository Repository, presetService *preset.Service, webhookService *webhook.Service, websocketService *websocket.Service, ffmpegService *ffmpeg.Service) *Service {
38✔
46
        return &Service{
38✔
47
                repository:       repository,
38✔
48
                presetService:    presetService,
38✔
49
                webhookService:   webhookService,
38✔
50
                websocketService: websocketService,
38✔
51
                ffmpegService:    ffmpegService,
38✔
52
        }
38✔
53
}
38✔
54

55
func (s *Service) Get(uuid string) (*model.Task, error) {
1✔
56
        w, err := s.repository.First(uuid)
1✔
57
        if err != nil {
1✔
58
                return nil, err
×
59
        }
×
60

61
        if w == nil {
1✔
62
                return nil, errors.New("task for given uuid not found")
×
63
        }
×
64

65
        return w, nil
1✔
66
}
67

68
func (s *Service) Update(task *model.Task) (*model.Task, error) {
2✔
69
        task.ClientIdentifier = cfg.GetString("ffmate.identifier")
2✔
70
        task, err := s.repository.Update(task)
2✔
71
        if err != nil {
2✔
72
                return nil, err
×
73
        }
×
74

75
        s.webhookService.Fire(dto.TaskUpdated, task.ToDTO())
2✔
76
        s.webhookService.FireDirect(task.Webhooks, dto.TaskUpdated, task.ToDTO())
2✔
77
        s.websocketService.Broadcast(websocket.TaskUpdated, task.ToDTO())
2✔
78

2✔
79
        if task.Batch != "" {
2✔
80
                switch task.Status {
×
81
                case dto.DoneSuccessful, dto.DoneError, dto.DoneCanceled:
×
82
                        c, _ := s.repository.CountUnfinishedByBatch(task.Batch)
×
83
                        if c == 0 {
×
84
                                metrics.Gauge("batch.finished").Inc()
×
85
                                s.webhookService.Fire(dto.BatchFinished, task.ToDTO())
×
86
                        }
×
87
                }
88
        }
89

90
        return task, nil
2✔
91
}
92

93
func (s *Service) Cancel(uuid string) (*model.Task, error) {
1✔
94
        w, err := s.repository.First(uuid)
1✔
95
        if err != nil {
1✔
96
                return nil, err
×
97
        }
×
98

99
        if w == nil {
1✔
100
                return nil, errors.New("task for given uuid not found")
×
101
        }
×
102

103
        w.Status = dto.DoneCanceled
1✔
104
        w.Remaining = -1
1✔
105
        w.Progress = 100
1✔
106
        w.FinishedAt = time.Now().UnixMilli()
1✔
107

1✔
108
        metrics.Gauge("task.canceled").Inc()
1✔
109
        debug.Log.Info("canceled task (uuid: %s)", uuid)
1✔
110

1✔
111
        return s.Update(w)
1✔
112
}
113

114
func (s *Service) Restart(uuid string) (*model.Task, error) {
1✔
115
        w, err := s.repository.First(uuid)
1✔
116
        if err != nil {
1✔
117
                return nil, err
×
118
        }
×
119

120
        if w == nil {
1✔
121
                return nil, errors.New("task for given uuid not found")
×
122
        }
×
123

124
        w.Status = dto.Queued
1✔
125
        w.Progress = 0
1✔
126
        w.StartedAt = 0
1✔
127
        w.FinishedAt = 0
1✔
128
        w.Error = ""
1✔
129

1✔
130
        metrics.Gauge("task.restarted").Inc()
1✔
131
        debug.Log.Info("restarted task (uuid: %s)", uuid)
1✔
132

1✔
133
        return s.Update(w)
1✔
134
}
135

136
func (s *Service) List(page int, perPage int) (*[]model.Task, int64, error) {
1✔
137
        return s.repository.List(page, perPage)
1✔
138
}
1✔
139

140
func (s *Service) GetBatch(uuid string, page int, perPage int) (*dto.Batch, int64, error) {
1✔
141
        tasks, count, err := s.repository.ListByBatch(uuid, page, perPage)
1✔
142
        if err != nil {
1✔
143
                return nil, count, err
×
144
        }
×
145

146
        var taskDTOs = []*dto.Task{}
1✔
147
        for _, task := range *tasks {
2✔
148
                taskDTOs = append(taskDTOs, task.ToDTO())
1✔
149
        }
1✔
150

151
        return &dto.Batch{
1✔
152
                UUID:  uuid,
1✔
153
                Tasks: taskDTOs,
1✔
154
        }, count, err
1✔
155
}
156

157
var presetCache = sync.Map{}
158

159
func (s *Service) Add(newTask *dto.NewTask, source dto.TaskSource, batch string) (*model.Task, error) {
7✔
160
        if newTask.Preset != "" {
7✔
161
                var preset *model.Preset
×
162
                var err error
×
163

×
164
                // add preset cache for batch creation
×
165
                if batch != "" {
×
166
                        p, ok := presetCache.Load(batch)
×
167
                        if ok {
×
168
                                preset = p.(*model.Preset)
×
169
                        }
×
170
                }
171
                if preset == nil {
×
172
                        preset, err = s.presetService.Get(newTask.Preset)
×
173
                        if err != nil {
×
174
                                return nil, err
×
175
                        }
×
176

177
                        if preset != nil {
×
178
                                presetCache.Store(batch, preset)
×
179
                        }
×
180
                }
181

182
                newTask.Command = preset.Command
×
183
                if newTask.OutputFile == "" {
×
184
                        newTask.OutputFile = preset.OutputFile
×
185
                }
×
186
                if newTask.Priority == 0 {
×
187
                        newTask.Priority = preset.Priority
×
188
                }
×
189
                if preset.PreProcessing != nil && newTask.PreProcessing == nil {
×
190
                        newTask.PreProcessing = &dto.NewPrePostProcessing{ScriptPath: preset.PreProcessing.ScriptPath, SidecarPath: preset.PreProcessing.SidecarPath, ImportSidecar: preset.PreProcessing.ImportSidecar}
×
191
                }
×
192
                if preset.PostProcessing != nil && newTask.PostProcessing == nil {
×
193
                        newTask.PostProcessing = &dto.NewPrePostProcessing{ScriptPath: preset.PostProcessing.ScriptPath, SidecarPath: preset.PostProcessing.SidecarPath}
×
194
                }
×
195

196
                if preset.Webhooks != nil {
×
197
                        if newTask.Webhooks == nil {
×
198
                                newTask.Webhooks = preset.Webhooks
×
199
                        } else {
×
200
                                *newTask.Webhooks = append(*preset.Webhooks, *newTask.Webhooks...)
×
201
                        }
×
202
                }
203
        }
204

205
        // filter webhooks so only "task.*" events remain
206
        if newTask.Webhooks != nil {
7✔
207
                filtered := make(dto.DirectWebhooks, 0, len(*newTask.Webhooks))
×
208
                for _, wh := range *newTask.Webhooks {
×
209
                        if strings.HasPrefix(string(wh.Event), "task.") {
×
210
                                filtered = append(filtered, wh)
×
211
                        }
×
212
                }
213
                newTask.Webhooks = &filtered
×
214
        }
215

216
        task := &model.Task{
7✔
217
                UUID:             uuid.NewString(),
7✔
218
                Command:          &dto.RawResolved{Raw: newTask.Command},
7✔
219
                InputFile:        &dto.RawResolved{Raw: newTask.InputFile},
7✔
220
                OutputFile:       &dto.RawResolved{Raw: newTask.OutputFile},
7✔
221
                Metadata:         newTask.Metadata,
7✔
222
                Name:             newTask.Name,
7✔
223
                Priority:         newTask.Priority,
7✔
224
                Progress:         0,
7✔
225
                Source:           source,
7✔
226
                Status:           dto.Queued,
7✔
227
                Batch:            batch,
7✔
228
                Webhooks:         newTask.Webhooks,
7✔
229
                ClientIdentifier: cfg.GetString("ffmate.identifier"),
7✔
230
        }
7✔
231
        w, err := s.repository.Add(task)
7✔
232
        debug.Task.Info("created task (uuid: %s)", w.UUID)
7✔
233

7✔
234
        metrics.Gauge("task.created").Inc()
7✔
235
        s.webhookService.Fire(dto.TaskCreated, w.ToDTO())
7✔
236
        s.webhookService.FireDirect(w.Webhooks, dto.TaskCreated, w.ToDTO())
7✔
237
        s.websocketService.Broadcast(websocket.TaskCreated, w.ToDTO())
7✔
238

7✔
239
        return w, err
7✔
240
}
241

242
func (s *Service) AddBatch(newBatch *dto.NewBatch) (*dto.Batch, error) {
1✔
243
        batchUUID := uuid.NewString()
1✔
244
        tasks := []model.Task{}
1✔
245
        for _, task := range newBatch.Tasks {
2✔
246
                t, err := s.Add(task, "api", batchUUID)
1✔
247
                if err != nil {
1✔
248
                        return nil, err
×
249
                }
×
250
                tasks = append(tasks, *t)
1✔
251
        }
252

253
        // clear preset cache
254
        presetCache.Delete(batchUUID)
1✔
255

1✔
256
        // transform each task to its DTO
1✔
257
        var taskDTOs = []*dto.Task{}
1✔
258
        for _, task := range tasks {
2✔
259
                taskDTOs = append(taskDTOs, task.ToDTO())
1✔
260
        }
1✔
261

262
        metrics.Gauge("batch.created").Inc()
1✔
263
        s.webhookService.Fire(dto.BatCreated, taskDTOs)
1✔
264

1✔
265
        batch := &dto.Batch{
1✔
266
                UUID:  batchUUID,
1✔
267
                Tasks: taskDTOs,
1✔
268
        }
1✔
269

1✔
270
        return batch, nil
1✔
271
}
272

273
func (s *Service) Delete(uuid string) error {
2✔
274
        w, err := s.repository.First(uuid)
2✔
275
        if err != nil {
2✔
276
                return err
×
277
        }
×
278

279
        if w == nil {
3✔
280
                return errors.New("task for given uuid not found")
1✔
281
        }
1✔
282

283
        err = s.repository.Delete(w)
1✔
284
        if err != nil {
1✔
285
                debug.Log.Error("failed to delete task (uuid: %s)", uuid)
×
286
                return err
×
287
        }
×
288

289
        debug.Log.Info("deleted task (uuid: %s)", uuid)
1✔
290

1✔
291
        metrics.Gauge("task.deleted").Inc()
1✔
292
        s.webhookService.Fire(dto.TaskDeleted, w.ToDTO())
1✔
293
        s.webhookService.FireDirect(w.Webhooks, dto.TaskDeleted, w.ToDTO())
1✔
294
        s.websocketService.Broadcast(websocket.TaskDeleted, w.ToDTO())
1✔
295

1✔
296
        return nil
1✔
297
}
298

299
/**
300
 * Task processing
301
 */
302

303
func (s *Service) ProcessQueue() *Service {
38✔
304
        // lookup ffmpeg (path)
38✔
305
        if !s.checkFFmpeg() {
76✔
306
                go func() {
76✔
307
                        for {
76✔
308
                                time.Sleep(10 * time.Second)
38✔
309
                                if s.checkFFmpeg() {
38✔
310
                                        return
×
311
                                }
×
312
                        }
313
                }()
314
        }
315

316
        go s.processQueue()
38✔
317

38✔
318
        return s
38✔
319
}
320

321
func (s *Service) checkFFmpeg() bool {
38✔
322
        if !cfg.Has("ffmate.ffmpeg") || cfg.GetString("ffmate.ffmpeg") == "" {
41✔
323
                cfg.Set("ffmate.ffmpeg", "ffmpeg")
3✔
324
        }
3✔
325
        if path, err := exec.LookPath(cfg.GetString("ffmate.ffmpeg")); err != nil {
76✔
326
                debug.Task.Error("ffmpeg binary not found in PATH. Please install ffmpeg or set the path to the ffmpeg binary with the --ffmpeg flag. Error: %s", err)
38✔
327
        } else {
38✔
328
                cfg.Set("ffmate.ffmpeg", path)
×
329
                cfg.Set("ffmate.isFFmpeg", true)
×
330
                debug.Log.Info("ffmpeg binary found at '%s'", cfg.GetString("ffmate.ffmpeg"))
×
331
                return true
×
332
        }
×
333
        return false
38✔
334
}
335

336
func (s *Service) processQueue() {
38✔
337
        for {
76✔
338
                time.Sleep(1 * time.Second)
38✔
339

38✔
340
                if !cfg.GetBool("ffmate.isFFmpeg") {
38✔
341
                        debug.Task.Debug("ffmpeg not configured yet, skipping processing")
×
342
                        continue
×
343
                }
344

345
                taskQueueLength := s.taskQueueLength()
×
346
                var maxConcurrentTasks = cfg.GetInt("ffmate.maxConcurrentTasks")
×
347
                if maxConcurrentTasks == 0 || maxConcurrentTasks <= taskQueueLength {
×
348
                        debug.Task.Debug("maximum concurrent tasks reached (tasks: %d/%d)", taskQueueLength, maxConcurrentTasks)
×
349
                        continue
×
350
                }
351

352
                task, err := s.repository.NextQueued(maxConcurrentTasks - taskQueueLength)
×
353
                if err != nil {
×
354
                        debug.Log.Error("failed to receive queued task from db: %v", err)
×
355
                        continue
×
356
                }
357
                if task == nil || len(*task) == 0 {
×
358
                        debug.Task.Debug("no queued tasks found")
×
359
                        continue
×
360
                }
361

362
                for _, t := range *task {
×
363
                        go s.processNewTask(&t)
×
364
                }
×
365
        }
366
}
367

368
type taskContext struct {
369
        ctx    context.Context
370
        cancel context.CancelCauseFunc
371
}
372

373
// taskQueue holds taskContext for each running task.UUID
374
var taskQueue = sync.Map{}
375

376
func (s *Service) processNewTask(task *model.Task) {
×
377
        debug.Task.Info("processing task (uuid: %s)", task.UUID)
×
NEW
378

×
NEW
379
        ctx, cancel := context.WithCancelCause(context.Background())
×
NEW
380
        taskQueue.Store(task.UUID, taskContext{ctx, cancel})
×
381
        defer taskQueue.Delete(task.UUID)
×
382

×
383
        task.StartedAt = time.Now().UnixMilli()
×
384

×
385
        if err := s.runPreProcessing(task); err != nil {
×
386
                s.failTask(task, err)
×
387
                return
×
388
        }
×
389

390
        s.prepareTaskFiles(task)
×
391

×
392
        if err := s.createOutputDirectory(task); err != nil {
×
393
                s.failTask(task, err)
×
394
                return
×
395
        }
×
396

397
        if err := s.executeFFmpeg(task); err != nil {
×
398
                return // failure already handled inside executeFFmpeg
×
399
        }
×
400

401
        if err := s.runPostProcessing(task); err != nil {
×
402
                s.failTask(task, err)
×
403
                return
×
404
        }
×
405

406
        s.finalizeTask(task)
×
407
}
408

409
// updateRunningTask checks if a given task has status 'canceled' in the database before updating it.
410
// if task is already canceled, the related context is being canceled to stop the actual execution process (ffmpeg)
NEW
411
func (s *Service) updateRunningTask(task *model.Task) (*model.Task, error) {
×
NEW
412
        ctx, _ := taskQueue.Load(task.UUID)
×
NEW
413
        if t, err := s.repository.First(task.UUID); err != nil {
×
NEW
414
                debug.Task.Error("failed to receive task during running updates (uuid: %s): %v", task.UUID, err)
×
NEW
415
        } else {
×
NEW
416
                if t.Status == dto.DoneCanceled {
×
NEW
417
                        debug.Task.Debug("denied updating an already canceled task during processing, canceling context.. (uuid: %s)", task.UUID)
×
NEW
418
                        (ctx.(taskContext)).cancel(errors.New("task has been canceled"))
×
NEW
419
                        return task, nil
×
NEW
420
                }
×
421
        }
422

NEW
423
        return s.Update(task)
×
424
}
425

NEW
426
func (s *Service) cancelTask(task *model.Task) {
×
427
        task.FinishedAt = time.Now().UnixMilli()
×
428
        task.Progress = 100
×
429
        task.Status = dto.DoneCanceled
×
NEW
430
        _, err := s.Update(task)
×
431
        if err != nil {
×
432
                debug.Task.Error("failed to update task after cancel (uuid: %s)", task.UUID)
×
433
        }
×
NEW
434
        debug.Task.Info("task canceled (uuid: %s)", task.UUID)
×
435
}
436

437
func (s *Service) failTask(task *model.Task, err error) {
×
438
        task.FinishedAt = time.Now().UnixMilli()
×
439
        task.Progress = 100
×
440
        task.Status = dto.DoneError
×
441
        task.Error = err.Error()
×
442
        _, err = s.Update(task)
×
443
        if err != nil {
×
444
                debug.Task.Error("failed to update task after fail (uuid: %s)", task.UUID)
×
445
        }
×
446
        debug.Task.Warn("task failed (uuid: %s)", task.UUID)
×
447
}
448

449
func (s *Service) taskQueueLength() int {
×
450
        length := 0
×
451
        taskQueue.Range(func(_, _ any) bool {
×
452
                length++
×
453
                return true
×
454
        })
×
455
        return length
×
456
}
457

458
/**
459
 * CountAllStatus is used in systray
460
 */
461

462
func (s *Service) CountAllStatus() (queued, running, doneSuccessful, doneError, doneCanceled int, err error) {
×
463
        return s.repository.CountAllStatus()
×
464
}
×
465

466
func (s *Service) Name() string {
38✔
467
        return service.Task
38✔
468
}
38✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc