• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 19698272222

26 Nov 2025 09:06AM UTC coverage: 63.406% (-0.2%) from 63.647%
19698272222

push

github

web-flow
add example to import ocm and scan oci images contained in the component (#90)

Closes #65

525 of 828 relevant lines covered (63.41%)

1010.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.02
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/record"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
26
)
27

28
const (
29
        artifactWorkflowFinalizer = "arc.bwi.de/artifact-workflow-finalizer"
30
)
31

32
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
33
type ArtifactWorkflowReconciler struct {
34
        client.Client
35
        ClientSet kubernetes.Interface
36
        Scheme    *runtime.Scheme
37
        Recorder  record.EventRecorder
38
}
39

40
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifacttypes,verbs=get;list;watch
41
//+kubebuilder:rbac:groups=arc.bwi.de,resources=clusterartifacttypes,verbs=get;list;watch
42
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/status,verbs=get;update;patch
43
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/finalizers,verbs=update
44
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
45
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
46
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
47

48
// Reconcile moves the current state of the cluster closer to the desired state
49
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
2,187✔
50
        log := ctrl.LoggerFrom(ctx)
2,187✔
51

2,187✔
52
        aw := &arcv1alpha1.ArtifactWorkflow{}
2,187✔
53
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
2,190✔
54
                if apierrors.IsNotFound(err) {
6✔
55
                        // Object not found, return.
3✔
56
                        return ctrl.Result{}, nil
3✔
57
                }
3✔
58
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get object")
×
59
        }
60

61
        if !aw.DeletionTimestamp.IsZero() {
2,187✔
62
                log.V(1).Info("ArtifactWorkflow is being deleted")
3✔
63
                // Cleanup workflow, if exists
3✔
64
                wf := wfv1alpha1.Workflow{
3✔
65
                        ObjectMeta: metav1.ObjectMeta{
3✔
66
                                Namespace: aw.Namespace,
3✔
67
                                Name:      aw.Name,
3✔
68
                        },
3✔
69
                }
3✔
70
                if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
3✔
71
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "DeletionFailed", fmt.Sprintf("Failed to delete associated workflow '%s': %v", aw.Name, err))
×
72
                        return ctrl.Result{}, errLogAndWrap(log, err, "workflow deletion failed")
×
73
                }
×
74
                // Remove finalizer
75
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
6✔
76
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
3✔
77
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
6✔
78
                                return f == artifactWorkflowFinalizer
3✔
79
                        })
3✔
80
                        if err := r.Update(ctx, aw); err != nil {
3✔
81
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to remove finalizer")
×
82
                        }
×
83
                }
84
                return ctrl.Result{}, nil
3✔
85
        }
86

87
        // Add finalizer if not present and not deleting
88
        if aw.DeletionTimestamp.IsZero() {
4,362✔
89
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
2,201✔
90
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
20✔
91
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
20✔
92
                        if err := r.Update(ctx, aw); err != nil {
20✔
93
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to add finalizer")
×
94
                        }
×
95
                        // Return without requeue; the Update event will trigger reconciliation again
96
                        return ctrl.Result{}, nil
20✔
97
                }
98
        }
99

100
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
3,312✔
101
                return r.createArgoWorkflow(ctx, log, aw)
1,151✔
102
        }
1,151✔
103

104
        if aw.Status.Phase.InProgress() {
2,018✔
105
                return r.checkArgoWorkflow(ctx, log, aw)
1,008✔
106
        }
1,008✔
107

108
        return ctrl.Result{}, nil
2✔
109
}
110

111
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
1,151✔
112
        artifactType := &arcv1alpha1.ArtifactType{}
1,151✔
113
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.Type), artifactType); client.IgnoreNotFound(err) != nil {
1,151✔
114
                r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidArtifactType", fmt.Sprintf("Failed to fetch artifact type '%s': %v", aw.Spec.Type, err))
×
115
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch referenced ArtifactType")
×
116
        }
×
117
        var artifactTypeSpec *arcv1alpha1.ArtifactTypeSpec
1,151✔
118
        if artifactType.Name == "" { // was not found, let's check ClusterArtifactType
2,242✔
119
                clusterArtifactType := &arcv1alpha1.ClusterArtifactType{}
1,091✔
120
                if err := r.Get(ctx, namespacedName("", aw.Spec.Type), clusterArtifactType); err != nil {
1,218✔
121
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidArtifactType", fmt.Sprintf("Failed to fetch artifact type on cluster scope '%s': %v", aw.Spec.Type, err))
127✔
122
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch ArtifactType or ClusterArtifactType")
127✔
123
                }
127✔
124
                artifactTypeSpec = &clusterArtifactType.Spec
964✔
125
                // For ClusterArtifactType we only reference ClusterWorkloadTemplates
964✔
126
                artifactTypeSpec.WorkflowTemplateRef.ClusterScope = true
964✔
127
        } else {
60✔
128
                artifactTypeSpec = &artifactType.Spec
60✔
129
        }
60✔
130

131
        srcSecret := corev1.Secret{}
1,024✔
132
        if aw.Spec.SrcSecretRef.Name != "" {
1,835✔
133
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
811✔
134
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
135
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for source")
×
136
                }
×
137
        }
138

139
        dstSecret := corev1.Secret{}
1,024✔
140
        if aw.Spec.DstSecretRef.Name != "" {
1,835✔
141
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
811✔
142
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
143
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for destination")
×
144
                }
×
145
        }
146

147
        wf := r.hydrateArgoWorkflow(aw, artifactTypeSpec, &srcSecret, &dstSecret)
1,024✔
148

1,024✔
149
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
1,024✔
150
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to set controller reference")
×
151
        }
×
152

153
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
1,042✔
154
                r.Recorder.Event(aw, corev1.EventTypeWarning, "CreationFailed", fmt.Sprintf("Failed to create workflow '%s': %v", wf.Name, err))
18✔
155
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to create argo workflow")
18✔
156
        }
18✔
157
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Created", fmt.Sprintf("Created workflow '%s'", wf.Name))
1,006✔
158

1,006✔
159
        aw.Status.Phase = arcv1alpha1.WorkflowPending
1,006✔
160
        if err := r.Status().Update(ctx, aw); err != nil {
1,006✔
161
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
162
        }
×
163
        return ctrl.Result{}, nil
1,006✔
164
}
165

166
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, artifactTypeSpec *arcv1alpha1.ArtifactTypeSpec, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
1,024✔
167
        srcVolume := corev1.Volume{
1,024✔
168
                Name: "src-secret-vol",
1,024✔
169
                VolumeSource: corev1.VolumeSource{
1,024✔
170
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,024✔
171
                },
1,024✔
172
        }
1,024✔
173
        if srcSecret.Name != "" {
1,835✔
174
                srcVolume.VolumeSource = corev1.VolumeSource{
811✔
175
                        Secret: &corev1.SecretVolumeSource{
811✔
176
                                SecretName: srcSecret.Name,
811✔
177
                        },
811✔
178
                }
811✔
179
        }
811✔
180

181
        dstVolume := corev1.Volume{
1,024✔
182
                Name: "dst-secret-vol",
1,024✔
183
                VolumeSource: corev1.VolumeSource{
1,024✔
184
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,024✔
185
                },
1,024✔
186
        }
1,024✔
187
        if dstSecret.Name != "" {
1,835✔
188
                dstVolume.VolumeSource = corev1.VolumeSource{
811✔
189
                        Secret: &corev1.SecretVolumeSource{
811✔
190
                                SecretName: dstSecret.Name,
811✔
191
                        },
811✔
192
                }
811✔
193
        }
811✔
194

195
        parameters := []wfv1alpha1.Parameter{}
1,024✔
196
        for _, p := range aw.Spec.Parameters {
6,498✔
197
                parameters = append(parameters, wfv1alpha1.Parameter{
5,474✔
198
                        Name:  p.Name,
5,474✔
199
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
5,474✔
200
                })
5,474✔
201
        }
5,474✔
202
        for _, p := range artifactTypeSpec.Parameters {
1,988✔
203
                parameters = append(parameters, wfv1alpha1.Parameter{
964✔
204
                        Name:  p.Name,
964✔
205
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
964✔
206
                })
964✔
207
        }
964✔
208

209
        wf := &wfv1alpha1.Workflow{
1,024✔
210
                ObjectMeta: metav1.ObjectMeta{
1,024✔
211
                        Name:      aw.Name,
1,024✔
212
                        Namespace: aw.Namespace,
1,024✔
213
                },
1,024✔
214
                Spec: wfv1alpha1.WorkflowSpec{
1,024✔
215
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
1,024✔
216
                                Name:         artifactTypeSpec.WorkflowTemplateRef.Name,
1,024✔
217
                                ClusterScope: artifactTypeSpec.WorkflowTemplateRef.ClusterScope,
1,024✔
218
                        },
1,024✔
219
                        Volumes: []corev1.Volume{
1,024✔
220
                                srcVolume,
1,024✔
221
                                dstVolume,
1,024✔
222
                        },
1,024✔
223
                        Arguments: wfv1alpha1.Arguments{
1,024✔
224
                                Parameters: parameters,
1,024✔
225
                        },
1,024✔
226
                },
1,024✔
227
        }
1,024✔
228

1,024✔
229
        return wf
1,024✔
230
}
231

232
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
1,008✔
233
        wf := wfv1alpha1.Workflow{}
1,008✔
234
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
1,008✔
235
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get workflow")
×
236
        }
×
237
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
1,010✔
238
                return ctrl.Result{}, nil // nothing updated
2✔
239
        }
2✔
240
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
1,006✔
241

1,006✔
242
        // If workflow has errored or failed, fetch logs and update status message
1,006✔
243
        if (aw.Status.Phase == arcv1alpha1.WorkflowError || aw.Status.Phase == arcv1alpha1.WorkflowFailed) && aw.Status.Message == "" {
1,007✔
244
                switch aw.Status.Phase {
1✔
245
                case arcv1alpha1.WorkflowFailed:
1✔
246
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
247
                case arcv1alpha1.WorkflowError:
×
248
                        // TODO: Properly show why the workflow errored
×
249
                        aw.Status.Message = wf.Status.Message
×
250
                }
251
        }
252

253
        if err := r.Status().Update(ctx, aw); err != nil {
1,006✔
254
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
255
        }
×
256
        return ctrl.Result{}, nil
1,006✔
257
}
258

259
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
260
        failedNodes := []struct {
1✔
261
                Name    string
1✔
262
                Pod     string
1✔
263
                Message string
1✔
264
        }{}
1✔
265
        for _, node := range wf.Status.Nodes {
4✔
266
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
267
                        nr := struct {
2✔
268
                                Name    string
2✔
269
                                Pod     string
2✔
270
                                Message string
2✔
271
                        }{
2✔
272
                                Name:    node.DisplayName,
2✔
273
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
274
                                Message: node.Message,
2✔
275
                        }
2✔
276
                        failedNodes = append(failedNodes, nr)
2✔
277
                }
2✔
278
        }
279

280
        for _, nr := range failedNodes {
3✔
281
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
282
                if err != nil {
2✔
283
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
284
                        continue
×
285
                }
286
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
287
        }
288
}
289

290
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
291
        podLogOptions := corev1.PodLogOptions{
2✔
292
                Container: "main", // Assuming the main container
2✔
293
                Follow:    false,
2✔
294
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
295
        }
2✔
296
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
297
        podLogs, err := req.Stream(ctx)
2✔
298
        if err != nil {
2✔
299
                return "", err
×
300
        }
×
301
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
302

2✔
303
        buf := new(bytes.Buffer)
2✔
304
        _, err = io.Copy(buf, podLogs)
2✔
305
        if err != nil {
2✔
306
                return "", err
×
307
        }
×
308
        return buf.String(), nil
2✔
309
}
310

311
// SetupWithManager sets up the controller with the Manager.
312
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
313
        return ctrl.NewControllerManagedBy(mgr).
1✔
314
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
315
                Owns(&wfv1alpha1.Workflow{}).
1✔
316
                Complete(r)
1✔
317
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc