• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rom8726 / floxy / 18997003723

01 Nov 2025 12:53PM UTC coverage: 43.279% (-0.3%) from 43.619%
18997003723

push

github

rom8726
Add priority aging support in queue handling, prevent starvation.

- Introduced `SetAgingEnabled` and `SetAgingRate` methods in `Store` to manage priority aging configurations.
- Enhanced `DequeueStep` logic to support SQL-based priority aging based on wait time and rate.
- Added `WithQueueAgingEnabled` and `WithQueueAgingRate` options for `Engine` configuration.
- Updated microservices rollback example to demonstrate priority aging usage.

29 of 97 new or added lines in 3 files covered. (29.9%)

55 existing lines in 2 files now uncovered.

3764 of 8697 relevant lines covered (43.28%)

38.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/plugins.go
1
package floxy
2

3
import (
4
        "context"
5
        "fmt"
6
        "log/slog"
7
        "sort"
8
        "sync"
9
)
10

11
// Plugin represents a lifecycle hook system for workflows
12
type Plugin interface {
13
        // Name returns unique plugin identifier
14
        Name() string
15

16
        // Priority determines execution order (higher = earlier)
17
        Priority() Priority
18

19
        // Lifecycle hooks
20
        OnWorkflowStart(ctx context.Context, instance *WorkflowInstance) error
21
        OnWorkflowComplete(ctx context.Context, instance *WorkflowInstance) error
22
        OnWorkflowFailed(ctx context.Context, instance *WorkflowInstance) error
23
        OnStepStart(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
24
        OnStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error
25
        OnStepFailed(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error
26
        OnRollbackStepChain(ctx context.Context, instanceID int64, stepName string, depth int) error
27
}
28

29
// BasePlugin provides default no-op implementations
30
type BasePlugin struct {
31
        name     string
32
        priority Priority
33
}
34

UNCOV
35
func NewBasePlugin(name string, priority Priority) BasePlugin {
×
UNCOV
36
        return BasePlugin{name: name, priority: priority}
×
UNCOV
37
}
×
38

UNCOV
39
func (p BasePlugin) Name() string { return p.name }
×
40

UNCOV
41
func (p BasePlugin) Priority() Priority { return p.priority }
×
42

43
func (p BasePlugin) OnWorkflowStart(context.Context, *WorkflowInstance) error {
×
44
        return nil
×
45
}
×
46

47
func (p BasePlugin) OnWorkflowComplete(context.Context, *WorkflowInstance) error {
×
UNCOV
48
        return nil
×
49
}
×
50

51
func (p BasePlugin) OnWorkflowFailed(context.Context, *WorkflowInstance) error {
×
52
        return nil
×
53
}
×
54

55
func (p BasePlugin) OnStepStart(context.Context, *WorkflowInstance, *WorkflowStep) error { return nil }
×
56

57
func (p BasePlugin) OnStepComplete(context.Context, *WorkflowInstance, *WorkflowStep) error {
×
UNCOV
58
        return nil
×
59
}
×
60

61
func (p BasePlugin) OnStepFailed(context.Context, *WorkflowInstance, *WorkflowStep, error) error {
×
UNCOV
62
        return nil
×
63
}
×
64

65
func (p BasePlugin) OnRollbackStepChain(context.Context, int64, string, int) error {
×
66
        return nil
×
67
}
×
68

69
// PluginManager manages plugin lifecycle
70
type PluginManager struct {
71
        plugins []Plugin
72
        mu      sync.RWMutex
73
}
74

75
func NewPluginManager() *PluginManager {
×
UNCOV
76
        return &PluginManager{
×
UNCOV
77
                plugins: make([]Plugin, 0),
×
UNCOV
78
        }
×
UNCOV
79
}
×
80

UNCOV
81
func (pm *PluginManager) Register(plugin Plugin) {
×
UNCOV
82
        pm.mu.Lock()
×
83
        defer pm.mu.Unlock()
×
84

×
85
        pm.plugins = append(pm.plugins, plugin)
×
86

×
87
        sort.Slice(pm.plugins, func(i, j int) bool {
×
UNCOV
88
                return pm.plugins[i].Priority() < pm.plugins[j].Priority()
×
89
        })
×
90
}
91

92
func (pm *PluginManager) ExecuteWorkflowStart(ctx context.Context, instance *WorkflowInstance) error {
×
93
        pm.mu.RLock()
×
94
        defer pm.mu.RUnlock()
×
95

×
96
        for _, plugin := range pm.plugins {
×
97
                if err := plugin.OnWorkflowStart(ctx, instance); err != nil {
×
UNCOV
98
                        return fmt.Errorf("plugin %s failed: %w", plugin.Name(), err)
×
UNCOV
99
                }
×
100
        }
101

102
        return nil
×
103
}
104

105
func (pm *PluginManager) ExecuteWorkflowComplete(ctx context.Context, instance *WorkflowInstance) error {
×
106
        pm.mu.RLock()
×
107
        defer pm.mu.RUnlock()
×
UNCOV
108

×
UNCOV
109
        for _, plugin := range pm.plugins {
×
110
                if err := plugin.OnWorkflowComplete(ctx, instance); err != nil {
×
UNCOV
111
                        slog.Error("[floxy] plugin error on workflow complete", "plugin", plugin.Name(), "error", err)
×
UNCOV
112
                }
×
113
        }
114

115
        return nil
×
116
}
117

118
func (pm *PluginManager) ExecuteWorkflowFailed(ctx context.Context, instance *WorkflowInstance) error {
×
119
        pm.mu.RLock()
×
120
        defer pm.mu.RUnlock()
×
UNCOV
121

×
UNCOV
122
        for _, plugin := range pm.plugins {
×
123
                if err := plugin.OnWorkflowFailed(ctx, instance); err != nil {
×
UNCOV
124
                        slog.Error("[floxy] plugin error on workflow failed", "plugin", plugin.Name(), "error", err)
×
UNCOV
125
                }
×
126
        }
127

128
        return nil
×
129
}
130

131
func (pm *PluginManager) ExecuteStepStart(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error {
×
132
        pm.mu.RLock()
×
133
        defer pm.mu.RUnlock()
×
UNCOV
134

×
UNCOV
135
        for _, plugin := range pm.plugins {
×
136
                if err := plugin.OnStepStart(ctx, instance, step); err != nil {
×
UNCOV
137
                        return fmt.Errorf("plugin %s failed: %w", plugin.Name(), err)
×
UNCOV
138
                }
×
139
        }
140

141
        return nil
×
142
}
143

144
func (pm *PluginManager) ExecuteStepComplete(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep) error {
×
145
        pm.mu.RLock()
×
146
        defer pm.mu.RUnlock()
×
UNCOV
147

×
UNCOV
148
        for _, plugin := range pm.plugins {
×
149
                if err := plugin.OnStepComplete(ctx, instance, step); err != nil {
×
UNCOV
150
                        slog.Error("[floxy] plugin error on step complete", "plugin", plugin.Name(), "error", err)
×
UNCOV
151
                }
×
152
        }
153

154
        return nil
×
155
}
156

157
func (pm *PluginManager) ExecuteStepFailed(ctx context.Context, instance *WorkflowInstance, step *WorkflowStep, err error) error {
×
158
        pm.mu.RLock()
×
159
        defer pm.mu.RUnlock()
×
UNCOV
160

×
UNCOV
161
        for _, plugin := range pm.plugins {
×
162
                if pluginErr := plugin.OnStepFailed(ctx, instance, step, err); pluginErr != nil {
×
UNCOV
163
                        slog.Error("[floxy] plugin error on step failed", "plugin", plugin.Name(), "error", err)
×
UNCOV
164
                }
×
165
        }
166

167
        return nil
×
168
}
169

170
func (pm *PluginManager) ExecuteRollbackStepChain(ctx context.Context, instanceID int64, stepName string, depth int) error {
×
171
        pm.mu.RLock()
×
172
        defer pm.mu.RUnlock()
×
UNCOV
173

×
UNCOV
174
        for _, plugin := range pm.plugins {
×
175
                if err := plugin.OnRollbackStepChain(ctx, instanceID, stepName, depth); err != nil {
×
UNCOV
176
                        slog.Error("[floxy] plugin error on rollback step chain", "plugin", plugin.Name(), "error", err)
×
UNCOV
177
                }
×
178
        }
179

180
        return nil
×
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc