• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

heathcliff26 / kube-upgrade / 19192074357

08 Nov 2025 10:56AM UTC coverage: 66.1% (+2.7%) from 63.42%
19192074357

Pull #150

github

web-flow
Merge 003af39f9 into 366233455
Pull Request #150: controller: Deploy upgraded daemonsets depending from plan

135 of 157 new or added lines in 3 files covered. (85.99%)

1 existing line in 1 file now uncovered.

934 of 1413 relevant lines covered (66.1%)

10.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.7
/pkg/upgrade-controller/controller/controller.go
1
package controller
2

3
import (
4
        "context"
5
        "fmt"
6
        "os"
7
        "time"
8

9
        "github.com/go-logr/logr"
10
        api "github.com/heathcliff26/kube-upgrade/pkg/apis/kubeupgrade/v1alpha2"
11
        "github.com/heathcliff26/kube-upgrade/pkg/client/clientset/versioned/scheme"
12
        "github.com/heathcliff26/kube-upgrade/pkg/constants"
13
        "golang.org/x/mod/semver"
14
        appv1 "k8s.io/api/apps/v1"
15
        corev1 "k8s.io/api/core/v1"
16
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
        "k8s.io/client-go/kubernetes"
18
        "k8s.io/client-go/rest"
19
        "k8s.io/klog/v2"
20
        ctrl "sigs.k8s.io/controller-runtime"
21
        "sigs.k8s.io/controller-runtime/pkg/client"
22
        "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
23
        "sigs.k8s.io/controller-runtime/pkg/healthz"
24
        "sigs.k8s.io/controller-runtime/pkg/manager"
25
        "sigs.k8s.io/controller-runtime/pkg/manager/signals"
26
)
27

28
func init() {
2✔
29
        ctrl.SetLogger(klog.NewKlogr())
2✔
30
}
2✔
31

32
type controller struct {
33
        client.Client
34
        manager       manager.Manager
35
        client        kubernetes.Interface
36
        namespace     string
37
        upgradedImage string
38
}
39

40
// Run make generate when changing these comments
41
// +kubebuilder:rbac:groups=kubeupgrade.heathcliff.eu,resources=kubeupgradeplans,verbs=get;list;watch;create;update;patch;delete
42
// +kubebuilder:rbac:groups=kubeupgrade.heathcliff.eu,resources=kubeupgradeplans/status,verbs=get;update;patch
43
// +kubebuilder:rbac:groups="",resources=nodes,verbs=list;update
44
func NewController(name string) (*controller, error) {
1✔
45
        config, err := rest.InClusterConfig()
1✔
46
        if err != nil {
2✔
47
                return nil, err
1✔
48
        }
1✔
49
        client, err := kubernetes.NewForConfig(config)
×
50
        if err != nil {
×
51
                return nil, err
×
52
        }
×
53

54
        ns, err := GetNamespace()
×
55
        if err != nil {
×
56
                return nil, err
×
57
        }
×
58

59
        mgr, err := ctrl.NewManager(config, manager.Options{
×
60
                Scheme:                        scheme.Scheme,
×
61
                LeaderElection:                true,
×
62
                LeaderElectionNamespace:       ns,
×
63
                LeaderElectionID:              name,
×
64
                LeaderElectionReleaseOnCancel: true,
×
65
                LeaseDuration:                 Pointer(time.Minute),
×
66
                RenewDeadline:                 Pointer(10 * time.Second),
×
67
                RetryPeriod:                   Pointer(5 * time.Second),
×
68
                HealthProbeBindAddress:        ":9090",
×
69
        })
×
70
        if err != nil {
×
71
                return nil, err
×
72
        }
×
73
        err = mgr.AddHealthzCheck("healthz", healthz.Ping)
×
74
        if err != nil {
×
75
                return nil, err
×
76
        }
×
77
        err = mgr.AddReadyzCheck("readyz", healthz.Ping)
×
78
        if err != nil {
×
79
                return nil, err
×
80
        }
×
81

NEW
82
        upgradedImage := os.Getenv("UPGRADED_IMAGE")
×
NEW
83
        if upgradedImage == "" {
×
NEW
84
                return nil, fmt.Errorf("UPGRADED_IMAGE environment variable is not set")
×
NEW
85
        }
×
86

87
        return &controller{
×
NEW
88
                Client:        mgr.GetClient(),
×
NEW
89
                manager:       mgr,
×
NEW
90
                client:        client,
×
NEW
91
                namespace:     ns,
×
NEW
92
                upgradedImage: upgradedImage,
×
UNCOV
93
        }, nil
×
94
}
95

96
func (c *controller) Run() error {
×
97
        err := ctrl.NewControllerManagedBy(c.manager).For(&api.KubeUpgradePlan{}).Complete(c)
×
98
        if err != nil {
×
99
                return err
×
100
        }
×
101

102
        err = ctrl.NewWebhookManagedBy(c.manager).
×
103
                For(&api.KubeUpgradePlan{}).
×
104
                WithDefaulter(&planMutatingHook{}).
×
105
                WithValidator(&planValidatingHook{}).
×
106
                Complete()
×
107
        if err != nil {
×
108
                return err
×
109
        }
×
110

111
        return c.manager.Start(signals.SetupSignalHandler())
×
112
}
113

114
func (c *controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
×
115
        logger := klog.LoggerWithValues(klog.NewKlogr(), "plan", req.Name)
×
116

×
117
        var plan api.KubeUpgradePlan
×
118
        err := c.Get(ctx, req.NamespacedName, &plan)
×
119
        if err != nil {
×
120
                logger.Error(err, "Failed to get Plan")
×
121
                return ctrl.Result{}, err
×
122
        }
×
123

124
        err = c.reconcile(ctx, &plan, logger)
×
125
        if err != nil {
×
126
                return ctrl.Result{}, err
×
127
        }
×
128

129
        err = c.Status().Update(ctx, &plan)
×
130
        if err != nil {
×
131
                logger.Error(err, "Failed to update plan status")
×
132
                return ctrl.Result{}, err
×
133
        }
×
134

135
        return ctrl.Result{
×
136
                Requeue:      plan.Status.Summary != api.PlanStatusComplete,
×
137
                RequeueAfter: time.Minute,
×
138
        }, nil
×
139
}
140

141
func (c *controller) reconcile(ctx context.Context, plan *api.KubeUpgradePlan, logger logr.Logger) error {
17✔
142
        if plan.Status.Groups == nil {
26✔
143
                plan.Status.Groups = make(map[string]string, len(plan.Spec.Groups))
9✔
144
        }
9✔
145

146
        if controllerutil.AddFinalizer(plan, constants.Finalizer) {
33✔
147
                err := c.Update(ctx, plan)
16✔
148
                if err != nil {
16✔
NEW
149
                        return fmt.Errorf("failed to add finalizer to plan %s: %v", plan.Name, err)
×
NEW
150
                }
×
151
        }
152

153
        daemonsList, err := c.client.AppsV1().DaemonSets(c.namespace).List(ctx, metav1.ListOptions{
17✔
154
                LabelSelector: fmt.Sprintf("%s=%s", constants.LabelPlanName, plan.Name),
17✔
155
        })
17✔
156
        if err != nil {
17✔
NEW
157
                logger.WithValues("plan", plan.Name).Error(err, "Failed to fetch upgraded daemonsets")
×
NEW
158
                return err
×
NEW
159
        }
×
160

161
        if !plan.DeletionTimestamp.IsZero() {
18✔
162
                logger.WithValues("plan", plan.Name).Info("Plan is being deleted, cleaning up resources")
1✔
163
                for _, daemon := range daemonsList.Items {
2✔
164
                        err := c.client.AppsV1().DaemonSets(c.namespace).Delete(ctx, daemon.Name, metav1.DeleteOptions{})
1✔
165
                        if err != nil {
1✔
NEW
166
                                return fmt.Errorf("failed to delete DaemonSet %s: %v", daemon.Name, err)
×
NEW
167
                        }
×
168
                        logger.WithValues("daemon", daemon.Name).Info("Deleted DaemonSet")
1✔
169
                }
170
                controllerutil.RemoveFinalizer(plan, constants.Finalizer)
1✔
171
                err := c.Update(ctx, plan)
1✔
172
                if err != nil {
1✔
NEW
173
                        return fmt.Errorf("failed to remove finalizer from plan %s: %v", plan.Name, err)
×
NEW
174
                }
×
175
                logger.WithValues("plan", plan.Name).Info("Finished cleanup of resources")
1✔
176
                return nil
1✔
177
        }
178

179
        daemons := make(map[string]appv1.DaemonSet, len(plan.Spec.Groups))
16✔
180

16✔
181
        for _, daemon := range daemonsList.Items {
24✔
182
                group := daemon.Labels[constants.LabelNodeGroup]
8✔
183
                if _, ok := plan.Spec.Groups[group]; ok {
14✔
184
                        daemons[group] = daemon
6✔
185
                } else {
8✔
186
                        err := c.client.AppsV1().DaemonSets(c.namespace).Delete(ctx, daemon.Name, metav1.DeleteOptions{})
2✔
187
                        if err != nil {
2✔
NEW
188
                                return fmt.Errorf("failed to delete DaemonSet %s: %v", daemon.Name, err)
×
NEW
189
                        }
×
190
                        logger.WithValues("daemon", daemon.Name).Info("Deleted obsolete DaemonSet")
2✔
191
                }
192
        }
193

194
        nodesToUpdate := make(map[string][]corev1.Node, len(plan.Spec.Groups))
16✔
195
        newGroupStatus := make(map[string]string, len(plan.Spec.Groups))
16✔
196

16✔
197
        for name, cfg := range plan.Spec.Groups {
52✔
198
                upgradedCfg := combineConfig(plan.Spec.Upgraded, plan.Spec.Groups[name].Upgraded)
36✔
199

36✔
200
                selector, err := metav1.LabelSelectorAsSelector(cfg.Labels)
36✔
201
                if err != nil {
36✔
202
                        logger.WithValues("group", name).Error(err, "Failed to convert labelSelector to selector for listing nodes")
×
203
                        return err
×
204
                }
×
205

206
                daemon, ok := daemons[name]
36✔
207
                if !ok {
66✔
208
                        daemon = c.NewEmptyUpgradedDaemonSet(plan.Name, name)
30✔
209
                }
30✔
210
                daemon.Spec = c.NewUpgradedDaemonSetSpec(plan.Name, name)
36✔
211
                daemon.Spec.Template.Spec.NodeSelector = cfg.Labels.MatchLabels
36✔
212
                if ok {
42✔
213
                        _, err = c.client.AppsV1().DaemonSets(c.namespace).Update(ctx, &daemon, metav1.UpdateOptions{})
6✔
214
                } else {
36✔
215
                        logger.WithValues("group", name, "daemon", daemon.Name).Info("Creating upgraded DaemonSet for group")
30✔
216
                        _, err = c.client.AppsV1().DaemonSets(c.namespace).Create(ctx, &daemon, metav1.CreateOptions{})
30✔
217
                }
30✔
218
                if err != nil {
36✔
NEW
219
                        return fmt.Errorf("failed to create/update DaemonSet %s: %v", daemon.Name, err)
×
NEW
220
                }
×
221

222
                nodeList, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
36✔
223
                        LabelSelector: selector.String(),
36✔
224
                })
36✔
225
                if err != nil {
36✔
226
                        logger.WithValues("group", name).Error(err, "Failed to get nodes for group")
×
227
                        return err
×
228
                }
×
229

230
                status, update, nodes, err := c.reconcileNodes(plan.Spec.KubernetesVersion, plan.Spec.AllowDowngrade, nodeList.Items, upgradedCfg)
36✔
231
                if err != nil {
36✔
232
                        logger.WithValues("group", name).Error(err, "Failed to reconcile nodes for group")
×
233
                        return err
×
234
                }
×
235

236
                newGroupStatus[name] = status
36✔
237

36✔
238
                if update {
58✔
239
                        nodesToUpdate[name] = nodes
22✔
240
                } else if plan.Status.Groups[name] != newGroupStatus[name] {
46✔
241
                        logger.WithValues("group", name, "status", newGroupStatus[name]).Info("Group changed status")
10✔
242
                }
10✔
243
        }
244

245
        for name, nodes := range nodesToUpdate {
38✔
246
                if groupWaitForDependency(plan.Spec.Groups[name].DependsOn, newGroupStatus) {
28✔
247
                        logger.WithValues("group", name).Info("Group is waiting on dependencies")
6✔
248
                        newGroupStatus[name] = api.PlanStatusWaiting
6✔
249
                        continue
6✔
250
                } else if plan.Status.Groups[name] != newGroupStatus[name] {
30✔
251
                        logger.WithValues("group", name, "status", newGroupStatus[name]).Info("Group changed status")
14✔
252
                }
14✔
253

254
                for _, node := range nodes {
36✔
255
                        _, err := c.client.CoreV1().Nodes().Update(ctx, &node, metav1.UpdateOptions{})
20✔
256
                        if err != nil {
20✔
257
                                return fmt.Errorf("failed to update node %s: %v", node.GetName(), err)
×
258
                        }
×
259
                }
260
        }
261

262
        plan.Status.Groups = newGroupStatus
16✔
263
        plan.Status.Summary = createStatusSummary(plan.Status.Groups)
16✔
264

16✔
265
        return nil
16✔
266
}
267

268
func (c *controller) reconcileNodes(kubeVersion string, downgrade bool, nodes []corev1.Node, cfgAnnotations map[string]string) (string, bool, []corev1.Node, error) {
38✔
269
        if len(nodes) == 0 {
42✔
270
                return api.PlanStatusUnknown, false, nil, nil
4✔
271
        }
4✔
272

273
        completed := 0
34✔
274
        needUpdate := false
34✔
275
        errorNodes := make([]string, 0)
34✔
276

34✔
277
        for i := range nodes {
72✔
278
                if nodes[i].Annotations == nil {
61✔
279
                        nodes[i].Annotations = make(map[string]string)
23✔
280
                }
23✔
281

282
                if applyConfigAnnotations(nodes[i].Annotations, cfgAnnotations) {
40✔
283
                        needUpdate = true
2✔
284
                }
2✔
285

286
                if !downgrade && semver.Compare(kubeVersion, nodes[i].Status.NodeInfo.KubeletVersion) < 0 {
39✔
287
                        return api.PlanStatusError, false, nil, fmt.Errorf("node %s version %s is newer than %s, but downgrade is disabled", nodes[i].GetName(), nodes[i].Status.NodeInfo.KubeletVersion, kubeVersion)
1✔
288
                }
1✔
289

290
                if nodes[i].Annotations[constants.NodeKubernetesVersion] == kubeVersion {
49✔
291
                        switch nodes[i].Annotations[constants.NodeUpgradeStatus] {
12✔
292
                        case constants.NodeUpgradeStatusCompleted:
11✔
293
                                completed++
11✔
294
                        case constants.NodeUpgradeStatusError:
1✔
295
                                errorNodes = append(errorNodes, nodes[i].GetName())
1✔
296
                        }
297
                        continue
12✔
298
                }
299

300
                nodes[i].Annotations[constants.NodeKubernetesVersion] = kubeVersion
25✔
301
                nodes[i].Annotations[constants.NodeUpgradeStatus] = constants.NodeUpgradeStatusPending
25✔
302

25✔
303
                needUpdate = true
25✔
304
        }
305

306
        var status string
33✔
307
        if len(errorNodes) > 0 {
34✔
308
                status = fmt.Sprintf("%s: The nodes %v are reporting errors", api.PlanStatusError, errorNodes)
1✔
309
        } else if len(nodes) == completed {
44✔
310
                status = api.PlanStatusComplete
11✔
311
        } else {
32✔
312
                status = fmt.Sprintf("%s: %d/%d nodes upgraded", api.PlanStatusProgressing, completed, len(nodes))
21✔
313
        }
21✔
314
        return status, needUpdate, nodes, nil
33✔
315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc