
heathcliff26 / kube-upgrade / 19192297901

08 Nov 2025 11:16AM UTC coverage: 66.1% (+2.7%) from 63.42%
Build 19192297901 · event: push · CI: github · committer: heathcliff26
controller: Deploy upgraded daemonsets based on the plan

Ensure that the upgraded daemons are managed by the controller
directly, so that they are only scheduled where needed and are deleted
when the plan is deleted.

As node selectors only accept a plain list of labels, the API will need
to be updated.

Signed-off-by: Heathcliff <heathcliff@heathcliff.eu>
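
The selector-to-scheduling mapping behind that last point is narrow: a
pod's node selector is a plain map[string]string, so only the
MatchLabels half of a metav1.LabelSelector can decide where the
upgraded daemons run (MatchExpressions would require node affinity
instead). A minimal sketch of that mapping follows; the
podSpecForGroup helper is hypothetical and not part of kube-upgrade:

package main

import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podSpecForGroup (hypothetical) copies the plain key=value part of a
// group's label selector into the pod spec, mirroring what the
// controller below does for each upgraded DaemonSet.
func podSpecForGroup(labels *metav1.LabelSelector) corev1.PodSpec {
        return corev1.PodSpec{
                // NodeSelector is a plain label map; MatchExpressions
                // from the selector cannot be represented here.
                NodeSelector: labels.MatchLabels,
        }
}

func main() {
        sel := &metav1.LabelSelector{
                MatchLabels: map[string]string{"node-role.kubernetes.io/control-plane": ""},
        }
        fmt.Println(podSpecForGroup(sel).NodeSelector)
}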

135 of 157 new or added lines in 3 files covered. (85.99%)

1 existing line in 1 file now uncovered.

934 of 1413 relevant lines covered (66.1%)

10.13 hits per line

Source File

/pkg/upgrade-controller/controller/controller.go (55.7% covered)
package controller

import (
        "context"
        "fmt"
        "os"
        "time"

        "github.com/go-logr/logr"
        api "github.com/heathcliff26/kube-upgrade/pkg/apis/kubeupgrade/v1alpha2"
        "github.com/heathcliff26/kube-upgrade/pkg/client/clientset/versioned/scheme"
        "github.com/heathcliff26/kube-upgrade/pkg/constants"
        "golang.org/x/mod/semver"
        appv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/klog/v2"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
        "sigs.k8s.io/controller-runtime/pkg/healthz"
        "sigs.k8s.io/controller-runtime/pkg/manager"
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

func init() {
        ctrl.SetLogger(klog.NewKlogr())
}

// controller reconciles KubeUpgradePlan resources: it manages one upgraded
// DaemonSet per node group and drives node upgrades via annotations.
type controller struct {
        client.Client
        manager       manager.Manager
        client        kubernetes.Interface
        namespace     string
        upgradedImage string
}

// NewController creates the controller from the in-cluster config, with
// leader election and health probes enabled. The image for the upgraded
// daemons is read from the UPGRADED_IMAGE environment variable.
//
// Run make generate when changing these comments
// +kubebuilder:rbac:groups=kubeupgrade.heathcliff.eu,resources=kubeupgradeplans,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kubeupgrade.heathcliff.eu,resources=kubeupgradeplans/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=nodes,verbs=list;update
func NewController(name string) (*controller, error) {
        config, err := rest.InClusterConfig()
        if err != nil {
                return nil, err
        }
        client, err := kubernetes.NewForConfig(config)
        if err != nil {
                return nil, err
        }

        ns, err := GetNamespace()
        if err != nil {
                return nil, err
        }

        mgr, err := ctrl.NewManager(config, manager.Options{
                Scheme:                        scheme.Scheme,
                LeaderElection:                true,
                LeaderElectionNamespace:       ns,
                LeaderElectionID:              name,
                LeaderElectionReleaseOnCancel: true,
                LeaseDuration:                 Pointer(time.Minute),
                RenewDeadline:                 Pointer(10 * time.Second),
                RetryPeriod:                   Pointer(5 * time.Second),
                HealthProbeBindAddress:        ":9090",
        })
        if err != nil {
                return nil, err
        }
        err = mgr.AddHealthzCheck("healthz", healthz.Ping)
        if err != nil {
                return nil, err
        }
        err = mgr.AddReadyzCheck("readyz", healthz.Ping)
        if err != nil {
                return nil, err
        }

        upgradedImage := os.Getenv("UPGRADED_IMAGE")
        if upgradedImage == "" {
                return nil, fmt.Errorf("UPGRADED_IMAGE environment variable is not set")
        }

        return &controller{
                Client:        mgr.GetClient(),
                manager:       mgr,
                client:        client,
                namespace:     ns,
                upgradedImage: upgradedImage,
        }, nil
}

// Run registers the reconciler and the admission webhooks for
// KubeUpgradePlan, then starts the manager until a shutdown signal arrives.
func (c *controller) Run() error {
        err := ctrl.NewControllerManagedBy(c.manager).For(&api.KubeUpgradePlan{}).Complete(c)
        if err != nil {
                return err
        }

        err = ctrl.NewWebhookManagedBy(c.manager).
                For(&api.KubeUpgradePlan{}).
                WithDefaulter(&planMutatingHook{}).
                WithValidator(&planValidatingHook{}).
                Complete()
        if err != nil {
                return err
        }

        return c.manager.Start(signals.SetupSignalHandler())
}

// Reconcile fetches the plan for the request, reconciles it and persists the
// updated status. The plan is requeued to be re-checked after a minute.
func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        logger := klog.LoggerWithValues(klog.NewKlogr(), "plan", req.Name)

        var plan api.KubeUpgradePlan
        err := c.Get(ctx, req.NamespacedName, &plan)
        if err != nil {
                logger.Error(err, "Failed to get Plan")
                return ctrl.Result{}, err
        }

        err = c.reconcile(ctx, &plan, logger)
        if err != nil {
                return ctrl.Result{}, err
        }

        err = c.Status().Update(ctx, &plan)
        if err != nil {
                logger.Error(err, "Failed to update plan status")
                return ctrl.Result{}, err
        }

        return ctrl.Result{
                Requeue:      plan.Status.Summary != api.PlanStatusComplete,
                RequeueAfter: time.Minute,
        }, nil
}

// reconcile ensures each group has an upgraded DaemonSet, removes obsolete
// ones, cleans up all managed resources when the plan is being deleted, and
// rolls the target version out to the nodes of each group.
func (c *controller) reconcile(ctx context.Context, plan *api.KubeUpgradePlan, logger logr.Logger) error {
        if plan.Status.Groups == nil {
                plan.Status.Groups = make(map[string]string, len(plan.Spec.Groups))
        }

        // The finalizer guarantees the managed daemonsets are removed before
        // the plan object itself is deleted.
        if controllerutil.AddFinalizer(plan, constants.Finalizer) {
                err := c.Update(ctx, plan)
                if err != nil {
                        return fmt.Errorf("failed to add finalizer to plan %s: %v", plan.Name, err)
                }
        }

        daemonsList, err := c.client.AppsV1().DaemonSets(c.namespace).List(ctx, metav1.ListOptions{
                LabelSelector: fmt.Sprintf("%s=%s", constants.LabelPlanName, plan.Name),
        })
        if err != nil {
                logger.WithValues("plan", plan.Name).Error(err, "Failed to fetch upgraded daemonsets")
                return err
        }

        if !plan.DeletionTimestamp.IsZero() {
                logger.WithValues("plan", plan.Name).Info("Plan is being deleted, cleaning up resources")
                for _, daemon := range daemonsList.Items {
                        err := c.client.AppsV1().DaemonSets(c.namespace).Delete(ctx, daemon.Name, metav1.DeleteOptions{})
                        if err != nil {
                                return fmt.Errorf("failed to delete DaemonSet %s: %v", daemon.Name, err)
                        }
                        logger.WithValues("daemon", daemon.Name).Info("Deleted DaemonSet")
                }
                controllerutil.RemoveFinalizer(plan, constants.Finalizer)
                err := c.Update(ctx, plan)
                if err != nil {
                        return fmt.Errorf("failed to remove finalizer from plan %s: %v", plan.Name, err)
                }
                logger.WithValues("plan", plan.Name).Info("Finished cleanup of resources")
                return nil
        }

        // Index the existing daemonsets by group and delete any that no
        // longer match a group in the plan spec.
        daemons := make(map[string]appv1.DaemonSet, len(plan.Spec.Groups))

        for _, daemon := range daemonsList.Items {
                group := daemon.Labels[constants.LabelNodeGroup]
                if _, ok := plan.Spec.Groups[group]; ok {
                        daemons[group] = daemon
                } else {
                        err := c.client.AppsV1().DaemonSets(c.namespace).Delete(ctx, daemon.Name, metav1.DeleteOptions{})
                        if err != nil {
                                return fmt.Errorf("failed to delete DaemonSet %s: %v", daemon.Name, err)
                        }
                        logger.WithValues("daemon", daemon.Name).Info("Deleted obsolete DaemonSet")
                }
        }

        nodesToUpdate := make(map[string][]corev1.Node, len(plan.Spec.Groups))
        newGroupStatus := make(map[string]string, len(plan.Spec.Groups))

        for name, cfg := range plan.Spec.Groups {
                upgradedCfg := combineConfig(plan.Spec.Upgraded, plan.Spec.Groups[name].Upgraded)

                selector, err := metav1.LabelSelectorAsSelector(cfg.Labels)
                if err != nil {
                        logger.WithValues("group", name).Error(err, "Failed to convert labelSelector to selector for listing nodes")
                        return err
                }

                // Create or update the group's upgraded DaemonSet. Only the
                // MatchLabels part of the selector can be used as a node
                // selector, since node selectors are plain label maps.
                daemon, ok := daemons[name]
                if !ok {
                        daemon = c.NewEmptyUpgradedDaemonSet(plan.Name, name)
                }
                daemon.Spec = c.NewUpgradedDaemonSetSpec(plan.Name, name)
                daemon.Spec.Template.Spec.NodeSelector = cfg.Labels.MatchLabels
                if ok {
                        _, err = c.client.AppsV1().DaemonSets(c.namespace).Update(ctx, &daemon, metav1.UpdateOptions{})
                } else {
                        logger.WithValues("group", name, "daemon", daemon.Name).Info("Creating upgraded DaemonSet for group")
                        _, err = c.client.AppsV1().DaemonSets(c.namespace).Create(ctx, &daemon, metav1.CreateOptions{})
                }
                if err != nil {
                        return fmt.Errorf("failed to create/update DaemonSet %s: %v", daemon.Name, err)
                }

                nodeList, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
                        LabelSelector: selector.String(),
                })
                if err != nil {
                        logger.WithValues("group", name).Error(err, "Failed to get nodes for group")
                        return err
                }

                status, update, nodes, err := c.reconcileNodes(plan.Spec.KubernetesVersion, plan.Spec.AllowDowngrade, nodeList.Items, upgradedCfg)
                if err != nil {
                        logger.WithValues("group", name).Error(err, "Failed to reconcile nodes for group")
                        return err
                }

                newGroupStatus[name] = status

                if update {
                        nodesToUpdate[name] = nodes
                } else if plan.Status.Groups[name] != newGroupStatus[name] {
                        logger.WithValues("group", name, "status", newGroupStatus[name]).Info("Group changed status")
                }
        }

        // Only push node updates for groups whose dependencies have finished.
        for name, nodes := range nodesToUpdate {
                if groupWaitForDependency(plan.Spec.Groups[name].DependsOn, newGroupStatus) {
                        logger.WithValues("group", name).Info("Group is waiting on dependencies")
                        newGroupStatus[name] = api.PlanStatusWaiting
                        continue
                } else if plan.Status.Groups[name] != newGroupStatus[name] {
                        logger.WithValues("group", name, "status", newGroupStatus[name]).Info("Group changed status")
                }

                for _, node := range nodes {
                        _, err := c.client.CoreV1().Nodes().Update(ctx, &node, metav1.UpdateOptions{})
                        if err != nil {
                                return fmt.Errorf("failed to update node %s: %v", node.GetName(), err)
                        }
                }
        }

        plan.Status.Groups = newGroupStatus
        plan.Status.Summary = createStatusSummary(plan.Status.Groups)

        return nil
}

// reconcileNodes applies the config annotations and the target version to
// each node and derives the group status from the per-node upgrade status
// reported back through the node annotations.
func (c *controller) reconcileNodes(kubeVersion string, downgrade bool, nodes []corev1.Node, cfgAnnotations map[string]string) (string, bool, []corev1.Node, error) {
        if len(nodes) == 0 {
                return api.PlanStatusUnknown, false, nil, nil
        }

        completed := 0
        needUpdate := false
        errorNodes := make([]string, 0)

        for i := range nodes {
                if nodes[i].Annotations == nil {
                        nodes[i].Annotations = make(map[string]string)
                }

                if applyConfigAnnotations(nodes[i].Annotations, cfgAnnotations) {
                        needUpdate = true
                }

                // Refuse to move a node to an older version unless downgrades
                // are explicitly allowed by the plan.
                if !downgrade && semver.Compare(kubeVersion, nodes[i].Status.NodeInfo.KubeletVersion) < 0 {
                        return api.PlanStatusError, false, nil, fmt.Errorf("node %s version %s is newer than %s, but downgrade is disabled", nodes[i].GetName(), nodes[i].Status.NodeInfo.KubeletVersion, kubeVersion)
                }

                if nodes[i].Annotations[constants.NodeKubernetesVersion] == kubeVersion {
                        switch nodes[i].Annotations[constants.NodeUpgradeStatus] {
                        case constants.NodeUpgradeStatusCompleted:
                                completed++
                        case constants.NodeUpgradeStatusError:
                                errorNodes = append(errorNodes, nodes[i].GetName())
                        }
                        continue
                }

                nodes[i].Annotations[constants.NodeKubernetesVersion] = kubeVersion
                nodes[i].Annotations[constants.NodeUpgradeStatus] = constants.NodeUpgradeStatusPending

                needUpdate = true
        }

        var status string
        if len(errorNodes) > 0 {
                status = fmt.Sprintf("%s: The nodes %v are reporting errors", api.PlanStatusError, errorNodes)
        } else if len(nodes) == completed {
                status = api.PlanStatusComplete
        } else {
                status = fmt.Sprintf("%s: %d/%d nodes upgraded", api.PlanStatusProgressing, completed, len(nodes))
        }
        return status, needUpdate, nodes, nil
}
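
As a side note on the downgrade guard in reconcileNodes:
golang.org/x/mod/semver compares versions carrying the leading "v",
which kubelet versions (for example "v1.31.2") already have. A small
standalone illustration with made-up version values:

package main

import (
        "fmt"

        "golang.org/x/mod/semver"
)

func main() {
        planVersion := "v1.31.0"    // stands in for plan.Spec.KubernetesVersion
        kubeletVersion := "v1.32.1" // stands in for node.Status.NodeInfo.KubeletVersion

        // Compare returns -1 when the plan target is older than the running
        // kubelet; reconcileNodes treats that as an error unless the plan
        // sets AllowDowngrade.
        if semver.Compare(planVersion, kubeletVersion) < 0 {
                fmt.Printf("node is at %s, newer than target %s: refusing to downgrade\n", kubeletVersion, planVersion)
        }
}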