• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 19715464629

26 Nov 2025 07:38PM UTC coverage: 61.735% (+1.1%) from 60.656%
19715464629

push

github

web-flow
Add LastReconcileAt and LastForceAt fields with force reconcile functionality (#95)

Introduce LastReconcileAt and LastForceAt fields to track reconciliation
times in artifact and order statuses. Implement force reconcile
functionality for orders and workflows, along with tests to ensure
proper handling of the force reconcile annotation.

Closes #58.

81 of 115 new or added lines in 3 files covered. (70.43%)

3 existing lines in 2 files now uncovered.

605 of 980 relevant lines covered (61.73%)

904.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.91
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/tools/record"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
26
)
27

28
const (
29
        artifactWorkflowFinalizer = "arc.bwi.de/artifact-workflow-finalizer"
30
)
31

32
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
33
type ArtifactWorkflowReconciler struct {
34
        client.Client
35
        ClientSet kubernetes.Interface
36
        Scheme    *runtime.Scheme
37
        Recorder  record.EventRecorder
38
}
39

40
//+kubebuilder:rbac:groups=arc.bwi.de,resources=clusterartifacttypes,verbs=get;list;watch
41
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/status,verbs=get;update;patch
42
//+kubebuilder:rbac:groups=arc.bwi.de,resources=artifactworkflows/finalizers,verbs=update
43
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
44
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
45
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
46

47
// Reconcile moves the current state of the cluster closer to the desired state
48
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
2,435✔
49
        log := ctrl.LoggerFrom(ctx)
2,435✔
50
        ctrlResult := ctrl.Result{}
2,435✔
51

2,435✔
52
        aw := &arcv1alpha1.ArtifactWorkflow{}
2,435✔
53
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
2,441✔
54
                if apierrors.IsNotFound(err) {
12✔
55
                        // Object not found, return.
6✔
56
                        return ctrlResult, nil
6✔
57
                }
6✔
NEW
58
                return ctrlResult, errLogAndWrap(log, err, "failed to get object")
×
59
        }
60

61
        // Update last reconcile time
62
        aw.Status.LastReconcileAt = metav1.Now()
2,429✔
63

2,429✔
64
        // Handle deletion
2,429✔
65
        if !aw.DeletionTimestamp.IsZero() {
2,435✔
66
                log.V(1).Info("ArtifactWorkflow is being deleted")
6✔
67
                // Cleanup workflow, if exists
6✔
68
                if err := r.deleteArgoWorkflow(ctx, log, aw); err != nil {
6✔
NEW
69
                        return ctrlResult, errLogAndWrap(log, err, "workflow deletion failed")
×
UNCOV
70
                }
×
71

72
                // Remove finalizer
73
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
12✔
74
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
6✔
75
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
12✔
76
                                return f == artifactWorkflowFinalizer
6✔
77
                        })
6✔
78
                        if err := r.Update(ctx, aw); err != nil {
7✔
79
                                return ctrlResult, errLogAndWrap(log, err, "failed to remove finalizer")
1✔
80
                        }
1✔
81
                }
82
                return ctrlResult, nil
5✔
83
        }
84

85
        // Add finalizer if not present and not deleting
86
        if aw.DeletionTimestamp.IsZero() {
4,846✔
87
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
2,447✔
88
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
24✔
89
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
24✔
90
                        if err := r.Update(ctx, aw); err != nil {
24✔
NEW
91
                                return ctrlResult, errLogAndWrap(log, err, "failed to add finalizer")
×
92
                        }
×
93
                        // Return without requeue; the Update event will trigger reconciliation again
94
                        return ctrlResult, nil
24✔
95
                }
96
        }
97

98
        // Handle force reconcile annotation
99
        forceAt, err := GetForceAtAnnotationValue(aw)
2,399✔
100
        if err != nil {
2,399✔
NEW
101
                log.V(1).Error(err, "Invalid force reconcile annotation, ignoring")
×
NEW
102
        }
×
103
        if !forceAt.IsZero() && (aw.Status.LastForceAt.IsZero() || forceAt.After(aw.Status.LastForceAt.Time)) {
2,400✔
104
                log.V(1).Info("Force reconcile requested")
1✔
105
                r.Recorder.Event(aw, corev1.EventTypeNormal, "ForceReconcile", "Force reconcile requested via annotation")
1✔
106
                // Delete existing workflow, if any
1✔
107
                if err := r.deleteArgoWorkflow(ctx, log, aw); err != nil {
1✔
NEW
108
                        return ctrlResult, errLogAndWrap(log, err, "failed to delete existing workflow for force reconcile")
×
NEW
109
                }
×
110
                // Update last force time
111
                aw.Status.LastForceAt = metav1.Now()
1✔
112
                if err := r.Status().Update(ctx, aw); err != nil {
1✔
NEW
113
                        return ctrlResult, errLogAndWrap(log, err, "failed to update last force time")
×
NEW
114
                }
×
115
                // Return without requeue; the update event will trigger reconciliation again
116
                return ctrlResult, nil
1✔
117
        }
118

119
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
3,682✔
120
                return ctrlResult, r.createArgoWorkflow(ctx, log, aw)
1,284✔
121
        }
1,284✔
122

123
        if aw.Status.Phase.InProgress() {
2,225✔
124
                return ctrlResult, r.checkArgoWorkflow(ctx, log, aw)
1,111✔
125
        }
1,111✔
126

127
        return ctrlResult, nil
3✔
128
}
129

130
func (r *ArtifactWorkflowReconciler) deleteArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
7✔
131
        wf := wfv1alpha1.Workflow{
7✔
132
                ObjectMeta: metav1.ObjectMeta{
7✔
133
                        Namespace: aw.Namespace,
7✔
134
                        Name:      aw.Name,
7✔
135
                },
7✔
136
        }
7✔
137
        if err := r.Delete(ctx, &wf); client.IgnoreNotFound(err) != nil {
7✔
NEW
138
                r.Recorder.Event(aw, corev1.EventTypeWarning, "DeletionFailed", fmt.Sprintf("Failed to delete associated workflow '%s': %v", aw.Name, err))
×
NEW
139
                return errLogAndWrap(log, err, "workflow deletion failed")
×
NEW
140
        }
×
141
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Deleted", fmt.Sprintf("Deleted workflow '%s'", aw.Name))
7✔
142
        return nil
7✔
143
}
144

145
func (r *ArtifactWorkflowReconciler) createArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
1,284✔
146
        srcSecret := corev1.Secret{}
1,284✔
147
        if aw.Spec.SrcSecretRef.Name != "" {
2,160✔
148
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
876✔
149
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
NEW
150
                        return errLogAndWrap(log, err, "failed to fetch secret for source")
×
151
                }
×
152
        }
153

154
        dstSecret := corev1.Secret{}
1,284✔
155
        if aw.Spec.DstSecretRef.Name != "" {
2,160✔
156
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
876✔
157
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
NEW
158
                        return errLogAndWrap(log, err, "failed to fetch secret for destination")
×
159
                }
×
160
        }
161

162
        wf := r.hydrateArgoWorkflow(aw, &srcSecret, &dstSecret)
1,284✔
163

1,284✔
164
        if err := controllerutil.SetControllerReference(aw, wf, r.Scheme); err != nil {
1,284✔
NEW
165
                return errLogAndWrap(log, err, "failed to set controller reference")
×
166
        }
×
167

168
        if err := r.Create(ctx, wf); client.IgnoreAlreadyExists(err) != nil {
1,456✔
169
                r.Recorder.Event(aw, corev1.EventTypeWarning, "CreationFailed", fmt.Sprintf("Failed to create workflow '%s': %v", wf.Name, err))
172✔
170
                return errLogAndWrap(log, err, "failed to create argo workflow")
172✔
171
        }
172✔
172
        r.Recorder.Event(aw, corev1.EventTypeNormal, "Created", fmt.Sprintf("Created workflow '%s'", wf.Name))
1,112✔
173

1,112✔
174
        aw.Status.Phase = arcv1alpha1.WorkflowPending
1,112✔
175
        if err := r.Status().Update(ctx, aw); err != nil {
1,115✔
176
                return errLogAndWrap(log, err, "failed to update status")
3✔
177
        }
3✔
178
        return nil
1,109✔
179
}
180

181
func (r *ArtifactWorkflowReconciler) hydrateArgoWorkflow(aw *arcv1alpha1.ArtifactWorkflow, srcSecret *corev1.Secret, dstSecret *corev1.Secret) *wfv1alpha1.Workflow {
1,284✔
182
        srcVolume := corev1.Volume{
1,284✔
183
                Name: "src-secret-vol",
1,284✔
184
                VolumeSource: corev1.VolumeSource{
1,284✔
185
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,284✔
186
                },
1,284✔
187
        }
1,284✔
188
        if srcSecret.Name != "" {
2,160✔
189
                srcVolume.VolumeSource = corev1.VolumeSource{
876✔
190
                        Secret: &corev1.SecretVolumeSource{
876✔
191
                                SecretName: srcSecret.Name,
876✔
192
                        },
876✔
193
                }
876✔
194
        }
876✔
195

196
        dstVolume := corev1.Volume{
1,284✔
197
                Name: "dst-secret-vol",
1,284✔
198
                VolumeSource: corev1.VolumeSource{
1,284✔
199
                        EmptyDir: &corev1.EmptyDirVolumeSource{},
1,284✔
200
                },
1,284✔
201
        }
1,284✔
202
        if dstSecret.Name != "" {
2,160✔
203
                dstVolume.VolumeSource = corev1.VolumeSource{
876✔
204
                        Secret: &corev1.SecretVolumeSource{
876✔
205
                                SecretName: dstSecret.Name,
876✔
206
                        },
876✔
207
                }
876✔
208
        }
876✔
209

210
        parameters := []wfv1alpha1.Parameter{}
1,284✔
211
        for _, p := range aw.Spec.Parameters {
8,152✔
212
                parameters = append(parameters, wfv1alpha1.Parameter{
6,868✔
213
                        Name:  p.Name,
6,868✔
214
                        Value: (*wfv1alpha1.AnyString)(&p.Value),
6,868✔
215
                })
6,868✔
216
        }
6,868✔
217

218
        wf := &wfv1alpha1.Workflow{
1,284✔
219
                ObjectMeta: metav1.ObjectMeta{
1,284✔
220
                        Name:      aw.Name,
1,284✔
221
                        Namespace: aw.Namespace,
1,284✔
222
                },
1,284✔
223
                Spec: wfv1alpha1.WorkflowSpec{
1,284✔
224
                        WorkflowTemplateRef: &wfv1alpha1.WorkflowTemplateRef{
1,284✔
225
                                Name:         aw.Spec.WorkflowTemplateRef.Name,
1,284✔
226
                                ClusterScope: aw.Spec.WorkflowTemplateRef.ClusterScope,
1,284✔
227
                        },
1,284✔
228
                        Volumes: []corev1.Volume{
1,284✔
229
                                srcVolume,
1,284✔
230
                                dstVolume,
1,284✔
231
                        },
1,284✔
232
                        Arguments: wfv1alpha1.Arguments{
1,284✔
233
                                Parameters: parameters,
1,284✔
234
                        },
1,284✔
235
                },
1,284✔
236
        }
1,284✔
237

1,284✔
238
        return wf
1,284✔
239
}
240

241
func (r *ArtifactWorkflowReconciler) checkArgoWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) error {
1,111✔
242
        wf := wfv1alpha1.Workflow{}
1,111✔
243
        if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Name), &wf); err != nil {
1,111✔
NEW
244
                return errLogAndWrap(log, err, "failed to get workflow")
×
245
        }
×
246
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
1,113✔
247
                return nil // nothing updated
2✔
248
        }
2✔
249
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
1,109✔
250

1,109✔
251
        switch aw.Status.Phase {
1,109✔
252
        case arcv1alpha1.WorkflowSucceeded:
2✔
253
                aw.Status.CompletionTime = metav1.Now()
2✔
254
        case arcv1alpha1.WorkflowError, arcv1alpha1.WorkflowFailed:
1✔
255
                // If workflow has errored or failed, fetch logs and update status message
1✔
256
                switch aw.Status.Phase {
1✔
257
                case arcv1alpha1.WorkflowFailed:
1✔
258
                        r.generateWorkflowStatusMessage(ctx, wf, log, aw)
1✔
259
                case arcv1alpha1.WorkflowError:
×
260
                        // TODO: Properly show why the workflow errored
×
261
                        aw.Status.Message = wf.Status.Message
×
262
                }
263
        }
264

265
        if err := r.Status().Update(ctx, aw); err != nil {
1,109✔
NEW
266
                return errLogAndWrap(log, err, "failed to update status")
×
267
        }
×
268

269
        return nil
1,109✔
270
}
271

272
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
1✔
273
        failedNodes := []struct {
1✔
274
                Name    string
1✔
275
                Pod     string
1✔
276
                Message string
1✔
277
        }{}
1✔
278
        for _, node := range wf.Status.Nodes {
4✔
279
                if node.Phase == wfv1alpha1.NodeFailed && node.Type == wfv1alpha1.NodeTypePod {
5✔
280
                        nr := struct {
2✔
281
                                Name    string
2✔
282
                                Pod     string
2✔
283
                                Message string
2✔
284
                        }{
2✔
285
                                Name:    node.DisplayName,
2✔
286
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
287
                                Message: node.Message,
2✔
288
                        }
2✔
289
                        failedNodes = append(failedNodes, nr)
2✔
290
                }
2✔
291
        }
292

293
        for _, nr := range failedNodes {
3✔
294
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
295
                if err != nil {
2✔
296
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
297
                        continue
×
298
                }
299
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
300
        }
301
}
302

303
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
304
        podLogOptions := corev1.PodLogOptions{
2✔
305
                Container: "main", // Assuming the main container
2✔
306
                Follow:    false,
2✔
307
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
308
        }
2✔
309
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
310
        podLogs, err := req.Stream(ctx)
2✔
311
        if err != nil {
2✔
312
                return "", err
×
313
        }
×
314
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
315

2✔
316
        buf := new(bytes.Buffer)
2✔
317
        _, err = io.Copy(buf, podLogs)
2✔
318
        if err != nil {
2✔
319
                return "", err
×
320
        }
×
321
        return buf.String(), nil
2✔
322
}
323

324
// SetupWithManager sets up the controller with the Manager.
325
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
326
        return ctrl.NewControllerManagedBy(mgr).
1✔
327
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
328
                Owns(&wfv1alpha1.Workflow{}).
1✔
329
                Complete(r)
1✔
330
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc