• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 19664486947

25 Nov 2025 09:21AM UTC coverage: 63.878% (+0.3%) from 63.565%
19664486947

push

github

web-flow
Feature/cluster artifact type (#80)

Closes #52

44 of 47 new or added lines in 3 files covered. (93.62%)

4 existing lines in 2 files now uncovered.

504 of 789 relevant lines covered (63.88%)

751.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.3
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        ctrl "sigs.k8s.io/controller-runtime"
23
        "sigs.k8s.io/controller-runtime/pkg/client"
24
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
25
)
26

27
const (
28
        artifactWorkflowFinalizer = "arc.bwi.de/artifact-workflow-finalizer"
29
)
30

31
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
32
type ArtifactWorkflowReconciler struct {
33
        client.Client
34
        ClientSet kubernetes.Interface
35
        Scheme    *runtime.Scheme
36
}
37

38
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifacttypes,verbs=get;list;watch
39
//+kubebuilder:rbac:groups=arc.bwi.de,resources=clusterartifacttypes,verbs=get;list;watch
40
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/status,verbs=get;update;patch
41
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/finalizers,verbs=update
42
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
43
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
44

45
// Reconcile moves the current state of the cluster closer to the desired state
46
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1,846✔
47
        log := ctrl.LoggerFrom(ctx)
1,846✔
48

1,846✔
49
        aw := &arcv1alpha1.ArtifactWorkflow{}
1,846✔
50
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
1,849✔
51
                if apierrors.IsNotFound(err) {
6✔
52
                        // Object not found, return.
3✔
53
                        return ctrl.Result{}, nil
3✔
54
                }
3✔
55
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get object")
×
56
        }
57

58
        if !aw.DeletionTimestamp.IsZero() {
1,846✔
59
                log.V(1).Info("ArtifactWorkflow is being deleted")
3✔
60
                // Cleanup workflow, if exists
3✔
61
                wf := wfv1alpha1.Workflow{
3✔
62
                        ObjectMeta: metav1.ObjectMeta{
3✔
63
                                Namespace: aw.Namespace,
3✔
64
                                Name:      aw.Name,
3✔
65
                        },
3✔
66
                }
3✔
67
                if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
3✔
68
                        return ctrl.Result{}, errLogAndWrap(log, err, "workflow deletion failed")
×
69
                }
×
70
                // Remove finalizer
71
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
6✔
72
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
3✔
73
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
6✔
74
                                return f == artifactWorkflowFinalizer
3✔
75
                        })
3✔
76
                        if err := r.Update(ctx, aw); err != nil {
3✔
77
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to remove finalizer")
×
78
                        }
×
79
                }
80
                return ctrl.Result{}, nil
3✔
81
        }
82

83
        // Add finalizer if not present and not deleting
84
        if aw.DeletionTimestamp.IsZero() {
3,680✔
85
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
1,860✔
86
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
20✔
87
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
20✔
88
                        if err := r.Update(ctx, aw); err != nil {
20✔
89
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to add finalizer")
×
90
                        }
×
91
                        // Return without requeue; the Update event will trigger reconciliation again
92
                        return ctrl.Result{}, nil
20✔
93
                }
94
        }
95

96
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
2,803✔
97
                return r.createArgoWorkflow(ctx, log, aw)
983✔
98
        }
983✔
99

100
        if aw.Status.Phase.InProgress() {
1,672✔
101
                return r.checkArgoWorkflow(ctx, log, aw)
835✔
102
        }
835✔
103

104
        return ctrl.Result{}, nil
2✔
105
}
106

107
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
983✔
108
        artifactType := &arcv1alpha1.ArtifactType{}
983✔
109
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.Type), artifactType); client.IgnoreNotFound(err) != nil {
983✔
NEW
110
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch referenced ArtifactType")
×
NEW
111
        }
×
112
        var artifactTypeSpec *arcv1alpha1.ArtifactTypeSpec
983✔
113
        if artifactType.Name == "" { // was not found, let's check ClusterArtifactType
1,898✔
114
                clusterArtifactType := &arcv1alpha1.ClusterArtifactType{}
915✔
115
                if err := r.Get(ctx, namespacedName("", aw.Spec.Type), clusterArtifactType); err != nil {
1,035✔
116
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch ArtifactType or ClusterArtifactType")
120✔
117
                }
120✔
118
                artifactTypeSpec = &clusterArtifactType.Spec
795✔
119
                // For ClusterArtifactType we only reference ClusterWorkloadTemplates
795✔
120
                artifactTypeSpec.WorkflowTemplateRef.ClusterScope = true
795✔
121
        } else {
68✔
122
                artifactTypeSpec = &artifactType.Spec
68✔
123
        }
68✔
124

125
        srcSecret := corev1.Secret{}
863✔
126
        if aw.Spec.SrcSecretRef.Name != "" {
1,461✔
127
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
598✔
128
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for source")
×
129
                }
×
130
        }
131

132
        dstSecret := corev1.Secret{}
863✔
133
        if aw.Spec.DstSecretRef.Name != "" {
1,461✔
134
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
598✔
135
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for destination")
×
136
                }
×
137
        }
138

139
        wf := r.hydrateArgoWorkflow(aw, artifactTypeSpec, &srcSecret, &dstSecret)
863✔
140

863✔
141
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
863✔
142
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to set controller reference")
×
143
        }
×
144

145
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
892✔
146
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to create argo workflow")
29✔
147
        }
29✔
148

149
        aw.Status.Phase = arcv1alpha1.WorkflowPending
834✔
150
        if err := r.Status().Update(ctx, aw); err != nil {
834✔
UNCOV
151
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
UNCOV
152
        }
×
153
        return ctrl.Result{}, nil
834✔
154
}
155

156
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, artifactTypeSpec *arcv1alpha1.ArtifactTypeSpec, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
863✔
157
        srcVolume := corev1.Volume{
863✔
158
                Name: "src-secret-vol",
863✔
159
                VolumeSource: corev1.VolumeSource{
863✔
160
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
863✔
161
                },
863✔
162
        }
863✔
163
        if srcSecret.Name != "" {
1,461✔
164
                srcVolume.VolumeSource = corev1.VolumeSource{
598✔
165
                        Secret: &corev1.SecretVolumeSource{
598✔
166
                                SecretName: srcSecret.Name,
598✔
167
                        },
598✔
168
                }
598✔
169
        }
598✔
170

171
        dstVolume := corev1.Volume{
863✔
172
                Name: "dst-secret-vol",
863✔
173
                VolumeSource: corev1.VolumeSource{
863✔
174
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
863✔
175
                },
863✔
176
        }
863✔
177
        if dstSecret.Name != "" {
1,461✔
178
                dstVolume.VolumeSource = corev1.VolumeSource{
598✔
179
                        Secret: &corev1.SecretVolumeSource{
598✔
180
                                SecretName: dstSecret.Name,
598✔
181
                        },
598✔
182
                }
598✔
183
        }
598✔
184

185
        parameters := []wfv1alpha1.Parameter{}
863✔
186
        for _, p := range aw.Spec.Parameters {
4,989✔
187
                parameters = append(parameters, wfv1alpha1.Parameter{
4,126✔
188
                        Name:  p.Name,
4,126✔
189
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
4,126✔
190
                })
4,126✔
191
        }
4,126✔
192
        for _, p := range artifactTypeSpec.Parameters {
1,658✔
193
                parameters = append(parameters, wfv1alpha1.Parameter{
795✔
194
                        Name:  p.Name,
795✔
195
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
795✔
196
                })
795✔
197
        }
795✔
198

199
        wf := &wfv1alpha1.Workflow{
863✔
200
                ObjectMeta: metav1.ObjectMeta{
863✔
201
                        Name:      aw.Name,
863✔
202
                        Namespace: aw.Namespace,
863✔
203
                },
863✔
204
                Spec: wfv1alpha1.WorkflowSpec{
863✔
205
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
863✔
206
                                Name:         artifactTypeSpec.WorkflowTemplateRef.Name,
863✔
207
                                ClusterScope: artifactTypeSpec.WorkflowTemplateRef.ClusterScope,
863✔
208
                        },
863✔
209
                        Volumes: []corev1.Volume{
863✔
210
                                srcVolume,
863✔
211
                                dstVolume,
863✔
212
                        },
863✔
213
                        Arguments: wfv1alpha1.Arguments{
863✔
214
                                Parameters: parameters,
863✔
215
                        },
863✔
216
                },
863✔
217
        }
863✔
218

863✔
219
        return wf
863✔
220
}
221

222
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
835✔
223
        wf := wfv1alpha1.Workflow{}
835✔
224
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
835✔
225
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get workflow")
×
226
        }
×
227
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
837✔
228
                return ctrl.Result{}, nil // nothing updated
2✔
229
        }
2✔
230
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
833✔
231

833✔
232
        // If workflow has errored or failed, fetch logs and update status message
833✔
233
        if (aw.Status.Phase == arcv1alpha1.WorkflowError || aw.Status.Phase == arcv1alpha1.WorkflowFailed) && aw.Status.Message == "" {
834✔
234
                switch aw.Status.Phase {
1✔
235
                case arcv1alpha1.WorkflowFailed:
1✔
236
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
237
                case arcv1alpha1.WorkflowError:
×
238
                        // TODO: Properly show why the workflow errored
×
239
                        aw.Status.Message = wf.Status.Message
×
240
                }
241
        }
242

243
        if err := r.Status().Update(ctx, aw); err != nil {
833✔
244
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
245
        }
×
246
        return ctrl.Result{}, nil
833✔
247
}
248

249
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
250
        failedNodes := []struct {
1✔
251
                Name    string
1✔
252
                Pod     string
1✔
253
                Message string
1✔
254
        }{}
1✔
255
        for _, node := range wf.Status.Nodes {
4✔
256
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
257
                        nr := struct {
2✔
258
                                Name    string
2✔
259
                                Pod     string
2✔
260
                                Message string
2✔
261
                        }{
2✔
262
                                Name:    node.DisplayName,
2✔
263
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
264
                                Message: node.Message,
2✔
265
                        }
2✔
266
                        failedNodes = append(failedNodes, nr)
2✔
267
                }
2✔
268
        }
269

270
        for _, nr := range failedNodes {
3✔
271
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
272
                if err != nil {
2✔
273
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
274
                        continue
×
275
                }
276
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
277
        }
278
}
279

280
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
281
        podLogOptions := corev1.PodLogOptions{
2✔
282
                Container: "main", // Assuming the main container
2✔
283
                Follow:    false,
2✔
284
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
285
        }
2✔
286
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
287
        podLogs, err := req.Stream(ctx)
2✔
288
        if err != nil {
2✔
289
                return "", err
×
290
        }
×
291
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
292

2✔
293
        buf := new(bytes.Buffer)
2✔
294
        _, err = io.Copy(buf, podLogs)
2✔
295
        if err != nil {
2✔
296
                return "", err
×
297
        }
×
298
        return buf.String(), nil
2✔
299
}
300

301
// SetupWithManager sets up the controller with the Manager.
302
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
303
        return ctrl.NewControllerManagedBy(mgr).
1✔
304
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
305
                Owns(&wfv1alpha1.Workflow{}).
1✔
306
                Complete(r)
1✔
307
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc