• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 21448367270

28 Jan 2026 05:19PM UTC coverage: 36.264% (+0.2%) from 36.111%
21448367270

push

github

web-flow
chore(deps): upgrade Kubernetes and related dependencies to v0.35.0 (#741)

Signed-off-by: Kay Yan <kay.yan@daocloud.io>

0 of 20 new or added lines in 4 files covered. (0.0%)

1 existing line in 1 file now uncovered.

858 of 2366 relevant lines covered (36.26%)

2.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.18
/pkg/controllers/leaderworkerset_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23
        "time"
24

25
        appsv1 "k8s.io/api/apps/v1"
26
        corev1 "k8s.io/api/core/v1"
27
        apierrors "k8s.io/apimachinery/pkg/api/errors"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/apimachinery/pkg/util/intstr"
33
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
34
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
35
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
36
        "k8s.io/client-go/tools/record"
37
        "k8s.io/klog/v2"
38
        "k8s.io/utils/ptr"
39
        ctrl "sigs.k8s.io/controller-runtime"
40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/handler"
42
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
43

44
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
45
        "sigs.k8s.io/lws/pkg/utils"
46
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
47
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
48
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
49
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
50
)
51

52
// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object.
type LeaderWorkerSetReconciler struct {
	// Client performs all reads and writes against the API server,
	// including the server-side-apply patches issued in SSAWithStatefulset.
	client.Client
	// Scheme is used when setting owner/controller references on the
	// objects this controller creates (leader StatefulSet, headless Service).
	Scheme *runtime.Scheme
	// Record emits Kubernetes events (e.g. FailedCreate, GroupsProgressing)
	// for the reconciled LeaderWorkerSet objects.
	Record record.EventRecorder
}
58

59
var (
	// apiGVStr is the LeaderWorkerSet group/version string, used by the
	// field indexer to recognize owner references that point at an LWS.
	apiGVStr = leaderworkerset.GroupVersion.String()
)

const (
	// lwsOwnerKey is the field-index key for looking up StatefulSets by
	// their controlling LeaderWorkerSet (registered in SetupIndexes).
	lwsOwnerKey = ".metadata.controller"
	// fieldManager identifies this controller as the field owner in
	// server-side apply patches.
	fieldManager = "lws"
)

// Event reasons emitted by this controller.
const (
	// FailedCreate Event reason used when a resource creation fails.
	// The event uses the error(s) as the reason.
	FailedCreate      = "FailedCreate"
	// GroupsProgressing is emitted when the leader StatefulSet is created
	// or when surge replicas are being reclaimed.
	GroupsProgressing = "GroupsProgressing"
	// GroupsUpdating is emitted when the rolling-update partition moves.
	GroupsUpdating    = "GroupsUpdating"
	// CreatingRevision is emitted when a new controller revision is written
	// for an updated LWS spec.
	CreatingRevision  = "CreatingRevision"
)
76

77
func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *LeaderWorkerSetReconciler {
×
78
        return &LeaderWorkerSetReconciler{
×
79
                Client: client,
×
80
                Scheme: scheme,
×
81
                Record: record,
×
82
        }
×
83
}
×
84

85
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
86
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
87
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
88
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
89
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
90
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
91
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
92
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
93
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
94
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
95
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
96
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update
97

98
// Reconcile drives a LeaderWorkerSet toward its desired state. In order it:
// fetches the LWS and its leader StatefulSet, ensures a controller revision
// exists (creating a new one when the spec changed), computes rolling-update
// parameters, server-side-applies the leader StatefulSet, reconciles the
// headless service, updates status/conditions, and truncates old revisions
// once the update is done.
func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Get leaderworkerset object
	lws := &leaderworkerset.LeaderWorkerSet{}
	if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Nothing to do for an LWS that is being deleted.
	if lws.DeletionTimestamp != nil {
		return ctrl.Result{}, nil
	}

	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)

	leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
	if err != nil {
		log.Error(err, "Fetching leader statefulset")
		return ctrl.Result{}, err
	}

	// If the leader StatefulSet is mid-deletion, requeue and wait for it to
	// go away rather than applying on top of a terminating object.
	if leaderSts != nil && leaderSts.DeletionTimestamp != nil {
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	// Handles two cases:
	// Case 1: Upgrading the LWS controller from a version that doesn't support controller revision
	// Case 2: Creating the controller revision for a newly created LWS object
	revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
	if err != nil {
		log.Error(err, "Creating controller revision")
		return ctrl.Result{}, err
	}

	// A non-nil updatedRevision means the LWS spec differs from the current
	// revision, i.e. a rolling update should begin.
	updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
	if err != nil {
		log.Error(err, "Validating if LWS has been updated")
		return ctrl.Result{}, err
	}
	lwsUpdated := updatedRevision != nil
	if lwsUpdated {
		revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
		if err != nil {
			log.Error(err, "Creating revision for updated LWS")
			return ctrl.Result{}, err
		}
		r.Record.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
	}

	partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
	if err != nil {
		log.Error(err, "Rolling partition error")
		return ctrl.Result{}, err
	}

	if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
		if leaderSts == nil {
			r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
		}
		return ctrl.Result{}, err
	}

	if leaderSts == nil {
		// An event is logged to track sts creation.
		r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("Created leader statefulset %s", lws.Name))
	} else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
		// An event is logged to track update progress.
		r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsUpdating, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
	}

	// Create headless service if it does not exist.
	if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
		log.Error(err, "Creating headless service.")
		r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate,
			fmt.Sprintf("Failed to create headless service for error: %v", err))
		return ctrl.Result{}, err
	}

	updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
	if err != nil {
		// Conflicts are expected on concurrent status writers; retry quietly.
		if apierrors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		return ctrl.Result{}, err
	}

	// Once the rolling update completes, prune stale controller revisions.
	if updateDone {
		if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
			return ctrl.Result{}, err
		}
	}
	log.V(2).Info("Leader Reconcile completed.")
	return ctrl.Result{}, nil
}
191

192
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
×
193
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
×
194
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
×
195
                        return err
×
196
                }
×
197
                return nil
×
198
        }
199
        return nil
×
200
}
201

202
// SetupWithManager sets up the controller with the Manager. The controller
// reconciles LeaderWorkerSet objects and requeues on changes to StatefulSets
// and Services it owns.
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&leaderworkerset.LeaderWorkerSet{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{}).
		// Additionally watch all StatefulSets and map each one back to the
		// LWS named by its set-name label. NOTE(review): presumably this
		// catches worker StatefulSets that are not directly owned by the
		// LWS — confirm against the pod controller's ownership model.
		Watches(&appsv1.StatefulSet{},
			handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
				return []reconcile.Request{
					{NamespacedName: types.NamespacedName{
						Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
						Namespace: a.GetNamespace(),
					}},
				}
			})).
		Complete(r)
}
219

220
func SetupIndexes(indexer client.FieldIndexer) error {
×
221
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
×
222
                // grab the statefulSet object, extract the owner...
×
223
                statefulSet := rawObj.(*appsv1.StatefulSet)
×
224
                owner := metav1.GetControllerOf(statefulSet)
×
225
                if owner == nil {
×
226
                        return nil
×
227
                }
×
228
                // ...make sure it's a LeaderWorkerSet...
229
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
×
230
                        return nil
×
231
                }
×
232
                // ...and if so, return it
233
                return []string{owner.Name}
×
234
        })
235
}
236

237
// Rolling update will always wait for the former replica to be ready then process the next one,
// we didn't consider rollout strategy type here since we only support rollingUpdate now,
// once we have more policies, we should update the logic here.
// Possible scenarios for Partition:
//   - When sts is under creation, partition is always 0 because pods are created in parallel, rolling update is not relevant here.
//   - When sts is in rolling update, the partition will start from the last index to the index 0 processing in maxUnavailable step.
//   - When sts is in rolling update, and Replicas increases, we'll delay the rolling update until the scaling up is done,
//     Partition will not change, new replicas are created using the new template from the get go.
//   - When sts is rolling update, and Replicas decreases, the partition will not change until new Replicas < Partition,
//     in which case Partition will be reset to the new Replicas value.
//   - When sts is ready for a rolling update and Replicas increases at the same time, we'll delay the rolling update until
//     the scaling up is done.
//   - When sts is ready for a rolling update and Replicas decreases at the same time, we'll start the rolling update
//     together with scaling down.
//
// At rest, Partition should always be zero.
//
// For Replicas:
//   - When rolling update, Replicas is equal to (spec.Replicas+maxSurge)
//   - Otherwise, Replicas is equal to spec.Replicas
//   - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
//     we should reclaim the extra replicas gradually to accommodate for the new replicas.
//
// Returns the partition and replica count to apply to the leader StatefulSet.
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)
	lwsReplicas := *lws.Spec.Replicas

	// Clamp the computed partition on every return path: replicas below the
	// user-requested RollingUpdateConfiguration.Partition must not be updated.
	defer func() {
		// Limit the replicas with less than lwsPartition will not be updated.
		stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
	}()

	// Case 1:
	// If sts not created yet, all partitions should be updated,
	// replicas should not change.
	if sts == nil {
		return 0, lwsReplicas, nil
	}

	stsReplicas := *sts.Spec.Replicas
	maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
	if err != nil {
		return 0, 0, err
	}
	// No need to burst more than the replicas.
	if maxSurge > int(lwsReplicas) {
		maxSurge = int(lwsReplicas)
	}
	burstReplicas := lwsReplicas + int32(maxSurge)

	// wantReplicas calculates the final replicas if needed.
	wantReplicas := func(unreadyReplicas int32) int32 {
		if unreadyReplicas <= int32(maxSurge) {
			// When we have n unready replicas and n bursted replicas, we should
			// start to release the burst replica gradually for the accommodation of
			// the unready ones.
			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
			r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
			return finalReplicas
		}
		return burstReplicas
	}

	// Case 2:
	// Indicates a new rolling update here.
	if leaderWorkerSetUpdated {
		// Processing scaling up/down first prior to rolling update.
		return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
	}

	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
	rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
	// Case 3:
	// In normal cases, return the values directly.
	if rollingUpdateCompleted {
		return 0, lwsReplicas, nil
	}

	states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
	if err != nil {
		return 0, 0, err
	}
	lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)

	// The replicas annotation records the spec.Replicas value the sts was
	// last applied with; a mismatch means Replicas changed mid-update.
	originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
	if err != nil {
		return 0, 0, err
	}
	replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
	// Case 4:
	// Replicas changed during rolling update.
	if replicasUpdated {
		return min(partition, burstReplicas), wantReplicas(lwsUnreadyReplicas), nil
	}

	// Case 5:
	// Calculating the Partition during rolling update, no leaderWorkerSet updates happens.

	rollingStep, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
	if err != nil {
		return 0, 0, err
	}
	// Make sure that we always respect the maxUnavailable, or
	// we'll violate it when reclaiming bursted replicas.
	rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))

	return rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition), wantReplicas(lwsUnreadyReplicas), nil
}
345

346
// SSAWithStatefulset builds the leader StatefulSet apply configuration for
// the given partition/replicas/revision and applies it with server-side
// apply, forcing ownership of conflicting fields under the lws field manager.
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
	log := ctrl.LoggerFrom(ctx)

	// construct the statefulset apply configuration
	leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
	if err != nil {
		log.Error(err, "Constructing StatefulSet apply configuration.")
		return err
	}
	if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
		log.Error(err, "Setting controller reference.")
		return err
	}
	// The apply configuration is converted to unstructured form so it can be
	// sent as an apply patch.
	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
	if err != nil {
		log.Error(err, "Converting StatefulSet configuration to json.")
		return err
	}
	patch := &unstructured.Unstructured{
		Object: obj,
	}
	// Use server side apply and add fieldmanager to the lws owned fields
	// If there are conflicts in the fields owned by the lws controller, lws will obtain the ownership and force override
	// these fields to the ones desired by the lws controller
	// TODO b/316776287 add E2E test for SSA
	// TODO: Deprecated: Use client.Client.Apply() and client.Client.SubResource("subrsource").Apply() instead.
	err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{ //nolint
		FieldManager: fieldManager,
		Force:        ptr.To[bool](true),
	})
	if err != nil {
		log.Error(err, "Using server side apply to update leader statefulset")
		return err
	}

	return nil
}
383

384
// updateConditions updates the condition of the leaderworkerset to either
// Progressing or Available (plus UpdateInProgress while a rolling update is
// incomplete), and refreshes ReadyReplicas/UpdatedReplicas in status.
// It returns (statusOrConditionChanged, updateDone, error), where updateDone
// is true only when the partition is 0 and all replicas are updated and ready.
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
	log := ctrl.LoggerFrom(ctx)
	// Leader pods carry worker-index 0 within their group.
	podSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey:     lws.Name,
		leaderworkerset.WorkerIndexLabelKey: "0",
	})
	leaderPodList := &corev1.PodList{}
	if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
		log.Error(err, "Fetching leaderPods")
		return false, false, err
	}

	updateStatus := false
	readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
	partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
	// When group size is 1 there is no worker StatefulSet per group; only
	// the leader pod's state matters.
	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
	lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition

	// Iterate through all leaderPods.
	for _, pod := range leaderPodList.Items {
		index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
		if err != nil {
			return false, false, err
		}

		var sts appsv1.StatefulSet
		if !noWorkerSts {
			// The worker StatefulSet shares the leader pod's name; a missing
			// one means the group is still materializing — skip it.
			if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
				if client.IgnoreNotFound(err) != nil {
					log.Error(err, "Fetching worker statefulSet")
					return false, false, err
				}
				continue
			}
		}

		if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
			partitionedCurrentNonBurstCount++
		}

		var ready, updated bool
		if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
			ready = true
			readyCount++
		}
		if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
			updated = true
			updatedCount++
			if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
				// Bursted replicas do not count when determining if rollingUpdate has been completed.
				partitionedUpdatedNonBurstCount++
			}
		}

		if index < int(*lws.Spec.Replicas) {
			if ready {
				readyNonBurstWorkerCount++
			}
			if index >= int(lwsPartition) && ready && updated {
				partitionedUpdatedAndReadyCount++
			}
		}
	}

	if lws.Status.ReadyReplicas != int32(readyCount) {
		lws.Status.ReadyReplicas = int32(readyCount)
		updateStatus = true
	}

	if lws.Status.UpdatedReplicas != int32(updatedCount) {
		lws.Status.UpdatedReplicas = int32(updatedCount)
		updateStatus = true
	}

	var conditions []metav1.Condition
	if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
		// upgradeInProgress is true when the upgrade replicas is smaller than the expected
		// number of total replicas not including the burst replicas
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
	} else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
	} else {
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
	}

	// updateDone is true when all replicas are updated and ready
	updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)

	updateCondition := setConditions(lws, conditions)
	// if condition changed, record events
	if updateCondition {
		r.Record.Eventf(lws, corev1.EventTypeNormal, conditions[0].Reason, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
	}
	return updateStatus || updateCondition, updateDone, nil
}
481

482
// updateStatus refreshes the LeaderWorkerSet status (Replicas, HPAPodSelector,
// ready/updated counts, and conditions), writing the status subresource only
// when something changed. It returns true when the rolling update has
// completed (all replicas updated and ready, per updateConditions).
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
	updateStatus := false
	log := ctrl.LoggerFrom(ctx)

	// Retrieve the leader StatefulSet.
	sts := &appsv1.StatefulSet{}
	if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
		log.Error(err, "Error retrieving leader StatefulSet")
		return false, err
	}

	// retrieve the current number of replicas -- the number of leaders
	replicas := int(sts.Status.Replicas)
	if lws.Status.Replicas != int32(replicas) {
		lws.Status.Replicas = int32(replicas)
		updateStatus = true
	}

	// The HPA pod selector targets leader pods only; it is computed once and
	// then left untouched.
	if lws.Status.HPAPodSelector == "" {
		labelSelector := &metav1.LabelSelector{
			MatchLabels: map[string]string{
				leaderworkerset.SetNameLabelKey:     lws.Name,
				leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
			},
		}
		selector, err := metav1.LabelSelectorAsSelector(labelSelector)
		if err != nil {
			log.Error(err, "Converting label selector to selector")
			return false, err
		}

		lws.Status.HPAPodSelector = selector.String()
		updateStatus = true
	}

	// check if an update is needed
	updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
	if err != nil {
		return false, err
	}

	if updateStatus || updateConditions {
		if err := r.Status().Update(ctx, lws); err != nil {
			// Conflicts are routine (handled by the caller with a requeue);
			// only log unexpected errors.
			if !apierrors.IsConflict(err) {
				log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
			}
			return false, err
		}
	}
	return updateDone, nil
}
534

535
// replicaState captures the observed readiness/update state of one LWS
// replica (a leader pod plus, when group size > 1, its worker StatefulSet),
// as computed by getReplicaStates and consumed by the rolling-update logic.
type replicaState struct {
	// ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
	ready bool
	// updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
	updated bool
}
541

542
// getReplicaStates returns one replicaState per leader-StatefulSet ordinal
// (0..stsReplicas-1), combining the ready/updated state of each leader pod
// with that of its worker StatefulSet. Missing or misnamed objects at an
// index yield a not-ready, not-updated state for that index.
func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
	states := make([]replicaState, stsReplicas)

	// Leader pods carry worker-index 0.
	podSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey:     lws.Name,
		leaderworkerset.WorkerIndexLabelKey: "0",
	})
	var leaderPodList corev1.PodList
	if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
		return nil, err
	}

	// Get a sorted leader pod list matches with the following sorted statefulsets one by one, which means
	// the leader pod and the corresponding worker statefulset has the same index.
	sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
		return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
	}, leaderPodList.Items, int(stsReplicas))

	stsSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey: lws.Name,
	})
	var stsList appsv1.StatefulSetList
	if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
		return nil, err
	}
	sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
		return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
	}, stsList.Items, int(stsReplicas))

	// Once size==1, no worker statefulSets will be created.
	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1

	for idx := int32(0); idx < stsReplicas; idx++ {
		// Objects at ordinal idx are expected to be named "<lws-name>-<idx>".
		nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
		// It can happen that the leader pod or the worker statefulset hasn't created yet
		// or under rebuilding, which also indicates not ready.
		if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
			states[idx] = replicaState{
				ready:   false,
				updated: false,
			}
			continue
		}

		leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
		leaderReady := podutils.PodRunningAndReady(sortedPods[idx])

		if noWorkerSts {
			states[idx] = replicaState{
				ready:   leaderReady,
				updated: leaderUpdated,
			}
			continue
		}

		workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
		workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])

		states[idx] = replicaState{
			ready:   leaderReady && workersReady,
			updated: leaderUpdated && workersUpdated,
		}
	}

	return states, nil
}
608

609
func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
×
610
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)
×
611

×
612
        // Update up to rollingStep replicas at once.
×
613
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)
×
614

×
615
        // rollingStepPartition calculation above disregards the state of replicas with idx<rollingStepPartition.
×
616
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
×
617
        var unavailable int32
×
618
        for idx := 0; idx < int(rollingStepPartition); idx++ {
×
619
                if !states[idx].ready {
×
620
                        unavailable++
×
621
                }
×
622
        }
623
        var partition = rollingStepPartition + unavailable
×
624

×
625
        // Reduce the partition if replicas are continuously not ready. It is safe since updating these replicas does not impact
×
626
        // the availability of the LWS. This is important to prevent update from getting stuck in case maxUnavailable is already violated
×
627
        // (for example, all replicas are not ready when rolling update is started).
×
628
        // Note that we never drop the partition below rolliingStepPartition.
×
629
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
×
630
                if !states[idx].ready || states[idx].updated {
×
631
                        partition = idx
×
632
                } else {
×
633
                        break
×
634
                }
635
        }
636

637
        // That means Partition moves in one direction to make it simple.
638
        return min(partition, currentPartition)
×
639
}
640

641
func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
×
642
        var unreadyCount int32
×
643
        for idx := int32(0); idx < lwsReplicas; idx++ {
×
644
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
×
645
                        unreadyCount++
×
646
                }
×
647
        }
648
        return unreadyCount
×
649
}
650

651
func calculateContinuousReadyReplicas(states []replicaState) int32 {
×
652
        // Count ready replicas at tail (from last index down)
×
653
        var continuousReadyCount int32
×
654
        for idx := len(states) - 1; idx >= 0; idx-- {
×
655
                if !states[idx].ready || !states[idx].updated {
×
656
                        break
×
657
                }
658
                continuousReadyCount++
×
659
        }
660
        return continuousReadyCount
×
661
}
662

663
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
×
664
        sts := &appsv1.StatefulSet{}
×
665
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
×
666
        if err != nil {
×
667
                if apierrors.IsNotFound(err) {
×
668
                        return nil, nil
×
669
                }
×
670
                return nil, err
×
671
        }
672
        return sts, nil
×
673
}
674

675
func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder record.EventRecorder) (*appsv1.ControllerRevision, error) {
×
676
        revisionKey := ""
×
677
        if sts != nil {
×
678
                // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where
×
679
                // the revisionKey was used to detect update instead of controller revision.
×
680
                revisionKey = revisionutils.GetRevisionKey(sts)
×
681
        }
×
682
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
×
683
                return stsRevision, err
×
684
        }
×
685
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
×
686
        if err != nil {
×
687
                return nil, err
×
688
        }
×
689
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
×
690
        if err == nil {
×
691
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
692
                if revisionKey != "" {
×
693
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
694
                }
×
695
                recorder.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, message)
×
696
        }
697
        return newRevision, err
×
698
}
699

700
func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
×
701
        if sts == nil {
×
702
                return nil, nil
×
703
        }
×
704

705
        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
×
706
        if err != nil {
×
707
                return nil, err
×
708
        }
×
709

710
        if !revisionutils.EqualRevision(currentRevision, revision) {
×
711
                return currentRevision, nil
×
712
        }
×
713

714
        return nil, nil
×
715
}
716

717
// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
718
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
6✔
719
        var podTemplateSpec corev1.PodTemplateSpec
6✔
720
        if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
8✔
721
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
2✔
722
        } else {
6✔
723
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
4✔
724
        }
4✔
725
        // construct pod template spec configuration
726
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
6✔
727
        if err != nil {
6✔
728
                return nil, err
×
729
        }
×
730
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
6✔
731
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
6✔
732
        if err != nil {
6✔
733
                return nil, err
×
734
        }
×
735

736
        podTemplateApplyConfiguration.WithLabels(map[string]string{
6✔
737
                leaderworkerset.WorkerIndexLabelKey: "0",
6✔
738
                leaderworkerset.SetNameLabelKey:     lws.Name,
6✔
739
                leaderworkerset.RevisionKey:         revisionKey,
6✔
740
        })
6✔
741
        podAnnotations := make(map[string]string)
6✔
742
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
6✔
743
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
8✔
744
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
2✔
745
        }
2✔
746
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
7✔
747
                podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = (string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type))
1✔
748
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
1✔
749
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
2✔
750
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
1✔
751
                }
1✔
752
        }
753

754
        if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
6✔
755
                podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
×
756
        }
×
757

758
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
6✔
759

6✔
760
        // construct statefulset apply configuration
6✔
761
        statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
6✔
762
                WithSpec(appsapplyv1.StatefulSetSpec().
6✔
763
                        WithServiceName(lws.Name).
6✔
764
                        WithReplicas(replicas).
6✔
765
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
6✔
766
                        WithTemplate(&podTemplateApplyConfiguration).
6✔
767
                        WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
6✔
768
                                appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable).WithPartition(partition),
6✔
769
                        )).
6✔
770
                        WithSelector(metaapplyv1.LabelSelector().
6✔
771
                                WithMatchLabels(map[string]string{
6✔
772
                                        leaderworkerset.SetNameLabelKey:     lws.Name,
6✔
773
                                        leaderworkerset.WorkerIndexLabelKey: "0",
6✔
774
                                }))).
6✔
775
                WithLabels(map[string]string{
6✔
776
                        leaderworkerset.SetNameLabelKey: lws.Name,
6✔
777
                        leaderworkerset.RevisionKey:     revisionKey,
6✔
778
                }).
6✔
779
                WithAnnotations(map[string]string{
6✔
780
                        leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
6✔
781
                })
6✔
782

6✔
783
        pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(lws)
6✔
784
        if len(pvcApplyConfiguration) > 0 {
7✔
785
                statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
1✔
786
        }
1✔
787

788
        if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
7✔
789
                pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
1✔
790
                        WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
1✔
791
                        WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
1✔
792
                }
1✔
793
                statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
1✔
794
        }
1✔
795
        return statefulSetConfig, nil
6✔
796
}
797

798
func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) metav1.Condition {
×
799
        var condtype, reason, message string
×
800
        switch conditionType {
×
801
        case leaderworkerset.LeaderWorkerSetAvailable:
×
802
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
×
803
                reason = "AllGroupsReady"
×
804
                message = "All replicas are ready"
×
805
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
×
806
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
×
807
                reason = GroupsUpdating
×
808
                message = "Rolling Upgrade is in progress"
×
809
        default:
×
810
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
×
811
                reason = GroupsProgressing
×
812
                message = "Replicas are progressing"
×
813
        }
814

815
        condition := metav1.Condition{
×
816
                Type:               condtype,
×
817
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
×
818
                LastTransitionTime: metav1.Now(),
×
819
                Reason:             reason,
×
820
                Message:            message,
×
821
        }
×
822
        return condition
×
823
}
824

825
func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
×
826
        shouldUpdate := false
×
827
        for _, condition := range conditions {
×
828
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
×
829
        }
×
830

831
        return shouldUpdate
×
832
}
833

834
func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
6✔
835
        newCondition.LastTransitionTime = metav1.Now()
6✔
836
        found := false
6✔
837
        shouldUpdate := false
6✔
838

6✔
839
        // Precondition: newCondition has status true.
6✔
840
        for i, curCondition := range lws.Status.Conditions {
11✔
841
                if newCondition.Type == curCondition.Type {
7✔
842
                        if newCondition.Status != curCondition.Status {
3✔
843
                                // the conditions match but one is true and one is false. Update the stored condition
1✔
844
                                // with the new condition.
1✔
845
                                lws.Status.Conditions[i] = newCondition
1✔
846
                                shouldUpdate = true
1✔
847
                        }
1✔
848
                        // if both are true or both are false, do nothing.
849
                        found = true
2✔
850
                } else {
3✔
851
                        // if the conditions are not of the same type, do nothing unless one is Progressing and one is
3✔
852
                        // Available and both are true. Must be mutually exclusive.
3✔
853
                        if exclusiveConditionTypes(curCondition, newCondition) &&
3✔
854
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
4✔
855
                                // Progressing is true and Available is true. Prevent this.
1✔
856
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
1✔
857
                                shouldUpdate = true
1✔
858
                        }
1✔
859
                }
860
        }
861
        // condition doesn't exist, update only if the status is true
862
        if newCondition.Status == metav1.ConditionTrue && !found {
9✔
863
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
3✔
864
                shouldUpdate = true
3✔
865
        }
3✔
866
        return shouldUpdate
6✔
867
}
868

869
func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
10✔
870
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
10✔
871
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
15✔
872
                return true
5✔
873
        }
5✔
874

875
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
5✔
876
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
7✔
877
                return true
2✔
878
        }
2✔
879

880
        return false
3✔
881
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc