• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rom8726 / floxy / 19597805525

22 Nov 2025 03:51PM UTC coverage: 44.512% (-0.8%) from 45.311%
19597805525

Pull #21

github

rom8726
Add TTL support for telemetry spans and workflow contexts with cleanup logic.
Pull Request #21: Telemetry plugin implemented

12 of 218 new or added lines in 2 files covered. (5.5%)

10 existing lines in 1 file now uncovered.

5069 of 11388 relevant lines covered (44.51%)

68.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/plugins/engine/telemetry/plugin.go
1
package telemetry
2

3
import (
4
        "context"
5
        "fmt"
6
        "sync"
7
        "time"
8

9
        "go.opentelemetry.io/otel"
10
        "go.opentelemetry.io/otel/attribute"
11
        "go.opentelemetry.io/otel/codes"
12
        "go.opentelemetry.io/otel/trace"
13

14
        "github.com/rom8726/floxy"
15
)
16

17
// Compile-time assertion that *TelemetryPlugin satisfies floxy.Plugin.
var _ floxy.Plugin = (*TelemetryPlugin)(nil)
18

19
type spanEntry struct {
20
        span      trace.Span
21
        createdAt time.Time
22
        stepType  floxy.StepType
23
}
24

25
// workflowCtxEntry remembers the context that carries a workflow's span so
// that later step and rollback spans can be started from it.
type workflowCtxEntry struct {
	// ctx is the context returned by tracer.Start for the workflow span.
	ctx context.Context
	// createdAt is the time the entry was stored; the cleanup drops the
	// entry once it is older than defaultTTL.
	createdAt time.Time
}
29

30
type TelemetryPlugin struct {
31
        floxy.BasePlugin
32

33
        tracer       trace.Tracer
34
        mu           sync.RWMutex
35
        spans        map[string]*spanEntry
36
        workflowCtxs map[int64]*workflowCtxEntry
37
        defaultTTL   time.Duration
38
        humanStepTTL time.Duration
39
}
40

41
// TelemetryOption customizes a TelemetryPlugin. New applies the options, in
// order, after the default values have been set.
type TelemetryOption func(*TelemetryPlugin)
42

NEW
43
func WithDefaultTTL(ttl time.Duration) TelemetryOption {
×
NEW
44
        return func(p *TelemetryPlugin) {
×
NEW
45
                p.defaultTTL = ttl
×
NEW
46
        }
×
47
}
48

NEW
49
func WithHumanStepTTL(ttl time.Duration) TelemetryOption {
×
NEW
50
        return func(p *TelemetryPlugin) {
×
NEW
51
                p.humanStepTTL = ttl
×
NEW
52
        }
×
53
}
54

NEW
55
func New(tracer trace.Tracer, opts ...TelemetryOption) *TelemetryPlugin {
×
NEW
56
        if tracer == nil {
×
NEW
57
                tracer = otel.Tracer("floxy")
×
NEW
58
        }
×
59

NEW
60
        plugin := &TelemetryPlugin{
×
NEW
61
                BasePlugin:   floxy.NewBasePlugin("telemetry", floxy.PriorityHigh),
×
NEW
62
                tracer:       tracer,
×
NEW
63
                spans:        make(map[string]*spanEntry),
×
NEW
64
                workflowCtxs: make(map[int64]*workflowCtxEntry),
×
NEW
65
                defaultTTL:   1 * time.Hour,
×
NEW
66
                humanStepTTL: 24 * time.Hour,
×
NEW
67
        }
×
NEW
68

×
NEW
69
        for _, opt := range opts {
×
NEW
70
                opt(plugin)
×
NEW
71
        }
×
72

NEW
73
        return plugin
×
74
}
75

NEW
76
func (p *TelemetryPlugin) OnWorkflowStart(ctx context.Context, instance *floxy.WorkflowInstance) error {
×
NEW
77
        p.mu.Lock()
×
NEW
78
        defer p.mu.Unlock()
×
NEW
79

×
NEW
80
        spanName := fmt.Sprintf("workflow.%s", instance.WorkflowID)
×
NEW
81
        workflowCtx, span := p.tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindServer))
×
NEW
82

×
NEW
83
        span.SetAttributes(
×
NEW
84
                attribute.Int64("instance.id", instance.ID),
×
NEW
85
                attribute.String("instance.workflow_id", instance.WorkflowID),
×
NEW
86
                attribute.String("instance.status", string(instance.Status)),
×
NEW
87
        )
×
NEW
88

×
NEW
89
        spanKey := fmt.Sprintf("workflow:%d", instance.ID)
×
NEW
90
        p.spans[spanKey] = &spanEntry{
×
NEW
91
                span:      span,
×
NEW
92
                createdAt: time.Now(),
×
NEW
93
        }
×
NEW
94
        p.workflowCtxs[instance.ID] = &workflowCtxEntry{
×
NEW
95
                ctx:       workflowCtx,
×
NEW
96
                createdAt: time.Now(),
×
NEW
97
        }
×
NEW
98

×
NEW
99
        p.cleanupExpired()
×
NEW
100

×
NEW
101
        return nil
×
NEW
102
}
×
103

NEW
104
func (p *TelemetryPlugin) OnWorkflowComplete(ctx context.Context, instance *floxy.WorkflowInstance) error {
×
NEW
105
        p.mu.Lock()
×
NEW
106
        defer p.mu.Unlock()
×
NEW
107

×
NEW
108
        spanKey := fmt.Sprintf("workflow:%d", instance.ID)
×
NEW
109
        if entry, ok := p.spans[spanKey]; ok {
×
NEW
110
                entry.span.SetAttributes(
×
NEW
111
                        attribute.String("instance.status", string(instance.Status)),
×
NEW
112
                )
×
NEW
113
                entry.span.SetStatus(codes.Ok, "workflow completed")
×
NEW
114
                entry.span.End()
×
NEW
115
                delete(p.spans, spanKey)
×
NEW
116
        }
×
NEW
117
        delete(p.workflowCtxs, instance.ID)
×
NEW
118

×
NEW
119
        return nil
×
120
}
121

NEW
122
func (p *TelemetryPlugin) OnWorkflowFailed(ctx context.Context, instance *floxy.WorkflowInstance) error {
×
NEW
123
        p.mu.Lock()
×
NEW
124
        defer p.mu.Unlock()
×
NEW
125

×
NEW
126
        spanKey := fmt.Sprintf("workflow:%d", instance.ID)
×
NEW
127
        if entry, ok := p.spans[spanKey]; ok {
×
NEW
128
                entry.span.SetAttributes(
×
NEW
129
                        attribute.String("instance.status", string(instance.Status)),
×
NEW
130
                )
×
NEW
131
                if instance.Error != nil {
×
NEW
132
                        entry.span.SetAttributes(attribute.String("instance.error", *instance.Error))
×
NEW
133
                }
×
NEW
134
                entry.span.SetStatus(codes.Error, "workflow failed")
×
NEW
135
                entry.span.End()
×
NEW
136
                delete(p.spans, spanKey)
×
137
        }
NEW
138
        delete(p.workflowCtxs, instance.ID)
×
NEW
139

×
NEW
140
        return nil
×
141
}
142

143
func (p *TelemetryPlugin) OnStepStart(
144
        ctx context.Context,
145
        instance *floxy.WorkflowInstance,
146
        step *floxy.WorkflowStep,
NEW
147
) error {
×
NEW
148
        p.mu.Lock()
×
NEW
149
        defer p.mu.Unlock()
×
NEW
150

×
NEW
151
        // Use workflow context if available to link spans
×
NEW
152
        stepCtx := ctx
×
NEW
153
        if entry, ok := p.workflowCtxs[instance.ID]; ok {
×
NEW
154
                stepCtx = entry.ctx
×
NEW
155
        }
×
156

NEW
157
        spanName := fmt.Sprintf("step.%s", step.StepName)
×
NEW
158
        _, span := p.tracer.Start(stepCtx, spanName, trace.WithSpanKind(trace.SpanKindInternal))
×
NEW
159

×
NEW
160
        attrs := []attribute.KeyValue{
×
NEW
161
                attribute.Int64("step.id", step.ID),
×
NEW
162
                attribute.Int64("step.instance_id", step.InstanceID),
×
NEW
163
                attribute.String("step.step_name", step.StepName),
×
NEW
164
                attribute.String("step.step_type", string(step.StepType)),
×
NEW
165
                attribute.String("step.status", string(step.Status)),
×
NEW
166
                attribute.Int("step.retry_count", step.RetryCount),
×
NEW
167
                attribute.Int("step.compensation_retry_count", step.CompensationRetryCount),
×
NEW
168
                attribute.String("step.idempotency_key", step.IdempotencyKey),
×
NEW
169
                attribute.Int64("instance.id", instance.ID),
×
NEW
170
                attribute.String("instance.workflow_id", instance.WorkflowID),
×
NEW
171
                attribute.String("instance.status", string(instance.Status)),
×
NEW
172
        }
×
NEW
173

×
NEW
174
        if step.Error != nil {
×
NEW
175
                attrs = append(attrs, attribute.String("step.error", *step.Error))
×
NEW
176
        }
×
177

NEW
178
        span.SetAttributes(attrs...)
×
NEW
179

×
NEW
180
        spanKey := fmt.Sprintf("step:%d", step.ID)
×
NEW
181
        p.spans[spanKey] = &spanEntry{
×
NEW
182
                span:      span,
×
NEW
183
                createdAt: time.Now(),
×
NEW
184
                stepType:  step.StepType,
×
NEW
185
        }
×
NEW
186

×
NEW
187
        p.cleanupExpired()
×
NEW
188

×
NEW
189
        return nil
×
190
}
191

192
func (p *TelemetryPlugin) OnStepComplete(
193
        ctx context.Context,
194
        instance *floxy.WorkflowInstance,
195
        step *floxy.WorkflowStep,
NEW
196
) error {
×
NEW
197
        p.mu.Lock()
×
NEW
198
        defer p.mu.Unlock()
×
NEW
199

×
NEW
200
        spanKey := fmt.Sprintf("step:%d", step.ID)
×
NEW
201
        if entry, ok := p.spans[spanKey]; ok {
×
NEW
202
                entry.span.SetAttributes(
×
NEW
203
                        attribute.String("step.status", string(step.Status)),
×
NEW
204
                        attribute.Int("step.retry_count", step.RetryCount),
×
NEW
205
                        attribute.Int("step.compensation_retry_count", step.CompensationRetryCount),
×
NEW
206
                )
×
NEW
207
                entry.span.SetStatus(codes.Ok, "step completed")
×
NEW
208
                entry.span.End()
×
NEW
209
                delete(p.spans, spanKey)
×
NEW
210
        }
×
211

NEW
212
        return nil
×
213
}
214

215
func (p *TelemetryPlugin) OnStepFailed(
216
        ctx context.Context,
217
        instance *floxy.WorkflowInstance,
218
        step *floxy.WorkflowStep,
219
        err error,
NEW
220
) error {
×
NEW
221
        p.mu.Lock()
×
NEW
222
        defer p.mu.Unlock()
×
NEW
223

×
NEW
224
        spanKey := fmt.Sprintf("step:%d", step.ID)
×
NEW
225
        if entry, ok := p.spans[spanKey]; ok {
×
NEW
226
                entry.span.SetAttributes(
×
NEW
227
                        attribute.String("step.status", string(step.Status)),
×
NEW
228
                        attribute.Int("step.retry_count", step.RetryCount),
×
NEW
229
                        attribute.Int("step.compensation_retry_count", step.CompensationRetryCount),
×
NEW
230
                )
×
NEW
231
                if step.Error != nil {
×
NEW
232
                        entry.span.SetAttributes(attribute.String("step.error", *step.Error))
×
NEW
233
                }
×
NEW
234
                if err != nil {
×
NEW
235
                        entry.span.RecordError(err)
×
NEW
236
                }
×
NEW
237
                entry.span.SetStatus(codes.Error, "step failed")
×
NEW
238
                entry.span.End()
×
NEW
239
                delete(p.spans, spanKey)
×
240
        }
241

NEW
242
        return nil
×
243
}
244

245
func (p *TelemetryPlugin) OnRollbackStepChain(
246
        ctx context.Context,
247
        instanceID int64,
248
        stepName string,
249
        depth int,
NEW
250
) error {
×
NEW
251
        p.mu.Lock()
×
NEW
252
        defer p.mu.Unlock()
×
NEW
253

×
NEW
254
        // Use workflow context if available to link spans
×
NEW
255
        rollbackCtx := ctx
×
NEW
256
        if entry, ok := p.workflowCtxs[instanceID]; ok {
×
NEW
257
                rollbackCtx = entry.ctx
×
NEW
258
        }
×
259

NEW
260
        spanName := fmt.Sprintf("rollback.%s", stepName)
×
NEW
261
        _, span := p.tracer.Start(rollbackCtx, spanName, trace.WithSpanKind(trace.SpanKindInternal))
×
NEW
262

×
NEW
263
        span.SetAttributes(
×
NEW
264
                attribute.Int64("instance.id", instanceID),
×
NEW
265
                attribute.String("step.step_name", stepName),
×
NEW
266
                attribute.Int("rollback.depth", depth),
×
NEW
267
        )
×
NEW
268

×
NEW
269
        span.SetStatus(codes.Ok, "rollback completed")
×
NEW
270
        span.End()
×
NEW
271

×
NEW
272
        return nil
×
273
}
274

NEW
275
func (p *TelemetryPlugin) cleanupExpired() {
×
NEW
276
        now := time.Now()
×
NEW
277

×
NEW
278
        // Cleanup expired spans
×
NEW
279
        for key, entry := range p.spans {
×
NEW
280
                ttl := p.defaultTTL
×
NEW
281
                if entry.stepType == floxy.StepTypeHuman {
×
NEW
282
                        ttl = p.humanStepTTL
×
NEW
283
                }
×
284

NEW
285
                if now.Sub(entry.createdAt) > ttl {
×
NEW
286
                        entry.span.SetStatus(codes.Error, "span expired due to TTL")
×
NEW
287
                        entry.span.End()
×
NEW
288
                        delete(p.spans, key)
×
NEW
289
                }
×
290
        }
291

292
        // Cleanup expired workflow contexts
NEW
293
        for instanceID, entry := range p.workflowCtxs {
×
NEW
294
                if now.Sub(entry.createdAt) > p.defaultTTL {
×
NEW
295
                        delete(p.workflowCtxs, instanceID)
×
NEW
296
                }
×
297
        }
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc