• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

welovemedia / ffmate / 18079104130

28 Sep 2025 07:59PM UTC coverage: 64.371% (-0.2%) from 64.608%
18079104130

Pull #23

github

YoSev
fix: enable cancelation of running task during pre, post and ffmpeg processing
Pull Request #23: fix: enable cancelation of running task during ffmpeg processing

0 of 32 new or added lines in 2 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

2206 of 3427 relevant lines covered (64.37%)

13.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/internal/service/task/processing.go
1
package task
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/json"
7
        "errors"
8
        "fmt"
9
        "os"
10
        "os/exec"
11
        "path/filepath"
12
        "strconv"
13
        "time"
14

15
        "github.com/mattn/go-shellwords"
16
        "github.com/welovemedia/ffmate/v2/internal/database/model"
17
        "github.com/welovemedia/ffmate/v2/internal/debug"
18
        "github.com/welovemedia/ffmate/v2/internal/dto"
19
        "github.com/welovemedia/ffmate/v2/internal/metrics"
20
        "github.com/welovemedia/ffmate/v2/internal/service/ffmpeg"
21
)
22

23
func (s *Service) runPreProcessing(task *model.Task) error {
×
24
        if err := s.prePostProcessTask(task, task.PreProcessing, "pre"); err != nil {
×
25
                return fmt.Errorf("PreProcessing failed: %v", err)
×
26
        }
×
27
        return nil
×
28
}
29

30
func (s *Service) prepareTaskFiles(task *model.Task) {
×
31
        task.InputFile.Resolved = s.wildcardReplacer(task.InputFile.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
32
        task.OutputFile.Resolved = s.wildcardReplacer(task.OutputFile.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
33
        task.Command.Resolved = s.wildcardReplacer(task.Command.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
34

×
35
        task.Status = dto.Running
×
NEW
36
        if _, err := s.updateRunningTask(task); err != nil {
×
37
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
38
        }
×
39
}
40

41
func (s *Service) createOutputDirectory(task *model.Task) error {
×
42
        if err := os.MkdirAll(filepath.Dir(task.OutputFile.Resolved), 0755); err != nil {
×
43
                return fmt.Errorf("failed to create non-existing output directory: %v", err)
×
44
        }
×
45
        return nil
×
46
}
47

48
func (s *Service) executeFFmpeg(task *model.Task) error {
×
49
        debug.Task.Debug("starting ffmpeg process (uuid: %s)", task.UUID)
×
50

×
NEW
51
        ctx, _ := taskQueue.Load(task.UUID)
×
52

×
53
        err := s.ffmpegService.Execute(&ffmpeg.ExecutionRequest{
×
54
                Task:    task,
×
55
                Command: task.Command.Resolved,
×
NEW
56
                Ctx:     ctx.(taskContext).ctx,
×
57
                UpdateFunc: func(progress, remaining float64) {
×
58
                        task.Progress = progress
×
59
                        task.Remaining = remaining
×
NEW
60
                        if _, err := s.updateRunningTask(task); err != nil {
×
NEW
61
                                debug.Log.Error("failed to save task (uuid: %s): %v", task.UUID, err)
×
UNCOV
62
                        }
×
63
                },
64
        })
65

66
        task.Progress = 100
×
67
        task.Remaining = -1
×
68

×
69
        if err != nil {
×
NEW
70
                if cause := context.Cause(ctx.(taskContext).ctx); cause != nil {
×
NEW
71
                        s.cancelTask(task)
×
72
                        return cause
×
73
                }
×
74

NEW
75
                debug.Task.Debug("finished processing with error (uuid: %s): %v", task.UUID, err)
×
76
                s.failTask(task, err)
×
77
                return err
×
78
        }
79

80
        debug.Task.Debug("finished processing (uuid: %s)", task.UUID)
×
81
        return nil
×
82
}
83

84
func (s *Service) runPostProcessing(task *model.Task) error {
×
85
        if err := s.prePostProcessTask(task, task.PostProcessing, "post"); err != nil {
×
86
                return fmt.Errorf("PostProcessing failed: %v", err)
×
87
        }
×
88
        return nil
×
89
}
90

91
func (s *Service) finalizeTask(task *model.Task) {
×
92
        task.FinishedAt = time.Now().UnixMilli()
×
93
        task.Status = dto.DoneSuccessful
×
NEW
94
        if _, err := s.updateRunningTask(task); err != nil {
×
95
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
96
        }
×
97
        debug.Task.Info("task successful (uuid: %s)", task.UUID)
×
98
}
99

100
func (s *Service) prePostProcessTask(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
101
        if processor == nil || (processor.SidecarPath == nil && processor.ScriptPath == nil) {
×
102
                return nil
×
103
        }
×
104

105
        s.trackProcessingMetrics(processor, processorType)
×
106
        s.initializeProcessing(task, processor, processorType)
×
107

×
108
        if err := s.handleSidecar(task, processor, processorType); err != nil {
×
109
                return err
×
110
        }
×
111

112
        if err := s.handleScriptExecution(task, processor, processorType); err != nil {
×
113
                return err
×
114
        }
×
115

116
        if err := s.reimportSidecarIfNeeded(task, processor, processorType); err != nil {
×
117
                return err
×
118
        }
×
119

120
        return s.finalizeProcessing(processor, processorType, task)
×
121
}
122

123
func (s *Service) trackProcessingMetrics(processor *dto.PrePostProcessing, processorType string) {
×
124
        sidecarEmpty := strconv.FormatBool(processor.SidecarPath != nil && processor.SidecarPath.Raw == "")
×
125
        scriptEmpty := strconv.FormatBool(processor.ScriptPath != nil && processor.ScriptPath.Raw == "")
×
126

×
127
        metricName := "task.preProcessing"
×
128
        if processorType == "post" {
×
129
                metricName = "task.postProcessing"
×
130
        }
×
131
        metrics.GaugeVec(metricName).WithLabelValues(sidecarEmpty, scriptEmpty).Inc()
×
132
}
133

134
func (s *Service) initializeProcessing(task *model.Task, processor *dto.PrePostProcessing, processorType string) {
×
135
        debug.Task.Debug("starting %sProcessing (uuid: %s)", processorType, task.UUID)
×
136
        processor.StartedAt = time.Now().UnixMilli()
×
137

×
138
        if processorType == "pre" {
×
139
                task.Status = dto.PreProcessing
×
140
        } else {
×
141
                task.Status = dto.PostProcessing
×
142
        }
×
143

NEW
144
        if _, err := s.updateRunningTask(task); err != nil {
×
145
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
146
        }
×
147
}
148

149
func (s *Service) handleSidecar(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
150
        if processor.SidecarPath == nil || processor.SidecarPath.Raw == "" {
×
151
                return nil
×
152
        }
×
153

154
        // Resolve path and save
155
        if processorType == "pre" {
×
156
                processor.SidecarPath.Resolved = s.wildcardReplacer(processor.SidecarPath.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
157
        } else {
×
158
                processor.SidecarPath.Resolved = s.wildcardReplacer(processor.SidecarPath.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
159
        }
×
NEW
160
        if _, err := s.updateRunningTask(task); err != nil {
×
161
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
162
        }
×
163

164
        // Write file
165
        data, err := json.Marshal(task.ToDTO())
×
166
        if err != nil {
×
167
                debug.Log.Error("failed to marshal task to write sidecar file: %v", err)
×
168
                return nil
×
169
        }
×
170

171
        if err := os.MkdirAll(filepath.Dir(processor.SidecarPath.Resolved), 0755); err != nil {
×
172
                return err
×
173
        }
×
174
        if err := os.WriteFile(processor.SidecarPath.Resolved, data, 0644); err != nil {
×
175
                processor.Error = fmt.Errorf("failed to write sidecar: %v", err).Error()
×
176
                debug.Log.Error("failed to write sidecar file: %v", err)
×
177
        }
×
178
        return nil
×
179
}
180

181
func (s *Service) handleScriptExecution(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
182
        if processor.Error != "" || processor.ScriptPath == nil || processor.ScriptPath.Raw == "" {
×
183
                return nil
×
184
        }
×
185

186
        if processorType == "pre" {
×
187
                processor.ScriptPath.Resolved = s.wildcardReplacer(processor.ScriptPath.Raw, task.InputFile.Raw, task.OutputFile.Raw, task.Source, task.Metadata)
×
188
        } else {
×
189
                processor.ScriptPath.Resolved = s.wildcardReplacer(processor.ScriptPath.Raw, task.InputFile.Resolved, task.OutputFile.Resolved, task.Source, task.Metadata)
×
190
        }
×
NEW
191
        if _, err := s.updateRunningTask(task); err != nil {
×
192
                debug.Log.Error("failed to save task (uuid: %s)", task.UUID)
×
193
        }
×
194

195
        args, err := shellwords.NewParser().Parse(processor.ScriptPath.Resolved)
×
196
        if err != nil {
×
197
                processor.Error = err.Error()
×
198
                debug.Task.Debug("failed to parse %sProcessing script (uuid: %s): %v", processorType, task.UUID, err)
×
199
                return nil
×
200
        }
×
201

NEW
202
        ctx, _ := taskQueue.Load(task.UUID)
×
NEW
203

×
NEW
204
        cmd := exec.CommandContext(ctx.(taskContext).ctx, args[0], args[1:]...)
×
205
        var stderr bytes.Buffer
×
206
        cmd.Stderr = &stderr
×
207

×
208
        if err := cmd.Start(); err != nil || cmd.Wait() != nil {
×
209
                processor.Error = fmt.Sprintf("%s (exit code: %d)", stderr.String(), cmd.ProcessState.ExitCode())
×
210
                debug.Task.Debug("script failed (uuid: %s): stderr: %s", task.UUID, stderr.String())
×
211
        }
×
212
        return nil
×
213
}
214

215
func (s *Service) reimportSidecarIfNeeded(task *model.Task, processor *dto.PrePostProcessing, processorType string) error {
×
216
        if processorType != "pre" || processor.SidecarPath == nil || processor.SidecarPath.Raw == "" || !processor.ImportSidecar {
×
217
                return nil
×
218
        }
×
219
        data, err := os.ReadFile(processor.SidecarPath.Resolved)
×
220
        if err != nil {
×
221
                return err
×
222
        }
×
223
        if err := json.Unmarshal(data, task); err != nil {
×
224
                return err
×
225
        }
×
226
        debug.Task.Debug("re-imported sidecar file (uuid: %s)", task.UUID)
×
227
        return nil
×
228
}
229

230
func (s *Service) finalizeProcessing(processor *dto.PrePostProcessing, processorType string, task *model.Task) error {
×
231
        processor.FinishedAt = time.Now().UnixMilli()
×
232
        if processor.Error != "" {
×
233
                debug.Task.Info("finished %sProcessing with error (uuid: %s)", processorType, task.UUID)
×
234
                return errors.New(processor.Error)
×
235
        }
×
236
        debug.Task.Info("finished %sProcessing (uuid: %s)", processorType, task.UUID)
×
237
        return nil
×
238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc