• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sapcc / maintenance-controller / 8330432878

18 Mar 2024 04:47PM UTC coverage: 73.558% (-0.1%) from 73.704%
8330432878

push

github

web-flow
Merge pull request #252 from sapcc/renovate/golang.org-x-exp-digest

Update golang.org/x/exp digest to a85f2c6

2512 of 3415 relevant lines covered (73.56%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.42
/kubernikus/node_controller.go
1
/*******************************************************************************
2
*
3
* Copyright 2020 SAP SE
4
*
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You should have received a copy of the License along with this
8
* program. If not, you may obtain a copy of the License at
9
*
10
*     http://www.apache.org/licenses/LICENSE-2.0
11
*
12
* Unless required by applicable law or agreed to in writing, software
13
* distributed under the License is distributed on an "AS IS" BASIS,
14
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
* See the License for the specific language governing permissions and
16
* limitations under the License.
17
*
18
*******************************************************************************/
19

20
package kubernikus
21

22
import (
23
        "context"
24
        "fmt"
25
        "time"
26

27
        semver "github.com/blang/semver/v4"
28
        "github.com/elastic/go-ucfg"
29
        "github.com/go-logr/logr"
30
        "github.com/gophercloud/gophercloud"
31
        "github.com/gophercloud/gophercloud/openstack"
32
        "github.com/gophercloud/gophercloud/openstack/compute/v2/servers"
33
        "github.com/gophercloud/utils/openstack/clientconfig"
34
        "github.com/sapcc/maintenance-controller/common"
35
        "github.com/sapcc/maintenance-controller/constants"
36
        "github.com/sapcc/ucfgwrap"
37
        v1 "k8s.io/api/core/v1"
38
        "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/client-go/kubernetes"
41
        "k8s.io/client-go/rest"
42
        ctrl "sigs.k8s.io/controller-runtime"
43
        "sigs.k8s.io/controller-runtime/pkg/client"
44
        "sigs.k8s.io/controller-runtime/pkg/controller"
45
)
46

47
// ConcurrentReconciles is the value passed as MaxConcurrentReconciles
// when the controller is registered in SetupWithManager.
//
// According to https://pkg.go.dev/k8s.io/client-go/util/workqueue
48
// the same node is never reconciled more than once concurrently.
49
const ConcurrentReconciles = 5
50

51
// Config is the structure the Kubernikus configuration file is unpacked
// into by loadConfig.
type Config struct {
52
        // Intervals groups all timing related settings.
        Intervals struct {
53
                // Requeue is used as RequeueAfter for every result returned by Reconcile.
                Requeue     time.Duration `config:"requeue" validate:"required"`
54
                // PodDeletion is forwarded as DrainParameters.AwaitDeletion when a node is deleted.
                PodDeletion struct {
55
                        Period  time.Duration `config:"period" validate:"required"`
56
                        Timeout time.Duration `config:"timeout" validate:"required"`
57
                } `config:"podDeletion" validate:"required"`
58
                // PodEviction is forwarded as DrainParameters.Eviction when a node is deleted;
                // Force is forwarded as DrainParameters.ForceEviction.
                PodEviction struct {
59
                        Period  time.Duration `config:"period" validate:"required"`
60
                        Timeout time.Duration `config:"timeout" validate:"required"`
61
                        Force   bool          `config:"force"`
62
                } `config:"podEviction" validate:"required"`
63
        }
64
}
65

66
func (r *NodeReconciler) loadConfig() (Config, error) {
1✔
67
        yamlConf, err := ucfgwrap.FromYAMLFile(constants.KubernikusConfigFilePath, ucfg.VarExp, ucfg.ResolveEnv)
1✔
68
        if err != nil {
1✔
69
                r.Log.Error(err, "Failed to parse configuration file (syntax error)")
×
70
                return Config{}, err
×
71
        }
×
72
        var conf Config
1✔
73
        err = yamlConf.Unpack(&conf)
1✔
74
        if err != nil {
1✔
75
                r.Log.Error(err, "Failed to parse configuration file (semantic error)")
×
76
                return Config{}, err
×
77
        }
×
78
        return conf, nil
1✔
79
}
80

81
// NodeReconciler reconciles a Node object.
82
type NodeReconciler struct {
83
        // Client is the embedded API client used to get and patch nodes.
        client.Client
84
        // Conf is the REST configuration from which Kubernetes clientsets are built.
        Conf   *rest.Config
85
        // Log receives all informational and error messages of the reconciler.
        Log    logr.Logger
86
        // Scheme is not referenced by the methods in this file.
        Scheme *runtime.Scheme
87
}
88

89
// Reconcile reconciles the given request.
90
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
91
        conf, err := r.loadConfig()
1✔
92
        if err != nil {
1✔
93
                r.Log.Error(err, "Failed to load Kubernikus configuration")
×
94
                return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, err
×
95
        }
×
96

97
        node := &v1.Node{}
1✔
98
        err = r.Get(ctx, req.NamespacedName, node)
1✔
99
        if errors.IsNotFound(err) {
2✔
100
                r.Log.Info("Could not find node on the API server, maybe it has been deleted?", "node", req.NamespacedName)
1✔
101
                return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, nil
1✔
102
        } else if err != nil {
2✔
103
                r.Log.Error(err, "Failed to retrieve node", "node", req.Name)
×
104
                return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, nil
×
105
        }
×
106

107
        // mark kubelet update
108
        err = r.markUpdate(ctx, node)
1✔
109
        if err != nil {
1✔
110
                r.Log.Error(err, "failed to mark node for kubelet upgrade", "node", node.Name)
×
111
                return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, nil
×
112
        }
×
113

114
        // delete if requested
115
        shouldDelete, ok := node.Labels[constants.DeleteNodeLabelKey]
1✔
116
        if ok && shouldDelete == constants.TrueStr {
2✔
117
                err = r.deleteNode(ctx, node,
1✔
118
                        common.DrainParameters{
1✔
119
                                Client:    r.Client,
1✔
120
                                Clientset: kubernetes.NewForConfigOrDie(r.Conf),
1✔
121
                                AwaitDeletion: common.WaitParameters{
1✔
122
                                        Period:  conf.Intervals.PodDeletion.Period,
1✔
123
                                        Timeout: conf.Intervals.PodDeletion.Timeout,
1✔
124
                                },
1✔
125
                                Eviction: common.WaitParameters{
1✔
126
                                        Period:  conf.Intervals.PodEviction.Period,
1✔
127
                                        Timeout: conf.Intervals.PodEviction.Timeout,
1✔
128
                                },
1✔
129
                                ForceEviction: conf.Intervals.PodEviction.Force,
1✔
130
                        },
1✔
131
                )
1✔
132
                if err != nil {
2✔
133
                        r.Log.Error(err, "failed to remove Kubernikus node", "node", node.Name)
1✔
134
                        return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, nil
1✔
135
                }
1✔
136
        }
137

138
        return ctrl.Result{RequeueAfter: conf.Intervals.Requeue}, nil
1✔
139
}
140

141
func (r *NodeReconciler) markUpdate(ctx context.Context, node *v1.Node) error {
1✔
142
        unmodified := node.DeepCopy()
1✔
143
        if node.Labels == nil {
2✔
144
                node.Labels = make(map[string]string)
1✔
145
        }
1✔
146
        update, err := r.needsKubeletUpdate(node)
1✔
147
        if err != nil {
1✔
148
                return err
×
149
        }
×
150
        if update {
2✔
151
                node.Labels[constants.KubeletUpdateLabelKey] = constants.TrueStr
1✔
152
        } else {
2✔
153
                node.Labels[constants.KubeletUpdateLabelKey] = "false"
1✔
154
        }
1✔
155
        err = r.Patch(ctx, node, client.MergeFrom(unmodified))
1✔
156
        if err != nil {
1✔
157
                return err
×
158
        }
×
159
        return nil
1✔
160
}
161

162
func (r *NodeReconciler) needsKubeletUpdate(node *v1.Node) (bool, error) {
1✔
163
        KubeletVersion, err := semver.Parse(node.Status.NodeInfo.KubeletVersion[1:])
1✔
164
        if err != nil {
1✔
165
                return false, err
×
166
        }
×
167

168
        APIVersion, err := getAPIServerVersion(r.Conf)
1✔
169
        if err != nil {
1✔
170
                return false, err
×
171
        }
×
172
        return APIVersion.NE(KubeletVersion), nil
1✔
173
}
174

175
func getAPIServerVersion(conf *rest.Config) (semver.Version, error) {
1✔
176
        client, err := kubernetes.NewForConfig(conf)
1✔
177
        if err != nil {
1✔
178
                return semver.Version{}, fmt.Errorf("failed to create API Server client: %w", err)
×
179
        }
×
180
        return common.GetAPIServerVersion(client)
1✔
181
}
182

183
func (r *NodeReconciler) deleteNode(ctx context.Context, node *v1.Node, params common.DrainParameters) error {
1✔
184
        r.Log.Info("Cordoning, draining and deleting node", "node", node.Name)
1✔
185
        err := common.EnsureSchedulable(ctx, r.Client, node, false)
1✔
186
        // In case of error just retry, cordoning is ensured again
1✔
187
        if err != nil {
1✔
188
                return fmt.Errorf("failed to cordon node %s: %w", node.Name, err)
×
189
        }
×
190
        err = common.EnsureDrain(ctx, node, r.Log, params)
1✔
191
        // In case of error just retry, draining is ensured again
1✔
192
        if err != nil {
1✔
193
                return fmt.Errorf("failed to drain node %s: %w", node.Name, err)
×
194
        }
×
195
        err = deleteVM(ctx, node.Name)
1✔
196
        if err != nil {
2✔
197
                return fmt.Errorf("failed to delete VM backing node %s: %w", node.Name, err)
1✔
198
        }
1✔
199
        return nil
×
200
}
201

202
func deleteVM(ctx context.Context, nodeName string) error {
1✔
203
        osConf, err := common.LoadOpenStackConfig()
1✔
204
        if err != nil {
1✔
205
                return fmt.Errorf("failed to parse cloudprovider.conf: %w", err)
×
206
        }
×
207
        opts := &clientconfig.ClientOpts{
1✔
208
                AuthInfo: &clientconfig.AuthInfo{
1✔
209
                        AuthURL:        osConf.AuthURL,
1✔
210
                        Username:       osConf.Username,
1✔
211
                        Password:       osConf.Password,
1✔
212
                        UserDomainName: osConf.Domainname,
1✔
213
                        ProjectID:      osConf.ProjectID,
1✔
214
                },
1✔
215
        }
1✔
216
        provider, err := clientconfig.AuthenticatedClient(opts)
1✔
217
        if err != nil {
2✔
218
                return fmt.Errorf("failed OpenStack authentification: %w", err)
1✔
219
        }
1✔
220
        provider.Context = ctx
×
221
        compute, err := openstack.NewComputeV2(provider, gophercloud.EndpointOpts{
×
222
                Region: osConf.Region,
×
223
        })
×
224
        if err != nil {
×
225
                return fmt.Errorf("failed to create OS compute endpoint: %w", err)
×
226
        }
×
227
        list, err := servers.List(compute, servers.ListOpts{
×
228
                TenantID: osConf.ProjectID,
×
229
                Name:     nodeName,
×
230
        }).AllPages()
×
231
        if err != nil {
×
232
                return fmt.Errorf("failed to list servers: %w", err)
×
233
        }
×
234
        serverList, err := servers.ExtractServers(list)
×
235
        if err != nil {
×
236
                return fmt.Errorf("failed to extract server list: %w", err)
×
237
        }
×
238
        if len(serverList) == 0 {
×
239
                // if 0 servers are returned the backing VM is already hopefully deleted
×
240
                return nil
×
241
        }
×
242
        if len(serverList) != 1 {
×
243
                return fmt.Errorf("expected to list 1 or 0 servers, but got %v", len(serverList))
×
244
        }
×
245
        result := servers.Delete(compute, serverList[0].ID)
×
246
        if result.ExtractErr() != nil {
×
247
                return fmt.Errorf("failed to delete VM: %w body: %v", result.ExtractErr(), result.Body)
×
248
        }
×
249
        return nil
×
250
}
251

252
// SetupWithManager attaches the controller to the given manager.
253
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
254
        return ctrl.NewControllerManagedBy(mgr).
1✔
255
                WithOptions(controller.Options{
1✔
256
                        // According to https://pkg.go.dev/k8s.io/client-go/util/workqueue
1✔
257
                        // the same node is never reconciled more than once concurrently.
1✔
258
                        MaxConcurrentReconciles: ConcurrentReconciles,
1✔
259
                }).
1✔
260
                For(&v1.Node{}).
1✔
261
                Complete(r)
1✔
262
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc