• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 19705097266

26 Nov 2025 01:19PM UTC coverage: 60.219% (-1.1%) from 61.352%
19705097266

Pull #94

github

web-flow
Merge dbe584ca1 into f3a6c2f69
Pull Request #94: Feature/87 workflow template ref refactor

38 of 38 new or added lines in 3 files covered. (100.0%)

6 existing lines in 2 files now uncovered.

551 of 915 relevant lines covered (60.22%)

952.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.85
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/record"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
26
)
27

28
// artifactWorkflowFinalizer is the finalizer the reconciler puts on every
// ArtifactWorkflow so the associated Argo Workflow can be deleted before the
// object disappears.
const artifactWorkflowFinalizer = "arc.bwi.de/artifact-workflow-finalizer"
31

32
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
33
type ArtifactWorkflowReconciler struct {
34
        client.Client
35
        ClientSet kubernetes.Interface
36
        Scheme    *runtime.Scheme
37
        Recorder  record.EventRecorder
38
}
39

40
//+kubebuilder:rbac:groups=arc.bwi.de,resources=clusterartifacttypes,verbs=get;list;watch
41
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/status,verbs=get;update;patch
42
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/finalizers,verbs=update
43
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
44
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
45
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
46

47
// Reconcile moves the current state of the cluster closer to the desired state
48
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
2,256✔
49
        log := ctrl.LoggerFrom(ctx)
2,256✔
50

2,256✔
51
        aw := &arcv1alpha1.ArtifactWorkflow{}
2,256✔
52
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
2,261✔
53
                if apierrors.IsNotFound(err) {
10✔
54
                        // Object not found, return.
5✔
55
                        return ctrl.Result{}, nil
5✔
56
                }
5✔
57
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get object")
×
58
        }
59

60
        if !aw.DeletionTimestamp.IsZero() {
2,256✔
61
                log.V(1).Info("ArtifactWorkflow is being deleted")
5✔
62
                // Cleanup workflow, if exists
5✔
63
                wf := wfv1alpha1.Workflow{
5✔
64
                        ObjectMeta: metav1.ObjectMeta{
5✔
65
                                Namespace: aw.Namespace,
5✔
66
                                Name:      aw.Name,
5✔
67
                        },
5✔
68
                }
5✔
69
                if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
5✔
70
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "DeletionFailed", fmt.Sprintf("Failed to delete associated workflow '%s': %v", aw.Name, err))
×
71
                        return ctrl.Result{}, errLogAndWrap(log, err, "workflow deletion failed")
×
72
                }
×
73
                // Remove finalizer
74
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
10✔
75
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
5✔
76
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
10✔
77
                                return f == artifactWorkflowFinalizer
5✔
78
                        })
5✔
79
                        if err := r.Update(ctx, aw); err != nil {
6✔
80
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to remove finalizer")
1✔
81
                        }
1✔
82
                }
83
                return ctrl.Result{}, nil
4✔
84
        }
85

86
        // Add finalizer if not present and not deleting
87
        if aw.DeletionTimestamp.IsZero() {
4,492✔
88
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
2,268✔
89
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
22✔
90
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
22✔
91
                        if err := r.Update(ctx, aw); err != nil {
22✔
92
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to add finalizer")
×
93
                        }
×
94
                        // Return without requeue; the Update event will trigger reconciliation again
95
                        return ctrl.Result{}, nil
22✔
96
                }
97
        }
98

99
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
3,411✔
100
                return r.createArgoWorkflow(ctx, log, aw)
1,187✔
101
        }
1,187✔
102

103
        if aw.Status.Phase.InProgress() {
2,071✔
104
                return r.checkArgoWorkflow(ctx, log, aw)
1,034✔
105
        }
1,034✔
106

107
        return ctrl.Result{}, nil
3✔
108
}
109

110
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
1,187✔
111
        srcSecret := corev1.Secret{}
1,187✔
112
        if aw.Spec.SrcSecretRef.Name != "" {
2,120✔
113
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
933✔
114
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
115
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for source")
×
116
                }
×
117
        }
118

119
        dstSecret := corev1.Secret{}
1,187✔
120
        if aw.Spec.DstSecretRef.Name != "" {
2,120✔
121
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
933✔
122
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
123
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for destination")
×
124
                }
×
125
        }
126

127
        wf := r.hydrateArgoWorkflow(aw, &srcSecret, &dstSecret)
1,187✔
128

1,187✔
129
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
1,187✔
130
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to set controller reference")
×
131
        }
×
132

133
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
1,343✔
134
                r.Recorder.Event(aw, corev1.EventTypeWarning, "CreationFailed", fmt.Sprintf("Failed to create workflow '%s': %v", wf.Name, err))
156✔
135
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to create argo workflow")
156✔
136
        }
156✔
137
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Created", fmt.Sprintf("Created workflow '%s'", wf.Name))
1,031✔
138

1,031✔
139
        aw.Status.Phase = arcv1alpha1.WorkflowPending
1,031✔
140
        if err := r.Status().Update(ctx, aw); err != nil {
1,031✔
UNCOV
141
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
UNCOV
142
        }
×
143
        return ctrl.Result{}, nil
1,031✔
144
}
145

146
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
1,187✔
147
        srcVolume := corev1.Volume{
1,187✔
148
                Name: "src-secret-vol",
1,187✔
149
                VolumeSource: corev1.VolumeSource{
1,187✔
150
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,187✔
151
                },
1,187✔
152
        }
1,187✔
153
        if srcSecret.Name != "" {
2,120✔
154
                srcVolume.VolumeSource = corev1.VolumeSource{
933✔
155
                        Secret: &corev1.SecretVolumeSource{
933✔
156
                                SecretName: srcSecret.Name,
933✔
157
                        },
933✔
158
                }
933✔
159
        }
933✔
160

161
        dstVolume := corev1.Volume{
1,187✔
162
                Name: "dst-secret-vol",
1,187✔
163
                VolumeSource: corev1.VolumeSource{
1,187✔
164
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,187✔
165
                },
1,187✔
166
        }
1,187✔
167
        if dstSecret.Name != "" {
2,120✔
168
                dstVolume.VolumeSource = corev1.VolumeSource{
933✔
169
                        Secret: &corev1.SecretVolumeSource{
933✔
170
                                SecretName: dstSecret.Name,
933✔
171
                        },
933✔
172
                }
933✔
173
        }
933✔
174

175
        parameters := []wfv1alpha1.Parameter{}
1,187✔
176
        for _, p := range aw.Spec.Parameters {
8,297✔
177
                parameters = append(parameters, wfv1alpha1.Parameter{
7,110✔
178
                        Name:  p.Name,
7,110✔
179
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
7,110✔
180
                })
7,110✔
181
        }
7,110✔
182

183
        wf := &wfv1alpha1.Workflow{
1,187✔
184
                ObjectMeta: metav1.ObjectMeta{
1,187✔
185
                        Name:      aw.Name,
1,187✔
186
                        Namespace: aw.Namespace,
1,187✔
187
                },
1,187✔
188
                Spec: wfv1alpha1.WorkflowSpec{
1,187✔
189
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
1,187✔
190
                                Name:         aw.Spec.WorkflowTemplateRef.Name,
1,187✔
191
                                ClusterScope: aw.Spec.WorkflowTemplateRef.ClusterScope,
1,187✔
192
                        },
1,187✔
193
                        Volumes: []corev1.Volume{
1,187✔
194
                                srcVolume,
1,187✔
195
                                dstVolume,
1,187✔
196
                        },
1,187✔
197
                        Arguments: wfv1alpha1.Arguments{
1,187✔
198
                                Parameters: parameters,
1,187✔
199
                        },
1,187✔
200
                },
1,187✔
201
        }
1,187✔
202

1,187✔
203
        return wf
1,187✔
204
}
205

206
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
1,034✔
207
        wf := wfv1alpha1.Workflow{}
1,034✔
208
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
1,034✔
209
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get workflow")
×
210
        }
×
211
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
1,036✔
212
                return ctrl.Result{}, nil // nothing updated
2✔
213
        }
2✔
214
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
1,032✔
215

1,032✔
216
        if aw.Status.Phase == arcv1alpha1.WorkflowSucceeded {
1,034✔
217
                aw.Status.CompletionTime = metav1.Now()
2✔
218
        }
2✔
219

220
        // If workflow has errored or failed, fetch logs and update status message
221
        if (aw.Status.Phase == arcv1alpha1.WorkflowError || aw.Status.Phase == arcv1alpha1.WorkflowFailed) && aw.Status.Message == "" {
1,033✔
222
                switch aw.Status.Phase {
1✔
223
                case arcv1alpha1.WorkflowFailed:
1✔
224
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
225
                case arcv1alpha1.WorkflowError:
×
226
                        // TODO: Properly show why the workflow errored
×
227
                        aw.Status.Message = wf.Status.Message
×
228
                }
229
        }
230

231
        if err := r.Status().Update(ctx, aw); err != nil {
1,032✔
UNCOV
232
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
UNCOV
233
        }
×
234
        return ctrl.Result{}, nil
1,032✔
235
}
236

237
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
238
        failedNodes := []struct {
1✔
239
                Name    string
1✔
240
                Pod     string
1✔
241
                Message string
1✔
242
        }{}
1✔
243
        for _, node := range wf.Status.Nodes {
4✔
244
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
245
                        nr := struct {
2✔
246
                                Name    string
2✔
247
                                Pod     string
2✔
248
                                Message string
2✔
249
                        }{
2✔
250
                                Name:    node.DisplayName,
2✔
251
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
252
                                Message: node.Message,
2✔
253
                        }
2✔
254
                        failedNodes = append(failedNodes, nr)
2✔
255
                }
2✔
256
        }
257

258
        for _, nr := range failedNodes {
3✔
259
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
260
                if err != nil {
2✔
261
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
262
                        continue
×
263
                }
264
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
265
        }
266
}
267

268
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
269
        podLogOptions := corev1.PodLogOptions{
2✔
270
                Container: "main", // Assuming the main container
2✔
271
                Follow:    false,
2✔
272
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
273
        }
2✔
274
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
275
        podLogs, err := req.Stream(ctx)
2✔
276
        if err != nil {
2✔
277
                return "", err
×
278
        }
×
279
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
280

2✔
281
        buf := new(bytes.Buffer)
2✔
282
        _, err = io.Copy(buf, podLogs)
2✔
283
        if err != nil {
2✔
284
                return "", err
×
285
        }
×
286
        return buf.String(), nil
2✔
287
}
288

289
// SetupWithManager sets up the controller with the Manager.
290
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
291
        return ctrl.NewControllerManagedBy(mgr).
1✔
292
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
293
                Owns(&wfv1alpha1.Workflow{}).
1✔
294
                Complete(r)
1✔
295
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc