• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 25494842882

07 May 2026 12:08PM UTC coverage: 84.698% (-0.5%) from 85.237%
25494842882

push

github

web-flow
chore: upgrade to Argo Workflows v4 (#337)

## What
Closes #240 

## Why
Renovate PR:
https://github.com/opendefensecloud/artifact-conduit/pull/235

## Testing
`make test-e2e`

## Checklist
- [x] ~~Tests added/updated~~ ->n/a
- [x] No breaking changes (or upgrade path documented above)
- [x] Readable commit history (squashed and cleaned up as desired)
- [x] AI code review considered and comments resolved


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **New Features**
  * Added TTL (time-to-live) lifecycle management with `ttlAfterFinished` and `ttlAfterFailed` configuration options.
  * Added secret reference fields for enhanced credential management.
  * Enhanced cron scheduling with multiple schedule support.
  * Improved workflow status tracking with succeeded and failed counts.

* **Chores**
  * Updated Argo Workflows dependency to v4.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

786 of 928 relevant lines covered (84.7%)

585.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.85
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v4/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        corev1 "k8s.io/api/core/v1"
17
        apierrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/runtime"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/events"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/builder"
25
        "sigs.k8s.io/controller-runtime/pkg/client"
26
        "sigs.k8s.io/controller-runtime/pkg/handler"
27
        "sigs.k8s.io/controller-runtime/pkg/predicate"
28
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
29

30
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
31
)
32

33
const (
34
        // artifactWorkflowFinalizer is the finalizer that Reconcile adds to every
        // ArtifactWorkflow and removes again once the associated Argo resources
        // have been deleted.
        artifactWorkflowFinalizer = "arc.opendefense.cloud/artifact-workflow-finalizer"
35
)
36

37
// ArtifactWorkflowReconciler reconciles an ArtifactWorkflow object.
38
type ArtifactWorkflowReconciler struct {
39
        // Client is the embedded controller-runtime client used to get and update
        // ArtifactWorkflows, Secrets and CronWorkflows.
        client.Client
40
        // ClientSet is the typed Kubernetes client; in this file it is used to
        // stream pod logs of failed workflow steps.
        ClientSet kubernetes.Interface
41
        // Scheme is not referenced in the visible part of this file — presumably
        // used by the workflow handlers to set owner references (TODO confirm).
        Scheme    *runtime.Scheme
42
        // Recorder emits Kubernetes events about the reconciled ArtifactWorkflow.
        Recorder  events.EventRecorder
43
}
44

45
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=clusterartifacttypes,verbs=get;list;watch
46
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/status,verbs=get;update;patch
47
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/finalizers,verbs=update
48
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
49
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
50
//+kubebuilder:rbac:groups=argoproj.io,resources=cronworkflows,verbs=get;list;watch;create;update;patch;delete
51
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
52
//+kubebuilder:rbac:groups="",resources=pods;pods/log,verbs=get;list
53

54
// Reconcile moves the current state of the cluster closer to the desired state
55
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1,477✔
56
        log := ctrl.LoggerFrom(ctx)
1,477✔
57
        ctrlResult := ctrl.Result{}
1,477✔
58

1,477✔
59
        aw := &arcv1alpha1.ArtifactWorkflow{}
1,477✔
60
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
1,484✔
61
                if apierrors.IsNotFound(err) {
14✔
62
                        // Object not found, return.
7✔
63
                        return ctrlResult, nil
7✔
64
                }
7✔
65

66
                return ctrlResult, errLogAndWrap(log, err, "failed to get object")
×
67
        }
68

69
        // Two different behaviors are implemented depending on whether we have
70
        // a "one-shot" order or cron order, so let's instantiate the corresponding handler:
71
        var handler WorkflowHandler
1,470✔
72
        if aw.Spec.Cron != nil {
1,495✔
73
                handler = NewCronWorkflowHandler(r, log, aw)
25✔
74
        } else {
1,470✔
75
                handler = NewSingleWorkflowHandler(r, log, aw)
1,445✔
76
        }
1,445✔
77

78
        // Update last reconcile time
79
        aw.Status.LastReconcileAt = metav1.Now()
1,470✔
80

1,470✔
81
        // Handle deletion
1,470✔
82
        if !aw.DeletionTimestamp.IsZero() {
1,478✔
83
                log.V(1).Info("ArtifactWorkflow is being deleted")
8✔
84
                // Cleanup workflow, if exists
8✔
85
                if err := handler.DeleteArgoResources(ctx); err != nil {
8✔
86
                        return ctrlResult, errLogAndWrap(log, err, "workflow deletion failed")
×
87
                }
×
88

89
                // Remove finalizer
90
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
16✔
91
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
8✔
92
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
16✔
93
                                return f == artifactWorkflowFinalizer
8✔
94
                        })
8✔
95
                        if err := r.Update(ctx, aw); err != nil {
9✔
96
                                return ctrlResult, errLogAndWrap(log, err, "failed to remove finalizer")
1✔
97
                        }
1✔
98
                }
99

100
                return ctrlResult, nil
7✔
101
        }
102

103
        // Add finalizer if not present and not deleting
104
        if aw.DeletionTimestamp.IsZero() {
2,924✔
105
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
1,493✔
106
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
31✔
107
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
31✔
108
                        if err := r.Update(ctx, aw); err != nil {
31✔
109
                                return ctrlResult, errLogAndWrap(log, err, "failed to add finalizer")
×
110
                        }
×
111
                        // Return without requeue; the Update event will trigger reconciliation again
112
                        return ctrlResult, nil
31✔
113
                }
114
        }
115

116
        // Handle force reconcile annotation
117
        forceAt, err := GetForceAtAnnotationValue(aw)
1,431✔
118
        if err != nil {
1,431✔
119
                log.V(1).Error(err, "Invalid force reconcile annotation, ignoring")
×
120
        }
×
121
        if !forceAt.IsZero() && (aw.Status.LastForceAt.IsZero() || forceAt.After(aw.Status.LastForceAt.Time)) {
1,432✔
122
                log.V(1).Info("Force reconcile requested")
1✔
123
                r.Recorder.Eventf(aw, nil, corev1.EventTypeNormal, "ForceReconcile", "Reconcile", "Force reconcile requested via annotation")
1✔
124
                // Delete existing workflow, if any
1✔
125
                if err := handler.DeleteArgoResources(ctx); err != nil {
1✔
126
                        return ctrlResult, errLogAndWrap(log, err, "failed to delete existing workflow for force reconcile")
×
127
                }
×
128
                // Reset phase so workflow gets recreated, and update last force time
129
                aw.Status.Phase = arcv1alpha1.WorkflowUnknown
1✔
130
                aw.Status.LastForceAt = metav1.Now()
1✔
131
                if err := r.Status().Update(ctx, aw); err != nil {
1✔
132
                        return ctrlResult, errLogAndWrap(log, err, "failed to update last force time")
×
133
                }
×
134
                // Return without requeue; the update event will trigger reconciliation again
135
                return ctrlResult, nil
1✔
136
        }
137

138
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
2,241✔
139
                return ctrlResult, handler.CreateArgoResources(ctx)
811✔
140
        }
811✔
141

142
        if aw.Status.Phase.InProgress() || aw.Spec.Cron != nil {
1,232✔
143
                return ctrlResult, handler.CheckArgoResources(ctx)
613✔
144
        }
613✔
145

146
        return ctrlResult, nil
6✔
147
}
148

149
func (r *ArtifactWorkflowReconciler) setStatusFromWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow, wf *wfv1alpha1.Workflow) bool {
603✔
150
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
609✔
151
                return false // nothing updated
6✔
152
        }
6✔
153
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
597✔
154

597✔
155
        switch aw.Status.Phase {
597✔
156
        case arcv1alpha1.WorkflowSucceeded, arcv1alpha1.WorkflowStopped:
7✔
157
                aw.Status.CompletionTime = metav1.Now()
7✔
158
        case arcv1alpha1.WorkflowError, arcv1alpha1.WorkflowFailed:
7✔
159
                aw.Status.Message = wf.Status.Message
7✔
160
                aw.Status.FailureTime = metav1.Now()
7✔
161
                r.generateWorkflowStatusMessage(ctx, wf, log, aw)
7✔
162
        default:
583✔
163
        }
164

165
        return true
597✔
166
}
167

168
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf *wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
7✔
169
        failedNodes := []struct {
7✔
170
                Name    string
7✔
171
                Pod     string
7✔
172
                Message string
7✔
173
        }{}
7✔
174
        for _, node := range wf.Status.Nodes {
10✔
175
                if (node.Phase == wfv1alpha1.NodeFailed || node.Phase == wfv1alpha1.NodeError) && node.Type == wfv1alpha1.NodeTypePod {
5✔
176
                        nr := struct {
2✔
177
                                Name    string
2✔
178
                                Pod     string
2✔
179
                                Message string
2✔
180
                        }{
2✔
181
                                Name:    node.DisplayName,
2✔
182
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
183
                                Message: node.Message,
2✔
184
                        }
2✔
185
                        failedNodes = append(failedNodes, nr)
2✔
186
                }
2✔
187
        }
188

189
        for _, nr := range failedNodes {
9✔
190
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
191
                if err != nil {
2✔
192
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
193
                        aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\n\n", nr.Name, nr.Message)
×
194

×
195
                        continue
×
196
                }
197
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
198
        }
199
}
200

201
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
202
        podLogOptions := corev1.PodLogOptions{
2✔
203
                Container: "main", // Assuming the main container
2✔
204
                Follow:    false,
2✔
205
                TailLines: new(int64(30)), // Fetch last 30 lines
2✔
206
        }
2✔
207
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
208
        podLogs, err := req.Stream(ctx)
2✔
209
        if err != nil {
2✔
210
                return "", err
×
211
        }
×
212
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
213

2✔
214
        buf := new(bytes.Buffer)
2✔
215
        _, err = io.Copy(buf, podLogs)
2✔
216
        if err != nil {
2✔
217
                return "", err
×
218
        }
×
219

220
        return buf.String(), nil
2✔
221
}
222

223
func (r *ArtifactWorkflowReconciler) retrieveSecrets(ctx context.Context, aw *arcv1alpha1.ArtifactWorkflow) (*corev1.Secret, *corev1.Secret, error) {
811✔
224
        srcSecret := corev1.Secret{}
811✔
225
        if aw.Spec.SrcSecretRef.Name != "" {
1,413✔
226
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
602✔
227
                        r.Recorder.Eventf(aw, nil, corev1.EventTypeWarning, "InvalidSecret", "FetchSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
228
                        return nil, nil, fmt.Errorf("failed to fetch secret for source: %w", err)
×
229
                }
×
230
        }
231

232
        dstSecret := corev1.Secret{}
811✔
233
        if aw.Spec.DstSecretRef.Name != "" {
1,413✔
234
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
602✔
235
                        r.Recorder.Eventf(aw, nil, corev1.EventTypeWarning, "InvalidSecret", "FetchSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
236
                        return nil, nil, fmt.Errorf("failed to fetch secret for destination: %w", err)
×
237
                }
×
238
        }
239

240
        return &srcSecret, &dstSecret, nil
811✔
241
}
242

243
// SetupWithManager sets up the controller with the Manager.
244
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
245
        return ctrl.NewControllerManagedBy(mgr).
1✔
246
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
247
                Owns(&wfv1alpha1.Workflow{}).
1✔
248
                Owns(&wfv1alpha1.CronWorkflow{}).
1✔
249
                Watches(
1✔
250
                        &wfv1alpha1.Workflow{},
1✔
251
                        handler.EnqueueRequestsFromMapFunc(r.findArtifactWorkflowsForWorkflowOwnedByCronWorkflow()),
1✔
252
                        builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
1✔
253
                ).
1✔
254
                Complete(r)
1✔
255
}
1✔
256

257
// findArtifactWorkflowsForWorkflowOwnedByCronWorkflow returns a function that finds ArtifactWorkflows
258
// that own CronWorkflows that own the given Workflow. This handles the ownership chain:
259
// ArtifactWorkflow -> CronWorkflow -> Workflow
260
func (r *ArtifactWorkflowReconciler) findArtifactWorkflowsForWorkflowOwnedByCronWorkflow() func(context.Context, client.Object) []ctrl.Request {
1✔
261
        return func(ctx context.Context, wf client.Object) []ctrl.Request {
64✔
262
                // Find the CronWorkflow that owns this Workflow
63✔
263
                ownerReferences := wf.GetOwnerReferences()
63✔
264
                var cronWorkflowName, cronWorkflowNamespace string
63✔
265

63✔
266
                for _, owner := range ownerReferences {
126✔
267
                        if owner.Kind == "CronWorkflow" && owner.APIVersion == wfv1alpha1.SchemeGroupVersion.String() {
70✔
268
                                cronWorkflowName = owner.Name
7✔
269
                                cronWorkflowNamespace = wf.GetNamespace() // Owner is in same namespace
7✔
270

7✔
271
                                break
7✔
272
                        }
273
                }
274

275
                if cronWorkflowName == "" {
119✔
276
                        // Workflow is not owned by a CronWorkflow
56✔
277
                        return []reconcile.Request{}
56✔
278
                }
56✔
279

280
                // Find the ArtifactWorkflow that owns this CronWorkflow
281
                cwf := &wfv1alpha1.CronWorkflow{}
7✔
282
                if err := r.Get(ctx, types.NamespacedName{Name: cronWorkflowName, Namespace: cronWorkflowNamespace}, cwf); err != nil {
7✔
283
                        // CronWorkflow not found or error occurred
×
284
                        return []reconcile.Request{}
×
285
                }
×
286

287
                var artifactWorkflowName string
7✔
288
                for _, owner := range cwf.GetOwnerReferences() {
14✔
289
                        if owner.Kind == "ArtifactWorkflow" && owner.APIVersion == arcv1alpha1.SchemeGroupVersion.String() {
14✔
290
                                artifactWorkflowName = owner.Name
7✔
291
                                break
7✔
292
                        }
293
                }
294

295
                if artifactWorkflowName == "" {
7✔
296
                        // CronWorkflow is not owned by an ArtifactWorkflow
×
297
                        return []reconcile.Request{}
×
298
                }
×
299

300
                return []reconcile.Request{
7✔
301
                        {
7✔
302
                                NamespacedName: types.NamespacedName{
7✔
303
                                        Name:      artifactWorkflowName,
7✔
304
                                        Namespace: cronWorkflowNamespace,
7✔
305
                                },
7✔
306
                        },
7✔
307
                }
7✔
308
        }
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc