• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

opendefensecloud / artifact-conduit / 20776998573

07 Jan 2026 09:36AM UTC coverage: 83.673% (-1.4%) from 85.034%
20776998573

Pull #165

github

web-flow
Merge 234bb52de into 91d2cf8b0
Pull Request #165: Feature/helm workflow example use trivy

738 of 882 relevant lines covered (83.67%)

936.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.98
/pkg/controller/artifactworkflow_controller.go
1
// Copyright 2025 BWI GmbH and Artefact Conduit contributors
2
// SPDX-License-Identifier: Apache-2.0
3

4
package controller
5

6
import (
7
        "bytes"
8
        "context"
9
        "fmt"
10
        "io"
11
        "slices"
12

13
        wfv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
14
        "github.com/go-logr/logr"
15
        "github.com/jastBytes/sprint"
16
        arcv1alpha1 "go.opendefense.cloud/arc/api/arc/v1alpha1"
17
        corev1 "k8s.io/api/core/v1"
18
        apierrors "k8s.io/apimachinery/pkg/api/errors"
19
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/apimachinery/pkg/types"
22
        "k8s.io/client-go/kubernetes"
23
        "k8s.io/client-go/tools/record"
24
        ctrl "sigs.k8s.io/controller-runtime"
25
        "sigs.k8s.io/controller-runtime/pkg/builder"
26
        "sigs.k8s.io/controller-runtime/pkg/client"
27
        "sigs.k8s.io/controller-runtime/pkg/handler"
28
        "sigs.k8s.io/controller-runtime/pkg/predicate"
29
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
30
)
31

32
const (
33
        artifactWorkflowFinalizer = "arc.opendefense.cloud/artifact-workflow-finalizer"
34
)
35

36
// ArtifactWorkflowReconciler reconciles a ArtifactWorkflow object
37
type ArtifactWorkflowReconciler struct {
38
        client.Client
39
        ClientSet kubernetes.Interface
40
        Scheme    *runtime.Scheme
41
        Recorder  record.EventRecorder
42
}
43

44
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=clusterartifacttypes,verbs=get;list;watch
45
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/status,verbs=get;update;patch
46
//+kubebuilder:rbac:groups=arc.opendefense.cloud,resources=artifactworkflows/finalizers,verbs=update
47
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
48
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
49
//+kubebuilder:rbac:groups=argoproj.io,resources=cronworkflows,verbs=get;list;watch;create;update;patch;delete
50
//+kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
51
//+kubebuilder:rbac:groups="",resources=pods;pods/log,verbs=get;list
52

53
// Reconcile moves the current state of the cluster closer to the desired state
54
func (r *ArtifactWorkflowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
2,121✔
55
        log := ctrl.LoggerFrom(ctx)
2,121✔
56
        ctrlResult := ctrl.Result{}
2,121✔
57

2,121✔
58
        aw := &arcv1alpha1.ArtifactWorkflow{}
2,121✔
59
        if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
2,126✔
60
                if apierrors.IsNotFound(err) {
10✔
61
                        // Object not found, return.
5✔
62
                        return ctrlResult, nil
5✔
63
                }
5✔
64
                return ctrlResult, errLogAndWrap(log, err, "failed to get object")
×
65
        }
66

67
        // Two different behaviors are implemented depending on whether we have
68
        // a "one-shot" order or cron order, so let's instantiate the corresponding handler:
69
        var handler WorkflowHandler
2,116✔
70
        if aw.Spec.Cron != nil {
2,132✔
71
                handler = NewCronWorkflowHandler(r, log, aw)
16✔
72
        } else {
2,116✔
73
                handler = NewSingleWorkflowHandler(r, log, aw)
2,100✔
74
        }
2,100✔
75

76
        // Update last reconcile time
77
        aw.Status.LastReconcileAt = metav1.Now()
2,116✔
78

2,116✔
79
        // Handle deletion
2,116✔
80
        if !aw.DeletionTimestamp.IsZero() {
2,121✔
81
                log.V(1).Info("ArtifactWorkflow is being deleted")
5✔
82
                // Cleanup workflow, if exists
5✔
83
                if err := handler.DeleteArgoResources(ctx); err != nil {
5✔
84
                        return ctrlResult, errLogAndWrap(log, err, "workflow deletion failed")
×
85
                }
×
86

87
                // Remove finalizer
88
                if slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
10✔
89
                        log.V(1).Info("Removing finalizer from ArtifactWorkflow")
5✔
90
                        aw.Finalizers = slices.DeleteFunc(aw.Finalizers, func(f string) bool {
10✔
91
                                return f == artifactWorkflowFinalizer
5✔
92
                        })
5✔
93
                        if err := r.Update(ctx, aw); err != nil {
5✔
94
                                return ctrlResult, errLogAndWrap(log, err, "failed to remove finalizer")
×
95
                        }
×
96
                }
97
                return ctrlResult, nil
5✔
98
        }
99

100
        // Add finalizer if not present and not deleting
101
        if aw.DeletionTimestamp.IsZero() {
4,222✔
102
                if !slices.Contains(aw.Finalizers, artifactWorkflowFinalizer) {
2,139✔
103
                        log.V(1).Info("Adding finalizer to ArtifactWorkflow")
28✔
104
                        aw.Finalizers = append(aw.Finalizers, artifactWorkflowFinalizer)
28✔
105
                        if err := r.Update(ctx, aw); err != nil {
28✔
106
                                return ctrlResult, errLogAndWrap(log, err, "failed to add finalizer")
×
107
                        }
×
108
                        // Return without requeue; the Update event will trigger reconciliation again
109
                        return ctrlResult, nil
28✔
110
                }
111
        }
112

113
        // Handle force reconcile annotation
114
        forceAt, err := GetForceAtAnnotationValue(aw)
2,083✔
115
        if err != nil {
2,083✔
116
                log.V(1).Error(err, "Invalid force reconcile annotation, ignoring")
×
117
        }
×
118
        if !forceAt.IsZero() && (aw.Status.LastForceAt.IsZero() || forceAt.After(aw.Status.LastForceAt.Time)) {
2,084✔
119
                log.V(1).Info("Force reconcile requested")
1✔
120
                r.Recorder.Event(aw, corev1.EventTypeNormal, "ForceReconcile", "Force reconcile requested via annotation")
1✔
121
                // Delete existing workflow, if any
1✔
122
                if err := handler.DeleteArgoResources(ctx); err != nil {
1✔
123
                        return ctrlResult, errLogAndWrap(log, err, "failed to delete existing workflow for force reconcile")
×
124
                }
×
125
                // Reset phase so workflow gets recreated, and update last force time
126
                aw.Status.Phase = arcv1alpha1.WorkflowUnknown
1✔
127
                aw.Status.LastForceAt = metav1.Now()
1✔
128
                if err := r.Status().Update(ctx, aw); err != nil {
1✔
129
                        return ctrlResult, errLogAndWrap(log, err, "failed to update last force time")
×
130
                }
×
131
                // Return without requeue; the update event will trigger reconciliation again
132
                return ctrlResult, nil
1✔
133
        }
134

135
        if aw.Status.Phase == arcv1alpha1.WorkflowUnknown {
3,221✔
136
                return ctrlResult, handler.CreateArgoResources(ctx)
1,139✔
137
        }
1,139✔
138

139
        if aw.Status.Phase.InProgress() || aw.Spec.Cron != nil {
1,882✔
140
                return ctrlResult, handler.CheckArgoResources(ctx)
939✔
141
        }
939✔
142

143
        return ctrlResult, nil
4✔
144
}
145

146
func (r *ArtifactWorkflowReconciler) setStatusFromWorkflow(ctx context.Context, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow, wf *wfv1alpha1.Workflow) bool {
934✔
147
        if aw.Status.Phase == arcv1alpha1.WorkflowPhase(wf.Status.Phase) {
938✔
148
                return false // nothing updated
4✔
149
        }
4✔
150
        aw.Status.Phase = arcv1alpha1.WorkflowPhase(wf.Status.Phase)
930✔
151

930✔
152
        switch aw.Status.Phase {
930✔
153
        case arcv1alpha1.WorkflowSucceeded, arcv1alpha1.WorkflowStopped:
5✔
154
                aw.Status.CompletionTime = metav1.Now()
5✔
155
        case arcv1alpha1.WorkflowError, arcv1alpha1.WorkflowFailed:
6✔
156
                aw.Status.Message = wf.Status.Message
6✔
157
                r.generateWorkflowStatusMessage(ctx, wf, log, aw)
6✔
158
        }
159
        return true
930✔
160
}
161

162
func (r *ArtifactWorkflowReconciler) generateWorkflowStatusMessage(ctx context.Context, wf *wfv1alpha1.Workflow, log logr.Logger, aw *arcv1alpha1.ArtifactWorkflow) {
6✔
163
        failedNodes := []struct {
6✔
164
                Name    string
6✔
165
                Pod     string
6✔
166
                Message string
6✔
167
        }{}
6✔
168
        for _, node := range wf.Status.Nodes {
9✔
169
                if (node.Phase == wfv1alpha1.NodeFailed || node.Phase == wfv1alpha1.NodeError) && node.Type == wfv1alpha1.NodeTypePod {
5✔
170
                        nr := struct {
2✔
171
                                Name    string
2✔
172
                                Pod     string
2✔
173
                                Message string
2✔
174
                        }{
2✔
175
                                Name:    node.DisplayName,
2✔
176
                                Pod:     generatePodNameFromNodeStatus(node),
2✔
177
                                Message: node.Message,
2✔
178
                        }
2✔
179
                        failedNodes = append(failedNodes, nr)
2✔
180
                }
2✔
181
        }
182

183
        for _, nr := range failedNodes {
8✔
184
                logs, err := r.fetchPodLogs(ctx, aw.Namespace, nr.Pod)
2✔
185
                if err != nil {
2✔
186
                        log.V(1).Info("failed to fetch pod logs", "pod", nr.Pod, "error", err)
×
187
                        aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\n\n", nr.Name, nr.Message)
×
188
                        continue
×
189
                }
190
                aw.Status.Message += fmt.Sprintf("Step '%s' failed:\n%s\nLogs:\n%s\n\n", nr.Name, nr.Message, logs)
2✔
191
        }
192
}
193

194
func (r *ArtifactWorkflowReconciler) fetchPodLogs(ctx context.Context, namespace, podName string) (string, error) {
2✔
195
        podLogOptions := corev1.PodLogOptions{
2✔
196
                Container: "main", // Assuming the main container
2✔
197
                Follow:    false,
2✔
198
                TailLines: sprint.ToPointer(int64(30)), // Fetch last 30 lines
2✔
199
        }
2✔
200
        req := r.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOptions)
2✔
201
        podLogs, err := req.Stream(ctx)
2✔
202
        if err != nil {
2✔
203
                return "", err
×
204
        }
×
205
        defer sprint.PanicOnErrorFunc(podLogs.Close) // Close the stream when done
2✔
206

2✔
207
        buf := new(bytes.Buffer)
2✔
208
        _, err = io.Copy(buf, podLogs)
2✔
209
        if err != nil {
2✔
210
                return "", err
×
211
        }
×
212
        return buf.String(), nil
2✔
213
}
214

215
func (r *ArtifactWorkflowReconciler) retrieveSecrets(ctx context.Context, aw *arcv1alpha1.ArtifactWorkflow) (*corev1.Secret, *corev1.Secret, error) {
1,139✔
216
        srcSecret := corev1.Secret{}
1,139✔
217
        if aw.Spec.SrcSecretRef.Name != "" {
1,908✔
218
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.SrcSecretRef.Name), &srcSecret); err != nil {
769✔
219
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch source secret '%s': %v", aw.Spec.SrcSecretRef.Name, err))
×
220
                        return nil, nil, fmt.Errorf("failed to fetch secret for source: %w", err)
×
221
                }
×
222
        }
223

224
        dstSecret := corev1.Secret{}
1,139✔
225
        if aw.Spec.DstSecretRef.Name != "" {
1,908✔
226
                if err := r.Get(ctx, namespacedName(aw.Namespace, aw.Spec.DstSecretRef.Name), &dstSecret); err != nil {
769✔
227
                        r.Recorder.Event(aw, corev1.EventTypeWarning, "InvalidSecret", fmt.Sprintf("Failed to fetch destination secret '%s': %v", aw.Spec.DstSecretRef.Name, err))
×
228
                        return nil, nil, fmt.Errorf("failed to fetch secret for destination: %w", err)
×
229
                }
×
230
        }
231

232
        return &srcSecret, &dstSecret, nil
1,139✔
233
}
234

235
// SetupWithManager sets up the controller with the Manager.
236
func (r *ArtifactWorkflowReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
237
        return ctrl.NewControllerManagedBy(mgr).
1✔
238
                For(&arcv1alpha1.ArtifactWorkflow{}).
1✔
239
                Owns(&wfv1alpha1.Workflow{}).
1✔
240
                Owns(&wfv1alpha1.CronWorkflow{}).
1✔
241
                Watches(
1✔
242
                        &wfv1alpha1.Workflow{},
1✔
243
                        handler.EnqueueRequestsFromMapFunc(r.findArtifactWorkflowsForWorkflowOwnedByCronWorkflow()),
1✔
244
                        builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
1✔
245
                ).
1✔
246
                Complete(r)
1✔
247
}
1✔
248

249
// findArtifactWorkflowsForWorkflowOwnedByCronWorkflow returns a function that finds ArtifactWorkflows
250
// that own CronWorkflows that own the given Workflow. This handles the ownership chain:
251
// ArtifactWorkflow -> CronWorkflow -> Workflow
252
func (r *ArtifactWorkflowReconciler) findArtifactWorkflowsForWorkflowOwnedByCronWorkflow() func(context.Context, client.Object) []ctrl.Request {
1✔
253
        return func(ctx context.Context, wf client.Object) []ctrl.Request {
54✔
254
                // Find the CronWorkflow that owns this Workflow
53✔
255
                ownerReferences := wf.GetOwnerReferences()
53✔
256
                var cronWorkflowName, cronWorkflowNamespace string
53✔
257

53✔
258
                for _, owner := range ownerReferences {
106✔
259
                        if owner.Kind == "CronWorkflow" && owner.APIVersion == wfv1alpha1.SchemeGroupVersion.String() {
58✔
260
                                cronWorkflowName = owner.Name
5✔
261
                                cronWorkflowNamespace = wf.GetNamespace() // Owner is in same namespace
5✔
262
                                break
5✔
263
                        }
264
                }
265

266
                if cronWorkflowName == "" {
101✔
267
                        // Workflow is not owned by a CronWorkflow
48✔
268
                        return []reconcile.Request{}
48✔
269
                }
48✔
270

271
                // Find the ArtifactWorkflow that owns this CronWorkflow
272
                cwf := &wfv1alpha1.CronWorkflow{}
5✔
273
                if err := r.Get(ctx, types.NamespacedName{Name: cronWorkflowName, Namespace: cronWorkflowNamespace}, cwf); err != nil {
5✔
274
                        // CronWorkflow not found or error occurred
×
275
                        return []reconcile.Request{}
×
276
                }
×
277

278
                var artifactWorkflowName string
5✔
279
                for _, owner := range cwf.GetOwnerReferences() {
10✔
280
                        if owner.Kind == "ArtifactWorkflow" && owner.APIVersion == arcv1alpha1.SchemeGroupVersion.String() {
10✔
281
                                artifactWorkflowName = owner.Name
5✔
282
                                break
5✔
283
                        }
284
                }
285

286
                if artifactWorkflowName == "" {
5✔
287
                        // CronWorkflow is not owned by an ArtifactWorkflow
×
288
                        return []reconcile.Request{}
×
289
                }
×
290

291
                return []reconcile.Request{
5✔
292
                        {
5✔
293
                                NamespacedName: types.NamespacedName{
5✔
294
                                        Name:      artifactWorkflowName,
5✔
295
                                        Namespace: cronWorkflowNamespace,
5✔
296
                                },
5✔
297
                        },
5✔
298
                }
5✔
299
        }
300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc