• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 23895603772

02 Apr 2026 10:16AM UTC coverage: 83.836% (-0.9%) from 84.698%
23895603772

push

github

web-flow
Lower minimum cron schedule interval for e2e tests and refactor e2e tests (#258)

* Improve e2e test to fail fast

* Add renovate config for auto updates in example workflows

* Change latest for fixed version tags in example workflows
  Note that cosign still uses the latest tag since it is a free Chainguard image.

* Fix ocm example

* Add dedicated order resources for e2e tests
  This reduces the possibility of CVEs making the tests fail and
  improves performance, since only one of each type gets ordered.
  (We can add multiple of one as a dedicated test case)

778 of 928 relevant lines covered (83.84%)

1648.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.81
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        corev1 "k8s.io/api/core/v1"
17
        apierrors "k8s.io/apimachinery/pkg/api/errors"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/runtime"
20
        "k8s.io/apimachinery/pkg/types"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/events"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/builder"
25
        "sigs.k8s.io/controller-runtime/pkg/client"
26
        "sigs.k8s.io/controller-runtime/pkg/handler"
27
        "sigs.k8s.io/controller-runtime/pkg/predicate"
28
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
29

30
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
31
)
32

33
const (
34
        // artifactWorkflowFinalizer is the finalizer Reconcile adds to every
        // ArtifactWorkflow and removes again once the handler's
        // DeleteArgoResources call has succeeded during deletion.
        artifactWorkflowFinalizer = "arc.opendefense.cloud/artifact-workflow-finalizer"
35
)
36

37
// ArtifactWorkflowReconciler reconciles an ArtifactWorkflow object.
38
type ArtifactWorkflowReconciler struct {
39
        // Client is the embedded controller-runtime client; the reconciler uses it
        // to get and update ArtifactWorkflows, Secrets and CronWorkflows.
        client.Client
40
        // ClientSet is the typed Kubernetes client; fetchPodLogs uses it to
        // stream the logs of failed workflow pods.
        ClientSet kubernetes.Interface
41
        // Scheme is not referenced by the methods in this file.
        // NOTE(review): presumably used by the workflow handlers — confirm.
        Scheme    *runtime.Scheme
42
        // Recorder emits Kubernetes events regarding the reconciled ArtifactWorkflow.
        Recorder  events.EventRecorder
43
}
44

45
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=clusterartifacttypes,verbs=get;list;watch
46
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/status,verbs=get;update;patch
47
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/finalizers,verbs=update
48
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
49
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
50
//+kubebuilder:rbac:groups=argoproj.io,resources=cronworkflows,verbs=get;list;watch;create;update;patch;delete
51
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
52
//+kubebuilder:rbac:groups="",resources=pods;pods/log,verbs=get;list
53

54
// Reconcile moves the current state of the cluster closer to the desired state
55
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
3,148✔
56
        log := ctrl.LoggerFrom(ctx)
3,148✔
57
        ctrlResult := ctrl.Result{}
3,148✔
58

3,148✔
59
        aw := &arcv1alpha1.ArtifactWorkflow{}
3,148✔
60
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
3,155✔
61
                if apierrors.IsNotFound(err) {
14✔
62
                        // Object not found, return.
7✔
63
                        return ctrlResult, nil
7✔
64
                }
7✔
65

66
                return ctrlResult, errLogAndWrap(log, err, "failed to get object")
×
67
        }
68

69
        // Two different behaviors are implemented depending on whether we have
70
        // a "one-shot" order or cron order, so let's instantiate the corresponding handler:
71
        var handler WorkflowHandler
3,141✔
72
        if aw.Spec.Cron != nil {
3,160✔
73
                handler = NewCronWorkflowHandler(r, log, aw)
19✔
74
        } else {
3,141✔
75
                handler = NewSingleWorkflowHandler(r, log, aw)
3,122✔
76
        }
3,122✔
77

78
        // Update last reconcile time
79
        aw.Status.LastReconcileAt = metav1.Now()
3,141✔
80

3,141✔
81
        // Handle deletion
3,141✔
82
        if !aw.DeletionTimestamp.IsZero() {
3,148✔
83
                log.V(1).Info("ArtifactWorkflow is being deleted")
7✔
84
                // Cleanup workflow, if exists
7✔
85
                if err := handler.DeleteArgoResources(ctx); err != nil {
7✔
86
                        return ctrlResult, errLogAndWrap(log, err, "workflow deletion failed")
×
87
                }
×
88

89
                // Remove finalizer
90
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
14✔
91
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
7✔
92
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
14✔
93
                                return f == artifactWorkflowFinalizer
7✔
94
                        })
7✔
95
                        if err := r.Update(ctx, aw); err != nil {
7✔
96
                                return ctrlResult, errLogAndWrap(log, err, "failed to remove finalizer")
×
97
                        }
×
98
                }
99

100
                return ctrlResult, nil
7✔
101
        }
102

103
        // Add finalizer if not present and not deleting
104
        if aw.DeletionTimestamp.IsZero() {
6,268✔
105
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
3,165✔
106
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
31✔
107
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
31✔
108
                        if err := r.Update(ctx, aw); err != nil {
31✔
109
                                return ctrlResult, errLogAndWrap(log, err, "failed to add finalizer")
×
110
                        }
×
111
                        // Return without requeue; the Update event will trigger reconciliation again
112
                        return ctrlResult, nil
31✔
113
                }
114
        }
115

116
        // Handle force reconcile annotation
117
        forceAt, err := GetForceAtAnnotationValue(aw)
3,103✔
118
        if err != nil {
3,103✔
119
                log.V(1).Error(err, "Invalid force reconcile annotation, ignoring")
×
120
        }
×
121
        if !forceAt.IsZero() && (aw.Status.LastForceAt.IsZero() || forceAt.After(aw.Status.LastForceAt.Time)) {
3,104✔
122
                log.V(1).Info("Force reconcile requested")
1✔
123
                r.Recorder.Eventf(aw, nil, corev1.EventTypeNormal, "ForceReconcile", "Reconcile", "Force reconcile requested via annotation")
1✔
124
                // Delete existing workflow, if any
1✔
125
                if err := handler.DeleteArgoResources(ctx); err != nil {
1✔
126
                        return ctrlResult, errLogAndWrap(log, err, "failed to delete existing workflow for force reconcile")
×
127
                }
×
128
                // Reset phase so workflow gets recreated, and update last force time
129
                aw.Status.Phase = arcv1alpha1.WorkflowUnknown
1✔
130
                aw.Status.LastForceAt = metav1.Now()
1✔
131
                if err := r.Status().Update(ctx, aw); err != nil {
1✔
132
                        return ctrlResult, errLogAndWrap(log, err, "failed to update last force time")
×
133
                }
×
134
                // Return without requeue; the update event will trigger reconciliation again
135
                return ctrlResult, nil
1✔
136
        }
137

138
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
4,749✔
139
                return ctrlResult, handler.CreateArgoResources(ctx)
1,647✔
140
        }
1,647✔
141

142
        if aw.Status.Phase.InProgress() || aw.Spec.Cron != nil {
2,904✔
143
                return ctrlResult, handler.CheckArgoResources(ctx)
1,449✔
144
        }
1,449✔
145

146
        return ctrlResult, nil
6✔
147
}
148

149
func (r *ArtifactWorkflowReconciler) setStatusFromWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow, wf *wfv1alpha1.Workflow) bool {
1,443✔
150
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
1,447✔
151
                return false // nothing updated
4✔
152
        }
4✔
153
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
1,439✔
154

1,439✔
155
        switch aw.Status.Phase {
1,439✔
156
        case arcv1alpha1.WorkflowSucceeded, arcv1alpha1.WorkflowStopped:
6✔
157
                aw.Status.CompletionTime = metav1.Now()
6✔
158
        case arcv1alpha1.WorkflowError, arcv1alpha1.WorkflowFailed:
7✔
159
                aw.Status.Message = wf.Status.Message
7✔
160
                aw.Status.FailureTime = metav1.Now()
7✔
161
                r.generateWorkflowStatusMessage(ctx, wf, log, aw)
7✔
162
        default:
1,426✔
163
        }
164

165
        return true
1,439✔
166
}
167

168
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf *wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
7✔
169
        failedNodes := []struct {
7✔
170
                Name    string
7✔
171
                Pod     string
7✔
172
                Message string
7✔
173
        }{}
7✔
174
        for _, node := range wf.Status.Nodes {
10✔
175
                if (node.Phase == wfv1alpha1.NodeFailed || node.Phase == wfv1alpha1.NodeError) && node.Type == wfv1alpha1.NodeTypePod {
5✔
176
                        nr := struct {
2✔
177
                                Name    string
2✔
178
                                Pod     string
2✔
179
                                Message string
2✔
180
                        }{
2✔
181
                                Name:    node.DisplayName,
2✔
182
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
183
                                Message: node.Message,
2✔
184
                        }
2✔
185
                        failedNodes = append(failedNodes, nr)
2✔
186
                }
2✔
187
        }
188

189
        for _, nr := range failedNodes {
9✔
190
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
191
                if err != nil {
2✔
192
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
193
                        aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\n\n", nr.Name, nr.Message)
×
194

×
195
                        continue
×
196
                }
197
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
198
        }
199
}
200

201
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
202
        podLogOptions := corev1.PodLogOptions{
2✔
203
                Container: "main", // Assuming the main container
2✔
204
                Follow:    false,
2✔
205
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
206
        }
2✔
207
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
208
        podLogs, err := req.Stream(ctx)
2✔
209
        if err != nil {
2✔
210
                return "", err
×
211
        }
×
212
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
213

2✔
214
        buf := new(bytes.Buffer)
2✔
215
        _, err = io.Copy(buf, podLogs)
2✔
216
        if err != nil {
2✔
217
                return "", err
×
218
        }
×
219

220
        return buf.String(), nil
2✔
221
}
222

223
func (r *ArtifactWorkflowReconciler) retrieveSecrets(ctx context.Context, aw *arcv1alpha1.ArtifactWorkflow) (*corev1.Secret, *corev1.Secret, error) {
1,647✔
224
        srcSecret := corev1.Secret{}
1,647✔
225
        if aw.Spec.SrcSecretRef.Name != "" {
3,011✔
226
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
1,364✔
227
                        r.Recorder.Eventf(aw, nil, corev1.EventTypeWarning, "InvalidSecret", "FetchSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
228
                        return nil, nil, fmt.Errorf("failed to fetch secret for source: %w", err)
×
229
                }
×
230
        }
231

232
        dstSecret := corev1.Secret{}
1,647✔
233
        if aw.Spec.DstSecretRef.Name != "" {
3,011✔
234
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
1,364✔
235
                        r.Recorder.Eventf(aw, nil, corev1.EventTypeWarning, "InvalidSecret", "FetchSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
236
                        return nil, nil, fmt.Errorf("failed to fetch secret for destination: %w", err)
×
237
                }
×
238
        }
239

240
        return &srcSecret, &dstSecret, nil
1,647✔
241
}
242

243
// SetupWithManager sets up the controller with the Manager.
244
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
245
        return ctrl.NewControllerManagedBy(mgr).
1✔
246
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
247
                Owns(&wfv1alpha1.Workflow{}).
1✔
248
                Owns(&wfv1alpha1.CronWorkflow{}).
1✔
249
                Watches(
1✔
250
                        &wfv1alpha1.Workflow{},
1✔
251
                        handler.EnqueueRequestsFromMapFunc(r.findArtifactWorkflowsForWorkflowOwnedByCronWorkflow()),
1✔
252
                        builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
1✔
253
                ).
1✔
254
                Complete(r)
1✔
255
}
1✔
256

257
// findArtifactWorkflowsForWorkflowOwnedByCronWorkflow returns a function that finds ArtifactWorkflows
258
// that own CronWorkflows that own the given Workflow. This handles the ownership chain:
259
// ArtifactWorkflow -> CronWorkflow -> Workflow
260
func (r *ArtifactWorkflowReconciler) findArtifactWorkflowsForWorkflowOwnedByCronWorkflow() func(context.Context, client.Object) []ctrl.Request {
1✔
261
        return func(ctx context.Context, wf client.Object) []ctrl.Request {
61✔
262
                // Find the CronWorkflow that owns this Workflow
60✔
263
                ownerReferences := wf.GetOwnerReferences()
60✔
264
                var cronWorkflowName, cronWorkflowNamespace string
60✔
265

60✔
266
                for _, owner := range ownerReferences {
120✔
267
                        if owner.Kind == "CronWorkflow" && owner.APIVersion == wfv1alpha1.SchemeGroupVersion.String() {
65✔
268
                                cronWorkflowName = owner.Name
5✔
269
                                cronWorkflowNamespace = wf.GetNamespace() // Owner is in same namespace
5✔
270

5✔
271
                                break
5✔
272
                        }
273
                }
274

275
                if cronWorkflowName == "" {
115✔
276
                        // Workflow is not owned by a CronWorkflow
55✔
277
                        return []reconcile.Request{}
55✔
278
                }
55✔
279

280
                // Find the ArtifactWorkflow that owns this CronWorkflow
281
                cwf := &wfv1alpha1.CronWorkflow{}
5✔
282
                if err := r.Get(ctx, types.NamespacedName{Name: cronWorkflowName, Namespace: cronWorkflowNamespace}, cwf); err != nil {
5✔
283
                        // CronWorkflow not found or error occurred
×
284
                        return []reconcile.Request{}
×
285
                }
×
286

287
                var artifactWorkflowName string
5✔
288
                for _, owner := range cwf.GetOwnerReferences() {
10✔
289
                        if owner.Kind == "ArtifactWorkflow" && owner.APIVersion == arcv1alpha1.SchemeGroupVersion.String() {
10✔
290
                                artifactWorkflowName = owner.Name
5✔
291
                                break
5✔
292
                        }
293
                }
294

295
                if artifactWorkflowName == "" {
5✔
296
                        // CronWorkflow is not owned by an ArtifactWorkflow
×
297
                        return []reconcile.Request{}
×
298
                }
×
299

300
                return []reconcile.Request{
5✔
301
                        {
5✔
302
                                NamespacedName: types.NamespacedName{
5✔
303
                                        Name:      artifactWorkflowName,
5✔
304
                                        Namespace: cronWorkflowNamespace,
5✔
305
                                },
5✔
306
                        },
5✔
307
                }
5✔
308
        }
309
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc