• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 20374482880

19 Dec 2025 03:26PM UTC coverage: 86.272% (+0.3%) from 85.983%
20374482880

Pull #159

github

web-flow
Merge c451483b1 into eeb8df568
Pull Request #159: fix missing plugin for docs release

597 of 692 relevant lines covered (86.27%)

856.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.96
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/record"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
26
)
27

28
// artifactWorkflowFinalizer is the finalizer this controller puts on
// ArtifactWorkflow objects. It is removed again during deletion, after the
// associated Argo Workflow has been deleted.
const artifactWorkflowFinalizer = "arc.opendefense.cloud/artifact-workflow-finalizer"
31

32
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
33
type ArtifactWorkflowReconciler struct {
34
        client.Client
35
        ClientSet kubernetes.Interface
36
        Scheme    *runtime.Scheme
37
        Recorder  record.EventRecorder
38
}
39

40
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=clusterartifacttypes,verbs=get;list;watch
41
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/status,verbs=get;update;patch
42
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/finalizers,verbs=update
43
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
44
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
45
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
46
//+kubebuilder:rbac:groups="",resources=pods;pods/log,verbs=get;list
47

48
// Reconcile moves the current state of the cluster closer to the desired state
49
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1,799✔
50
        log := ctrl.LoggerFrom(ctx)
1,799✔
51
        ctrlResult := ctrl.Result{}
1,799✔
52

1,799✔
53
        aw := &arcv1alpha1.ArtifactWorkflow{}
1,799✔
54
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
1,805✔
55
                if apierrors.IsNotFound(err) {
12✔
56
                        // Object not found, return.
6✔
57
                        return ctrlResult, nil
6✔
58
                }
6✔
59
                return ctrlResult, errLogAndWrap(log, err, "failed to get object")
×
60
        }
61

62
        // Update last reconcile time
63
        aw.Status.LastReconcileAt = metav1.Now()
1,793✔
64

1,793✔
65
        // Handle deletion
1,793✔
66
        if !aw.DeletionTimestamp.IsZero() {
1,799✔
67
                log.V(1).Info("ArtifactWorkflow is being deleted")
6✔
68
                // Cleanup workflow, if exists
6✔
69
                if err := r.deleteArgoWorkflow(ctx, log, aw); err != nil {
6✔
70
                        return ctrlResult, errLogAndWrap(log, err, "workflow deletion failed")
×
71
                }
×
72

73
                // Remove finalizer
74
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
12✔
75
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
6✔
76
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
12✔
77
                                return f == artifactWorkflowFinalizer
6✔
78
                        })
6✔
79
                        if err := r.Update(ctx, aw); err != nil {
7✔
80
                                return ctrlResult, errLogAndWrap(log, err, "failed to remove finalizer")
1✔
81
                        }
1✔
82
                }
83
                return ctrlResult, nil
5✔
84
        }
85

86
        // Add finalizer if not present and not deleting
87
        if aw.DeletionTimestamp.IsZero() {
3,574✔
88
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
1,814✔
89
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
27✔
90
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
27✔
91
                        if err := r.Update(ctx, aw); err != nil {
27✔
92
                                return ctrlResult, errLogAndWrap(log, err, "failed to add finalizer")
×
93
                        }
×
94
                        // Return without requeue; the Update event will trigger reconciliation again
95
                        return ctrlResult, nil
27✔
96
                }
97
        }
98

99
        // Handle force reconcile annotation
100
        forceAt, err := GetForceAtAnnotationValue(aw)
1,760✔
101
        if err != nil {
1,760✔
102
                log.V(1).Error(err, "Invalid force reconcile annotation, ignoring")
×
103
        }
×
104
        if !forceAt.IsZero() && (aw.Status.LastForceAt.IsZero() || forceAt.After(aw.Status.LastForceAt.Time)) {
1,761✔
105
                log.V(1).Info("Force reconcile requested")
1✔
106
                r.Recorder.Event(aw, corev1.EventTypeNormal, "ForceReconcile", "Force reconcile requested via annotation")
1✔
107
                // Delete existing workflow, if any
1✔
108
                if err := r.deleteArgoWorkflow(ctx, log, aw); err != nil {
1✔
109
                        return ctrlResult, errLogAndWrap(log, err, "failed to delete existing workflow for force reconcile")
×
110
                }
×
111
                // Reset phase so workflow gets recreated, and update last force time
112
                aw.Status.Phase = arcv1alpha1.WorkflowUnknown
1✔
113
                aw.Status.LastForceAt = metav1.Now()
1✔
114
                if err := r.Status().Update(ctx, aw); err != nil {
1✔
115
                        return ctrlResult, errLogAndWrap(log, err, "failed to update last force time")
×
116
                }
×
117
                // Return without requeue; the update event will trigger reconciliation again
118
                return ctrlResult, nil
1✔
119
        }
120

121
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
2,737✔
122
                return ctrlResult, r.createArgoWorkflow(ctx, log, aw)
978✔
123
        }
978✔
124

125
        if aw.Status.Phase.InProgress() {
1,559✔
126
                return ctrlResult, r.checkArgoWorkflow(ctx, log, aw)
778✔
127
        }
778✔
128

129
        return ctrlResult, nil
3✔
130
}
131

132
func (r *ArtifactWorkflowReconciler) deleteArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
7✔
133
        wf := wfv1alpha1.Workflow{
7✔
134
                ObjectMeta: metav1.ObjectMeta{
7✔
135
                        Namespace: aw.Namespace,
7✔
136
                        Name:      aw.Name,
7✔
137
                },
7✔
138
        }
7✔
139
        if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
7✔
140
                r.Recorder.Event(aw, corev1.EventTypeWarning, "DeletionFailed", fmt.Sprintf("Failed to delete associated workflow '%s': %v", aw.Name, err))
×
141
                return errLogAndWrap(log, err, "workflow deletion failed")
×
142
        }
×
143
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted workflow '%s'", aw.Name))
7✔
144
        return nil
7✔
145
}
146

147
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
978✔
148
        srcSecret := corev1.Secret{}
978✔
149
        if aw.Spec.SrcSecretRef.Name != "" {
1,602✔
150
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
624✔
151
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
152
                        return errLogAndWrap(log, err, "failed to fetch secret for source")
×
153
                }
×
154
        }
155

156
        dstSecret := corev1.Secret{}
978✔
157
        if aw.Spec.DstSecretRef.Name != "" {
1,602✔
158
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
624✔
159
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
160
                        return errLogAndWrap(log, err, "failed to fetch secret for destination")
×
161
                }
×
162
        }
163

164
        wf := r.hydrateArgoWorkflow(aw, &srcSecret, &dstSecret)
978✔
165

978✔
166
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
978✔
167
                return errLogAndWrap(log, err, "failed to set controller reference")
×
168
        }
×
169

170
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
1,177✔
171
                r.Recorder.Event(aw, corev1.EventTypeWarning, "CreationFailed", fmt.Sprintf("Failed to create workflow '%s': %v", wf.Name, err))
199✔
172
                return errLogAndWrap(log, err, "failed to create argo workflow")
199✔
173
        }
199✔
174
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Created", fmt.Sprintf("Created workflow '%s'", wf.Name))
779✔
175

779✔
176
        aw.Status.Phase = arcv1alpha1.WorkflowPending
779✔
177
        if err := r.Status().Update(ctx, aw); err != nil {
783✔
178
                return errLogAndWrap(log, err, "failed to update status")
4✔
179
        }
4✔
180
        return nil
775✔
181
}
182

183
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
978✔
184
        srcVolume := corev1.Volume{
978✔
185
                Name: "src-secret-vol",
978✔
186
                VolumeSource: corev1.VolumeSource{
978✔
187
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
978✔
188
                },
978✔
189
        }
978✔
190
        if srcSecret.Name != "" {
1,602✔
191
                srcVolume.VolumeSource = corev1.VolumeSource{
624✔
192
                        Secret: &corev1.SecretVolumeSource{
624✔
193
                                SecretName: srcSecret.Name,
624✔
194
                        },
624✔
195
                }
624✔
196
        }
624✔
197

198
        dstVolume := corev1.Volume{
978✔
199
                Name: "dst-secret-vol",
978✔
200
                VolumeSource: corev1.VolumeSource{
978✔
201
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
978✔
202
                },
978✔
203
        }
978✔
204
        if dstSecret.Name != "" {
1,602✔
205
                dstVolume.VolumeSource = corev1.VolumeSource{
624✔
206
                        Secret: &corev1.SecretVolumeSource{
624✔
207
                                SecretName: dstSecret.Name,
624✔
208
                        },
624✔
209
                }
624✔
210
        }
624✔
211

212
        parameters := []wfv1alpha1.Parameter{}
978✔
213
        for _, p := range aw.Spec.Parameters {
5,849✔
214
                parameters = append(parameters, wfv1alpha1.Parameter{
4,871✔
215
                        Name:  p.Name,
4,871✔
216
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
4,871✔
217
                })
4,871✔
218
        }
4,871✔
219

220
        wf := &wfv1alpha1.Workflow{
978✔
221
                ObjectMeta: workflowObjectMeta(aw),
978✔
222
                Spec: wfv1alpha1.WorkflowSpec{
978✔
223
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
978✔
224
                                Name:         aw.Spec.WorkflowTemplateRef.Name,
978✔
225
                                ClusterScope: aw.Spec.WorkflowTemplateRef.ClusterScope,
978✔
226
                        },
978✔
227
                        Volumes: []corev1.Volume{
978✔
228
                                srcVolume,
978✔
229
                                dstVolume,
978✔
230
                        },
978✔
231
                        Arguments: wfv1alpha1.Arguments{
978✔
232
                                Parameters: parameters,
978✔
233
                        },
978✔
234
                },
978✔
235
        }
978✔
236

978✔
237
        return wf
978✔
238
}
239

240
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
778✔
241
        wf := wfv1alpha1.Workflow{}
778✔
242
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
778✔
243
                return errLogAndWrap(log, err, "failed to get workflow")
×
244
        }
×
245
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
781✔
246
                return nil // nothing updated
3✔
247
        }
3✔
248
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
775✔
249

775✔
250
        switch aw.Status.Phase {
775✔
251
        case arcv1alpha1.WorkflowSucceeded:
2✔
252
                aw.Status.CompletionTime = metav1.Now()
2✔
253
        case arcv1alpha1.WorkflowError, arcv1alpha1.WorkflowFailed:
2✔
254
                // If workflow has errored or failed, fetch logs and update status message
2✔
255
                switch aw.Status.Phase {
2✔
256
                case arcv1alpha1.WorkflowFailed:
1✔
257
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
258
                case arcv1alpha1.WorkflowError:
1✔
259
                        // TODO: Properly show why the workflow errored
1✔
260
                        aw.Status.Message = wf.Status.Message
1✔
261
                }
262
        }
263

264
        if err := r.Status().Update(ctx, aw); err != nil {
776✔
265
                return errLogAndWrap(log, err, "failed to update status")
1✔
266
        }
1✔
267

268
        return nil
774✔
269
}
270

271
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
272
        failedNodes := []struct {
1✔
273
                Name    string
1✔
274
                Pod     string
1✔
275
                Message string
1✔
276
        }{}
1✔
277
        for _, node := range wf.Status.Nodes {
4✔
278
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
279
                        nr := struct {
2✔
280
                                Name    string
2✔
281
                                Pod     string
2✔
282
                                Message string
2✔
283
                        }{
2✔
284
                                Name:    node.DisplayName,
2✔
285
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
286
                                Message: node.Message,
2✔
287
                        }
2✔
288
                        failedNodes = append(failedNodes, nr)
2✔
289
                }
2✔
290
        }
291

292
        for _, nr := range failedNodes {
3✔
293
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
294
                if err != nil {
2✔
295
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
296
                        continue
×
297
                }
298
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
299
        }
300
}
301

302
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
303
        podLogOptions := corev1.PodLogOptions{
2✔
304
                Container: "main", // Assuming the main container
2✔
305
                Follow:    false,
2✔
306
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
307
        }
2✔
308
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
309
        podLogs, err := req.Stream(ctx)
2✔
310
        if err != nil {
2✔
311
                return "", err
×
312
        }
×
313
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
314

2✔
315
        buf := new(bytes.Buffer)
2✔
316
        _, err = io.Copy(buf, podLogs)
2✔
317
        if err != nil {
2✔
318
                return "", err
×
319
        }
×
320
        return buf.String(), nil
2✔
321
}
322

323
// SetupWithManager sets up the controller with the Manager.
324
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
325
        return ctrl.NewControllerManagedBy(mgr).
1✔
326
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
327
                Owns(&wfv1alpha1.Workflow{}).
1✔
328
                Complete(r)
1✔
329
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc