• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 19627487675

24 Nov 2025 08:10AM UTC coverage: 63.293% (+1.6%) from 61.662%
19627487675

push

github

jastBytes
only update status if workflow is not in progress or phase changed

12 of 17 new or added lines in 1 file covered. (70.59%)

48 existing lines in 2 files now uncovered.

469 of 741 relevant lines covered (63.29%)

496.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.44
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        ctrl "sigs.k8s.io/controller-runtime"
23
        "sigs.k8s.io/controller-runtime/pkg/client"
24
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
25
)
26

27
// artifactWorkflowFinalizer is the finalizer the reconciler puts on
// ArtifactWorkflow objects; it is removed again once the Argo Workflow of the
// same name and namespace has been deleted.
const artifactWorkflowFinalizer = "arc.bwi.de/artifact-workflow-finalizer"
30

31
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
32
type ArtifactWorkflowReconciler struct {
33
        client.Client
34
        ClientSet kubernetes.Interface
35
        Scheme    *runtime.Scheme
36
}
37

38
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifacttypes,verbs=get;list;watch
39
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/status,verbs=get;update;patch
40
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/finalizers,verbs=update
41
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
42
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
43

44
// Reconcile moves the current state of the cluster closer to the desired state
45
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1,314✔
46
        log := ctrl.LoggerFrom(ctx)
1,314✔
47

1,314✔
48
        aw := &arcv1alpha1.ArtifactWorkflow{}
1,314✔
49
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
1,318✔
50
                if apierrors.IsNotFound(err) {
8✔
51
                        // Object not found, return.
4✔
52
                        return ctrl.Result{}, nil
4✔
53
                }
4✔
UNCOV
54
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get object")
×
55
        }
56

57
        if !aw.DeletionTimestamp.IsZero() {
1,313✔
58
                log.V(1).Info("ArtifactWorkflow is being deleted")
3✔
59
                // Cleanup workflow, if exists
3✔
60
                wf := wfv1alpha1.Workflow{
3✔
61
                        ObjectMeta: metav1.ObjectMeta{
3✔
62
                                Namespace: aw.Namespace,
3✔
63
                                Name:      aw.Name,
3✔
64
                        },
3✔
65
                }
3✔
66
                if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
3✔
UNCOV
67
                        return ctrl.Result{}, errLogAndWrap(log, err, "workflow deletion failed")
×
UNCOV
68
                }
×
69
                // Remove finalizer
70
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
6✔
71
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
3✔
72
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
6✔
73
                                return f == artifactWorkflowFinalizer
3✔
74
                        })
3✔
75
                        if err := r.Update(ctx, aw); err != nil {
3✔
UNCOV
76
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to remove finalizer")
×
UNCOV
77
                        }
×
78
                }
79
                return ctrl.Result{}, nil
3✔
80
        }
81

82
        // Add finalizer if not present and not deleting
83
        if aw.DeletionTimestamp.IsZero() {
2,614✔
84
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
1,326✔
85
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
19✔
86
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
19✔
87
                        if err := r.Update(ctx, aw); err != nil {
19✔
UNCOV
88
                                return ctrl.Result{}, errLogAndWrap(log, err, "failed to add finalizer")
×
UNCOV
89
                        }
×
90
                        // Return without requeue; the Update event will trigger reconciliation again
91
                        return ctrl.Result{}, nil
19✔
92
                }
93
        }
94

95
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
1,997✔
96
                return r.createArgoWorkflow(ctx, log, aw)
709✔
97
        }
709✔
98

99
        if aw.Status.Phase.InProgress() {
1,156✔
100
                return r.checkArgoWorkflow(ctx, log, aw)
577✔
101
        }
577✔
102

103
        return ctrl.Result{}, nil
2✔
104
}
105

106
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
709✔
107
        artifactType := arcv1alpha1.ArtifactType{}
709✔
108
        if err := r.Get(ctx, namespacedName("", aw.Spec.Type), &artifactType); err != nil {
833✔
109
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to retrieve artifact type")
124✔
110
        }
124✔
111

112
        srcSecret := corev1.Secret{}
585✔
113
        if aw.Spec.SrcSecretRef.Name != "" {
1,006✔
114
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
421✔
115
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for source")
×
116
                }
×
117
        }
118

119
        dstSecret := corev1.Secret{}
585✔
120
        if aw.Spec.DstSecretRef.Name != "" {
1,006✔
121
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
421✔
122
                        return ctrl.Result{}, errLogAndWrap(log, err, "failed to fetch secret for destination")
×
123
                }
×
124
        }
125

126
        wf := r.hydrateArgoWorkflow(aw, &artifactType, &srcSecret, &dstSecret)
585✔
127

585✔
128
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
585✔
129
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to set controller reference")
×
130
        }
×
131

132
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
595✔
133
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to create argo workflow")
10✔
134
        }
10✔
135

136
        aw.Status.Phase = arcv1alpha1.WorkflowPending
575✔
137
        if err := r.Status().Update(ctx, aw); err != nil {
576✔
138
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
1✔
139
        }
1✔
140
        return ctrl.Result{}, nil
574✔
141
}
142

143
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, artifactType *arcv1alpha1.ArtifactType, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
585✔
144
        srcVolume := corev1.Volume{
585✔
145
                Name: "src-secret-vol",
585✔
146
                VolumeSource: corev1.VolumeSource{
585✔
147
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
585✔
148
                },
585✔
149
        }
585✔
150
        if srcSecret.Name != "" {
1,006✔
151
                srcVolume.VolumeSource = corev1.VolumeSource{
421✔
152
                        Secret: &corev1.SecretVolumeSource{
421✔
153
                                SecretName: srcSecret.Name,
421✔
154
                        },
421✔
155
                }
421✔
156
        }
421✔
157

158
        dstVolume := corev1.Volume{
585✔
159
                Name: "dst-secret-vol",
585✔
160
                VolumeSource: corev1.VolumeSource{
585✔
161
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
585✔
162
                },
585✔
163
        }
585✔
164
        if dstSecret.Name != "" {
1,006✔
165
                dstVolume.VolumeSource = corev1.VolumeSource{
421✔
166
                        Secret: &corev1.SecretVolumeSource{
421✔
167
                                SecretName: dstSecret.Name,
421✔
168
                        },
421✔
169
                }
421✔
170
        }
421✔
171

172
        parameters := []wfv1alpha1.Parameter{}
585✔
173
        for _, p := range aw.Spec.Parameters {
3,263✔
174
                parameters = append(parameters, wfv1alpha1.Parameter{
2,678✔
175
                        Name:  p.Name,
2,678✔
176
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
2,678✔
177
                })
2,678✔
178
        }
2,678✔
179
        for _, p := range artifactType.Spec.Parameters {
1,170✔
180
                parameters = append(parameters, wfv1alpha1.Parameter{
585✔
181
                        Name:  p.Name,
585✔
182
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
585✔
183
                })
585✔
184
        }
585✔
185

186
        wf := &wfv1alpha1.Workflow{
585✔
187
                ObjectMeta: metav1.ObjectMeta{
585✔
188
                        Name:      aw.Name,
585✔
189
                        Namespace: aw.Namespace,
585✔
190
                },
585✔
191
                Spec: wfv1alpha1.WorkflowSpec{
585✔
192
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
585✔
193
                                Name:         artifactType.Spec.WorkflowTemplateRef.Name,
585✔
194
                                ClusterScope: true,
585✔
195
                        },
585✔
196
                        Volumes: []corev1.Volume{
585✔
197
                                srcVolume,
585✔
198
                                dstVolume,
585✔
199
                        },
585✔
200
                        Arguments: wfv1alpha1.Arguments{
585✔
201
                                Parameters: parameters,
585✔
202
                        },
585✔
203
                },
585✔
204
        }
585✔
205

585✔
206
        return wf
585✔
207
}
208

209
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) (ctrl.Result, error) {
577✔
210
        wf := wfv1alpha1.Workflow{}
577✔
211
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
577✔
NEW
212
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to get workflow")
×
NEW
213
        }
×
214
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
579✔
215
                return ctrl.Result{}, nil // nothing updated
2✔
216
        }
2✔
217
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
575✔
218

575✔
219
        // If workflow has errored or failed, fetch logs and update status message
575✔
220
        if (aw.Status.Phase == arcv1alpha1.WorkflowError || aw.Status.Phase == arcv1alpha1.WorkflowFailed) && aw.Status.Message == "" {
576✔
221
                switch aw.Status.Phase {
1✔
222
                case arcv1alpha1.WorkflowFailed:
1✔
223
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
NEW
224
                case arcv1alpha1.WorkflowError:
×
UNCOV
225
                        // TODO: Properly show why the workflow errored
×
UNCOV
226
                        aw.Status.Message = wf.Status.Message
×
227
                }
228
        }
229

230
        if err := r.Status().Update(ctx, aw); err != nil {
575✔
NEW
231
                return ctrl.Result{}, errLogAndWrap(log, err, "failed to update status")
×
NEW
232
        }
×
233
        return ctrl.Result{}, nil
575✔
234
}
235

236
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
237
        failedNodes := []struct {
1✔
238
                Name    string
1✔
239
                Pod     string
1✔
240
                Message string
1✔
241
        }{}
1✔
242
        for _, node := range wf.Status.Nodes {
4✔
243
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
244
                        nr := struct {
2✔
245
                                Name    string
2✔
246
                                Pod     string
2✔
247
                                Message string
2✔
248
                        }{
2✔
249
                                Name:    node.DisplayName,
2✔
250
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
251
                                Message: node.Message,
2✔
252
                        }
2✔
253
                        failedNodes = append(failedNodes, nr)
2✔
254
                }
2✔
255
        }
256

257
        for _, nr := range failedNodes {
3✔
258
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
259
                if err != nil {
2✔
UNCOV
260
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
UNCOV
261
                        continue
×
262
                }
263
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
264
        }
265
}
266

267
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
268
        podLogOptions := corev1.PodLogOptions{
2✔
269
                Container: "main", // Assuming the main container
2✔
270
                Follow:    false,
2✔
271
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
272
        }
2✔
273
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
274
        podLogs, err := req.Stream(ctx)
2✔
275
        if err != nil {
2✔
UNCOV
276
                return "", err
×
UNCOV
277
        }
×
278
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
279

2✔
280
        buf := new(bytes.Buffer)
2✔
281
        _, err = io.Copy(buf, podLogs)
2✔
282
        if err != nil {
2✔
UNCOV
283
                return "", err
×
UNCOV
284
        }
×
285
        return buf.String(), nil
2✔
286
}
287

288
// SetupWithManager sets up the controller with the Manager.
289
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
290
        return ctrl.NewControllerManagedBy(mgr).
1✔
291
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
292
                Owns(&wfv1alpha1.Workflow{}).
1✔
293
                Complete(r)
1✔
294
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc