• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

welovemedia / ffmate / 18079473215

28 Sep 2025 08:33PM UTC coverage: 64.12% (-0.5%) from 64.608%
18079473215

push

github

YoSev
fix: enable cancelation of running task during pre, post and ffmpeg processing

0 of 56 new or added lines in 2 files covered. (0.0%)

1 existing line in 1 file now uncovered.

2207 of 3442 relevant lines covered (64.12%)

13.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/service/task/processing.go
1
package task
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "os"
10
        "os/exec"
11
        "path/filepath"
12
        "strconv"
13
        "time"
14

15
        "github.com/mattn/go-shellwords"
16
        "github.com/welovemedia/ffmate/v2/internal/database/model"
17
        "github.com/welovemedia/ffmate/v2/internal/debug"
18
        "github.com/welovemedia/ffmate/v2/internal/dto"
19
        "github.com/welovemedia/ffmate/v2/internal/metrics"
20
        "github.com/welovemedia/ffmate/v2/internal/service/ffmpeg"
21
)
22

23
func (s *Service) runPreProcessing(task *model.Task) error {
×
24
        if err := s.prePostProcessTask(task, task.PreProcessing, "pre"); err != nil {
×
25
                return fmt.Errorf("PreProcessing failed: %v", err)
×
26
        }
×
27
        return nil
×
28
}
29

30
func (s *Service) prepareTaskFiles(task *model.Task) {
×
31
        task.InputFile.Resolved = s.wildcardReplacer(task.InputFile.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
32
        task.OutputFile.Resolved = s.wildcardReplacer(task.OutputFile.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
33
        task.Command.Resolved = s.wildcardReplacer(task.Command.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
34

×
35
        task.Status = dto.Running
×
NEW
36
        if _, err := s.updateRunningTask(task); err != nil {
×
37
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
38
        }
×
39
}
40

41
func (s *Service) createOutputDirectory(task *model.Task) error {
×
42
        if err := os.MkdirAll(filepath.Dir(task.OutputFile.Resolved), 0755); err != nil {
×
43
                return fmt.Errorf("failed to create non-existing output directory: %v", err)
×
44
        }
×
45
        return nil
×
46
}
47

48
func (s *Service) executeFFmpeg(task *model.Task) error {
×
49
        debug.Task.Debug("starting ffmpeg process (uuid: %s)", task.UUID)
×
50

×
NEW
51
        var err error
×
52

×
NEW
53
        // load taskContext from taskQueue
×
NEW
54
        if ctxVal, ok := taskQueue.Load(task.UUID); !ok {
×
NEW
55
                err = fmt.Errorf("taskContext for task has not been found (uuid: %s)", task.UUID)
×
NEW
56
        } else {
×
NEW
57
                // typecast ctxVal to taskContext
×
NEW
58
                if ctx, ok2 := ctxVal.(taskContext); !ok2 {
×
NEW
59
                        err = fmt.Errorf("taskQueue context for uuid %s is not of type taskContext", task.UUID)
×
NEW
60
                } else {
×
NEW
61
                        // executed ffmpeg with found context
×
NEW
62
                        err = s.ffmpegService.Execute(&ffmpeg.ExecutionRequest{
×
NEW
63
                                Task:    task,
×
NEW
64
                                Command: task.Command.Resolved,
×
NEW
65
                                Ctx:     ctx.ctx,
×
NEW
66
                                UpdateFunc: func(progress, remaining float64) {
×
NEW
67
                                        task.Progress = progress
×
NEW
68
                                        task.Remaining = remaining
×
NEW
69
                                        if _, err := s.updateRunningTask(task); err != nil {
×
NEW
70
                                                debug.Log.Error("failed to save task (uuid: %s): %v", task.UUID, err)
×
NEW
71
                                        }
×
72
                                },
73
                        })
74

NEW
75
                        task.Progress = 100
×
NEW
76
                        task.Remaining = -1
×
NEW
77

×
NEW
78
                        // check for cancelation first
×
NEW
79
                        if cause := context.Cause(ctx.ctx); cause != nil {
×
NEW
80
                                s.cancelTask(task)
×
NEW
81
                                return cause
×
NEW
82
                        }
×
83
                }
84
        }
85

86
        // check for ffmpeg related errors
87
        if err != nil {
×
88
                debug.Task.Debug("finished processing with error (uuid: %s): %v", task.UUID, err)
×
89
                s.failTask(task, err)
×
90
                return err
×
UNCOV
91
        }
×
92

93
        debug.Task.Debug("finished processing (uuid: %s)", task.UUID)
×
94
        return nil
×
95
}
96

97
func (s *Service) runPostProcessing(task *model.Task) error {
×
98
        if err := s.prePostProcessTask(task, task.PostProcessing, "post"); err != nil {
×
99
                return fmt.Errorf("PostProcessing failed: %v", err)
×
100
        }
×
101
        return nil
×
102
}
103

104
func (s *Service) finalizeTask(task *model.Task) {
×
105
        task.FinishedAt = time.Now().UnixMilli()
×
106
        task.Status = dto.DoneSuccessful
×
NEW
107
        if _, err := s.updateRunningTask(task); err != nil {
×
108
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
109
        }
×
110
        debug.Task.Info("task successful (uuid: %s)", task.UUID)
×
111
}
112

113
func (s *Service) prePostProcessTask(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
114
        if processor == nil || (processor.SidecarPath == nil && processor.ScriptPath == nil) {
×
115
                return nil
×
116
        }
×
117

118
        s.trackProcessingMetrics(processor, processorType)
×
119
        s.initializeProcessing(task, processor, processorType)
×
120

×
121
        if err := s.handleSidecar(task, processor, processorType); err != nil {
×
122
                return err
×
123
        }
×
124

125
        if err := s.handleScriptExecution(task, processor, processorType); err != nil {
×
126
                return err
×
127
        }
×
128

129
        if err := s.reimportSidecarIfNeeded(task, processor, processorType); err != nil {
×
130
                return err
×
131
        }
×
132

133
        return s.finalizeProcessing(processor, processorType, task)
×
134
}
135

136
func (s *Service) trackProcessingMetrics(processor *dto.PrePostProcessing, processorType string) {
×
137
        sidecarEmpty := strconv.FormatBool(processor.SidecarPath != nil && processor.SidecarPath.Raw == "")
×
138
        scriptEmpty := strconv.FormatBool(processor.ScriptPath != nil && processor.ScriptPath.Raw == "")
×
139

×
140
        metricName := "task.preProcessing"
×
141
        if processorType == "post" {
×
142
                metricName = "task.postProcessing"
×
143
        }
×
144
        metrics.GaugeVec(metricName).WithLabelValues(sidecarEmpty, scriptEmpty).Inc()
×
145
}
146

147
func (s *Service) initializeProcessing(task *model.Task, processor *dto.PrePostProcessing, processorType string) {
×
148
        debug.Task.Debug("starting %sProcessing (uuid: %s)", processorType, task.UUID)
×
149
        processor.StartedAt = time.Now().UnixMilli()
×
150

×
151
        if processorType == "pre" {
×
152
                task.Status = dto.PreProcessing
×
153
        } else {
×
154
                task.Status = dto.PostProcessing
×
155
        }
×
156

NEW
157
        if _, err := s.updateRunningTask(task); err != nil {
×
158
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
159
        }
×
160
}
161

162
func (s *Service) handleSidecar(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
163
        if processor.SidecarPath == nil || processor.SidecarPath.Raw == "" {
×
164
                return nil
×
165
        }
×
166

167
        // Resolve path and save
168
        if processorType == "pre" {
×
169
                processor.SidecarPath.Resolved = s.wildcardReplacer(processor.SidecarPath.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
170
        } else {
×
171
                processor.SidecarPath.Resolved = s.wildcardReplacer(processor.SidecarPath.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
172
        }
×
NEW
173
        if _, err := s.updateRunningTask(task); err != nil {
×
174
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
175
        }
×
176

177
        // Write file
178
        data, err := json.Marshal(task.ToDTO())
×
179
        if err != nil {
×
180
                debug.Log.Error("failed to marshal task to write sidecar file: %v", err)
×
181
                return nil
×
182
        }
×
183

184
        if err := os.MkdirAll(filepath.Dir(processor.SidecarPath.Resolved), 0755); err != nil {
×
185
                return err
×
186
        }
×
187
        if err := os.WriteFile(processor.SidecarPath.Resolved, data, 0644); err != nil {
×
188
                processor.Error = fmt.Errorf("failed to write sidecar: %v", err).Error()
×
189
                debug.Log.Error("failed to write sidecar file: %v", err)
×
190
        }
×
191
        return nil
×
192
}
193

194
func (s *Service) handleScriptExecution(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
195
        if processor.Error != "" || processor.ScriptPath == nil || processor.ScriptPath.Raw == "" {
×
196
                return nil
×
197
        }
×
198

199
        if processorType == "pre" {
×
200
                processor.ScriptPath.Resolved = s.wildcardReplacer(processor.ScriptPath.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
201
        } else {
×
202
                processor.ScriptPath.Resolved = s.wildcardReplacer(processor.ScriptPath.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
203
        }
×
NEW
204
        if _, err := s.updateRunningTask(task); err != nil {
×
205
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
206
        }
×
207

208
        args, err := shellwords.NewParser().Parse(processor.ScriptPath.Resolved)
×
209
        if err != nil {
×
210
                processor.Error = err.Error()
×
211
                debug.Task.Debug("failed to parse %sProcessing script (uuid: %s): %v", processorType, task.UUID, err)
×
212
                return nil
×
213
        }
×
214

215
        cmd := exec.Command(args[0], args[1:]...)
×
216
        var stderr bytes.Buffer
×
217
        cmd.Stderr = &stderr
×
218

×
219
        if err := cmd.Start(); err != nil || cmd.Wait() != nil {
×
220
                processor.Error = fmt.Sprintf("%s (exit code: %d)", stderr.String(), cmd.ProcessState.ExitCode())
×
221
                debug.Task.Debug("script failed (uuid: %s): stderr: %s", task.UUID, stderr.String())
×
222
        }
×
223
        return nil
×
224
}
225

226
func (s *Service) reimportSidecarIfNeeded(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
227
        if processorType != "pre" || processor.SidecarPath == nil || processor.SidecarPath.Raw == "" || !processor.ImportSidecar {
×
228
                return nil
×
229
        }
×
230
        data, err := os.ReadFile(processor.SidecarPath.Resolved)
×
231
        if err != nil {
×
232
                return err
×
233
        }
×
234
        if err := json.Unmarshal(data, task); err != nil {
×
235
                return err
×
236
        }
×
237
        debug.Task.Debug("re-imported sidecar file (uuid: %s)", task.UUID)
×
238
        return nil
×
239
}
240

241
func (s *Service) finalizeProcessing(processor *dto.PrePostProcessing, processorType string, task *model.Task) error {
×
242
        processor.FinishedAt = time.Now().UnixMilli()
×
243
        if processor.Error != "" {
×
244
                debug.Task.Info("finished %sProcessing with error (uuid: %s)", processorType, task.UUID)
×
245
                return errors.New(processor.Error)
×
246
        }
×
247
        debug.Task.Info("finished %sProcessing (uuid: %s)", processorType, task.UUID)
×
248
        return nil
×
249
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc