• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 21697534108

05 Feb 2026 03:24AM UTC coverage: 36.111% (-0.2%) from 36.264%
21697534108

Pull #752

github

yankay
fix Failing test in 1.35

Signed-off-by: Kay Yan <kay.yan@daocloud.io>
Pull Request #752: [WIP]Fix Failing Tests in kube 1.35

0 of 32 new or added lines in 4 files covered. (0.0%)

1 existing line in 1 file now uncovered.

858 of 2376 relevant lines covered (36.11%)

2.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.21
/pkg/controllers/leaderworkerset_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23
        "time"
24

25
        appsv1 "k8s.io/api/apps/v1"
26
        corev1 "k8s.io/api/core/v1"
27
        apierrors "k8s.io/apimachinery/pkg/api/errors"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/apimachinery/pkg/util/intstr"
33
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
34
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
35
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
36
        "k8s.io/client-go/tools/record"
37
        "k8s.io/klog/v2"
38
        "k8s.io/utils/ptr"
39
        ctrl "sigs.k8s.io/controller-runtime"
40
        "sigs.k8s.io/controller-runtime/pkg/client"
41
        "sigs.k8s.io/controller-runtime/pkg/handler"
42
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
43

44
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
45
        "sigs.k8s.io/lws/pkg/utils"
46
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
47
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
48
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
49
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
50
)
51

52
// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object
type LeaderWorkerSetReconciler struct {
	// Client is embedded so the reconciler can read/write cluster objects directly.
	client.Client
	// Scheme maps Go types to GroupVersionKinds; used when setting controller references.
	Scheme *runtime.Scheme
	// Record emits Kubernetes events (creation failures, rollout progress, etc.).
	Record record.EventRecorder
}
58

59
var (
	// apiGVStr is the group/version string of the LeaderWorkerSet API,
	// used to recognize owner references pointing at an LWS (see SetupIndexes).
	apiGVStr = leaderworkerset.GroupVersion.String()
)
62

63
const (
	// lwsOwnerKey is the field-index key under which StatefulSets are indexed
	// by the name of their controlling LeaderWorkerSet (registered in SetupIndexes).
	lwsOwnerKey = ".metadata.controller"
	// fieldManager is the manager name used for server-side apply patches issued by this controller.
	fieldManager = "lws"
)
67

68
const (
	// FailedCreate Event reason used when a resource creation fails.
	// The event uses the error(s) as the reason.
	FailedCreate = "FailedCreate"
	// GroupsProgressing Event reason used when groups are being created or becoming ready.
	GroupsProgressing = "GroupsProgressing"
	// GroupsUpdating Event reason used when a rolling update moves the statefulset partition.
	GroupsUpdating = "GroupsUpdating"
	// CreatingRevision Event reason used when a new controller revision is created for an updated LWS.
	CreatingRevision = "CreatingRevision"
)
76

77
func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *LeaderWorkerSetReconciler {
×
78
        return &LeaderWorkerSetReconciler{
×
79
                Client: client,
×
80
                Scheme: scheme,
×
81
                Record: record,
×
82
        }
×
83
}
×
84

85
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
86
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
87
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
88
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
89
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
90
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
91
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
92
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
93
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
94
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
95
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
96
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update
97

98
// Reconcile drives one LeaderWorkerSet toward its desired state: it ensures a
// controller revision exists, computes the rolling-update partition/replicas,
// applies the leader statefulset via server-side apply, reconciles the shared
// headless service, and finally updates the LWS status, pruning old revisions
// once the rollout is complete.
func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Get leaderworkerset object
	lws := &leaderworkerset.LeaderWorkerSet{}
	if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
		// NotFound is not an error: the object was deleted between enqueue and reconcile.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Nothing to do for an object that is being deleted; cleanup of owned
	// resources is presumably handled via owner references — see RBAC markers.
	if lws.DeletionTimestamp != nil {
		return ctrl.Result{}, nil
	}

	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)

	leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
	if err != nil {
		log.Error(err, "Fetching leader statefulset")
		return ctrl.Result{}, err
	}

	// The leader statefulset is mid-deletion: requeue and wait for it to
	// disappear instead of racing the garbage collector.
	if leaderSts != nil && leaderSts.DeletionTimestamp != nil {
		return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
	}

	// Handles two cases:
	// Case 1: Upgrading the LWS controller from a version that doesn't support controller revision
	// Case 2: Creating the controller revision for a newly created LWS object
	revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
	if err != nil {
		log.Error(err, "Creating controller revision")
		return ctrl.Result{}, err
	}

	// A non-nil updatedRevision means the LWS template differs from the
	// current revision, i.e. a rolling update should start.
	updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
	if err != nil {
		log.Error(err, "Validating if LWS has been updated")
		return ctrl.Result{}, err
	}
	lwsUpdated := updatedRevision != nil
	if lwsUpdated {
		revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
		if err != nil {
			log.Error(err, "Creating revision for updated LWS")
			return ctrl.Result{}, err
		}
		r.Record.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
	}

	// Compute the statefulset partition and replica count implementing the
	// current phase of the rolling update (see rollingUpdateParameters).
	partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
	if err != nil {
		log.Error(err, "Rolling partition error")
		return ctrl.Result{}, err
	}

	if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
		// Only emit the creation-failure event when the apply was a create
		// (no leader statefulset existed yet).
		if leaderSts == nil {
			r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
		}
		return ctrl.Result{}, err
	}

	if leaderSts == nil {
		// An event is logged to track sts creation.
		r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("Created leader statefulset %s", lws.Name))
	} else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
		// An event is logged to track update progress.
		r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsUpdating, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
	}

	// Create headless service if it does not exist.
	if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
		log.Error(err, "Creating headless service.")
		r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate,
			fmt.Sprintf("Failed to create headless service for error: %v", err))
		return ctrl.Result{}, err
	}

	updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
	if err != nil {
		// Status-update conflicts are expected under concurrent writers;
		// requeue silently rather than surfacing an error.
		if apierrors.IsConflict(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		return ctrl.Result{}, err
	}

	// Rollout finished: prune stale controller revisions, keeping the active one.
	if updateDone {
		if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
			return ctrl.Result{}, err
		}
	}
	log.V(2).Info("Leader Reconcile completed.")
	return ctrl.Result{}, nil
}
191

192
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
×
193
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
×
194
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
×
195
                        return err
×
196
                }
×
197
                return nil
×
198
        }
199
        return nil
×
200
}
201

202
// SetupWithManager sets up the controller with the Manager.
203
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
204
        return ctrl.NewControllerManagedBy(mgr).
×
205
                For(&leaderworkerset.LeaderWorkerSet{}).
×
206
                Owns(&appsv1.StatefulSet{}).
×
207
                Owns(&corev1.Service{}).
×
208
                Watches(&appsv1.StatefulSet{},
×
209
                        handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
×
210
                                return []reconcile.Request{
×
211
                                        {NamespacedName: types.NamespacedName{
×
212
                                                Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
×
213
                                                Namespace: a.GetNamespace(),
×
214
                                        }},
×
215
                                }
×
216
                        })).
×
217
                Complete(r)
218
}
219

220
func SetupIndexes(indexer client.FieldIndexer) error {
×
221
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
×
222
                // grab the statefulSet object, extract the owner...
×
223
                statefulSet := rawObj.(*appsv1.StatefulSet)
×
224
                owner := metav1.GetControllerOf(statefulSet)
×
225
                if owner == nil {
×
226
                        return nil
×
227
                }
×
228
                // ...make sure it's a LeaderWorkerSet...
229
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
×
230
                        return nil
×
231
                }
×
232
                // ...and if so, return it
233
                return []string{owner.Name}
×
234
        })
235
}
236

237
// Rolling update will always wait for the former replica to be ready then process the next one,
// we didn't consider rollout strategy type here since we only support rollingUpdate now,
// once we have more policies, we should update the logic here.
// Possible scenarios for Partition:
//   - When sts is under creation, partition is always 0 because pods are created in parallel, rolling update is not relevant here.
//   - When sts is in rolling update, the partition will start from the last index to the index 0 processing in maxUnavailable step.
//   - When sts is in rolling update, and Replicas increases, we'll delay the rolling update until the scaling up is done,
//     Partition will not change, new replicas are created using the new template from the get go.
//   - When sts is rolling update, and Replicas decreases, the partition will not change until new Replicas < Partition,
//     in which case Partition will be reset to the new Replicas value.
//   - When sts is ready for a rolling update and Replicas increases at the same time, we'll delay the rolling update until
//     the scaling up is done.
//   - When sts is ready for a rolling update and Replicas decreases at the same time, we'll start the rolling update
//     together with scaling down.
//
// At rest, Partition should always be zero.
//
// For Replicas:
//   - When rolling update, Replicas is equal to (spec.Replicas+maxSurge)
//   - Otherwise, Replicas is equal to spec.Replicas
//   - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
//     we should reclaim the extra replicas gradually to accommodate for the new replicas.
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
	log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
	ctx = ctrl.LoggerInto(ctx, log)
	lwsReplicas := *lws.Spec.Replicas

	// The deferred clamp applies to every return path below, including the
	// early returns: the computed partition is raised to at least the
	// user-configured rollout Partition.
	defer func() {
		// Limit the replicas with less than lwsPartition will not be updated.
		stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
	}()

	// Case 1:
	// If sts not created yet, all partitions should be updated,
	// replicas should not change.
	if sts == nil {
		return 0, lwsReplicas, nil
	}

	stsReplicas := *sts.Spec.Replicas
	maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
	if err != nil {
		return 0, 0, err
	}
	// No need to burst more than the replicas.
	if maxSurge > int(lwsReplicas) {
		maxSurge = int(lwsReplicas)
	}
	burstReplicas := lwsReplicas + int32(maxSurge)

	// wantReplicas calculates the final replicas if needed.
	wantReplicas := func(unreadyReplicas int32) int32 {
		if unreadyReplicas <= int32(maxSurge) {
			// When we have n unready replicas and n bursted replicas, we should
			// start to release the burst replica gradually for the accommodation of
			// the unready ones.
			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
			r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
			return finalReplicas
		}
		return burstReplicas
	}

	// Case 2:
	// Indicates a new rolling update here.
	if leaderWorkerSetUpdated {
		// Processing scaling up/down first prior to rolling update.
		return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
	}

	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
	rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
	// Case 3:
	// In normal cases, return the values directly.
	if rollingUpdateCompleted {
		return 0, lwsReplicas, nil
	}

	// Per-replica ready/updated state, indexed by group index.
	states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
	if err != nil {
		return 0, 0, err
	}
	lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)

	// The replicas count recorded on the statefulset when it was last applied;
	// a mismatch with spec.Replicas means the user scaled mid-update.
	originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
	if err != nil {
		return 0, 0, err
	}
	replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
	// Case 4:
	// Replicas changed during rolling update.
	if replicasUpdated {
		return min(partition, burstReplicas), wantReplicas(lwsUnreadyReplicas), nil
	}

	// Case 5:
	// Calculating the Partition during rolling update, no leaderWorkerSet updates happens.

	rollingStep, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
	if err != nil {
		return 0, 0, err
	}
	// Make sure that we always respect the maxUnavailable, or
	// we'll violate it when reclaiming bursted replicas.
	rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))

	return rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition), wantReplicas(lwsUnreadyReplicas), nil
}
345

346
// SSAWithStatefulset builds the leader statefulset apply configuration for the
// given partition/replicas/revision and applies it to the cluster with
// server-side apply under this controller's field manager, forcing ownership
// of conflicting fields.
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
	log := ctrl.LoggerFrom(ctx)

	// construct the statefulset apply configuration
	leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
	if err != nil {
		log.Error(err, "Constructing StatefulSet apply configuration.")
		return err
	}
	// The LWS owns the leader statefulset so deletion cascades.
	if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
		log.Error(err, "Setting controller reference.")
		return err
	}
	// Convert the apply configuration to an unstructured object so it can be
	// submitted as a client.Apply patch.
	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
	if err != nil {
		log.Error(err, "Converting StatefulSet configuration to json.")
		return err
	}
	patch := &unstructured.Unstructured{
		Object: obj,
	}
	// Use server side apply and add fieldmanager to the lws owned fields
	// If there are conflicts in the fields owned by the lws controller, lws will obtain the ownership and force override
	// these fields to the ones desired by the lws controller
	// TODO b/316776287 add E2E test for SSA
	err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{
		FieldManager: fieldManager,
		Force:        ptr.To[bool](true),
	})
	if err != nil {
		log.Error(err, "Using server side apply to update leader statefulset")
		return err
	}

	return nil
}
382

383
// updates the condition of the leaderworkerset to either Progressing or Available.
//
// Returns (conditionOrCountersChanged, updateDone, error): the first result
// reports whether any status counter or condition was modified in-memory (the
// caller persists it), the second whether the rolling update has fully
// completed (partition at 0 and all replicas updated and ready).
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
	log := ctrl.LoggerFrom(ctx)
	// Leader pods are those with worker index 0 in each group.
	podSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey:     lws.Name,
		leaderworkerset.WorkerIndexLabelKey: "0",
	})
	leaderPodList := &corev1.PodList{}
	if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
		log.Error(err, "Fetching leaderPods")
		return false, false, err
	}

	updateStatus := false
	readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
	partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
	// With group size 1 there is no worker statefulset per group; only the
	// leader pod's state matters.
	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
	lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition

	// Iterate through all leaderPods.
	for _, pod := range leaderPodList.Items {
		index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
		if err != nil {
			return false, false, err
		}

		var sts appsv1.StatefulSet
		if !noWorkerSts {
			// The worker statefulset shares its name with the leader pod.
			if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
				if client.IgnoreNotFound(err) != nil {
					log.Error(err, "Fetching worker statefulSet")
					return false, false, err
				}
				// Worker statefulset not created yet; skip this group.
				continue
			}
		}

		// Non-burst replicas within the update window [lwsPartition, Replicas).
		if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
			partitionedCurrentNonBurstCount++
		}

		var ready, updated bool
		if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
			ready = true
			readyCount++
		}
		if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
			updated = true
			updatedCount++
			if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
				// Bursted replicas do not count when determining if rollingUpdate has been completed.
				partitionedUpdatedNonBurstCount++
			}
		}

		if index < int(*lws.Spec.Replicas) {
			if ready {
				readyNonBurstWorkerCount++
			}
			if index >= int(lwsPartition) && ready && updated {
				partitionedUpdatedAndReadyCount++
			}
		}
	}

	if lws.Status.ReadyReplicas != int32(readyCount) {
		lws.Status.ReadyReplicas = int32(readyCount)
		updateStatus = true
	}

	if lws.Status.UpdatedReplicas != int32(updatedCount) {
		lws.Status.UpdatedReplicas = int32(updatedCount)
		updateStatus = true
	}

	var conditions []metav1.Condition
	if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
		// upgradeInProgress is true when the upgrade replicas is smaller than the expected
		// number of total replicas not including the burst replicas
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
	} else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
	} else {
		conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
	}

	// updateDone is true when all replicas are updated and ready
	updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)

	updateCondition := setConditions(lws, conditions)
	// if condition changed, record events
	if updateCondition {
		r.Record.Eventf(lws, corev1.EventTypeNormal, conditions[0].Reason, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
	}
	return updateStatus || updateCondition, updateDone, nil
}
480

481
// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
//
// The boolean result is updateDone from updateConditions: true once the
// rolling update has fully completed (used by the caller to prune revisions).
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
	updateStatus := false
	log := ctrl.LoggerFrom(ctx)

	// Retrieve the leader StatefulSet.
	sts := &appsv1.StatefulSet{}
	if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
		log.Error(err, "Error retrieving leader StatefulSet")
		return false, err
	}

	// retrieve the current number of replicas -- the number of leaders
	replicas := int(sts.Status.Replicas)
	if lws.Status.Replicas != int32(replicas) {
		lws.Status.Replicas = int32(replicas)
		updateStatus = true
	}

	// The HPA pod selector is computed once and cached on the status; it
	// selects only leader pods (worker index 0) for autoscaling metrics.
	if lws.Status.HPAPodSelector == "" {
		labelSelector := &metav1.LabelSelector{
			MatchLabels: map[string]string{
				leaderworkerset.SetNameLabelKey:     lws.Name,
				leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
			},
		}
		selector, err := metav1.LabelSelectorAsSelector(labelSelector)
		if err != nil {
			log.Error(err, "Converting label selector to selector")
			return false, err
		}

		lws.Status.HPAPodSelector = selector.String()
		updateStatus = true
	}

	// check if an update is needed
	updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
	if err != nil {
		return false, err
	}

	// Persist status only when something actually changed, to avoid no-op writes.
	if updateStatus || updateConditions {
		if err := r.Status().Update(ctx, lws); err != nil {
			// Conflicts are expected and handled by the caller's requeue;
			// only log unexpected errors.
			if !apierrors.IsConflict(err) {
				log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
			}
			return false, err
		}
	}
	return updateDone, nil
}
533

534
// replicaState captures the observed state of one LWS replica (group):
// the leader pod together with its worker statefulset, if one exists.
type replicaState struct {
	// ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
	ready bool
	// updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
	updated bool
}
540

541
// getReplicaStates returns, for each replica index in [0, stsReplicas), whether
// that replica's leader pod and worker statefulset are ready and whether they
// are on the given revision. Missing or misnamed pods/statefulsets at an index
// are reported as neither ready nor updated.
func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
	states := make([]replicaState, stsReplicas)

	// Leader pods carry worker index 0 within their group.
	podSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey:     lws.Name,
		leaderworkerset.WorkerIndexLabelKey: "0",
	})
	var leaderPodList corev1.PodList
	if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
		return nil, err
	}

	// Get a sorted leader pod list matches with the following sorted statefulsets one by one, which means
	// the leader pod and the corresponding worker statefulset has the same index.
	sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
		return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
	}, leaderPodList.Items, int(stsReplicas))

	stsSelector := client.MatchingLabels(map[string]string{
		leaderworkerset.SetNameLabelKey: lws.Name,
	})
	var stsList appsv1.StatefulSetList
	if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
		return nil, err
	}
	sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
		return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
	}, stsList.Items, int(stsReplicas))

	// Once size==1, no worker statefulSets will be created.
	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1

	for idx := int32(0); idx < stsReplicas; idx++ {
		// Expected name for both the leader pod and the worker statefulset at this index.
		nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
		// It can happen that the leader pod or the worker statefulset hasn't created yet
		// or under rebuilding, which also indicates not ready.
		if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
			states[idx] = replicaState{
				ready:   false,
				updated: false,
			}
			continue
		}

		leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
		leaderReady := podutils.PodRunningAndReady(sortedPods[idx])

		if noWorkerSts {
			// Group of size 1: the leader pod alone determines the state.
			states[idx] = replicaState{
				ready:   leaderReady,
				updated: leaderUpdated,
			}
			continue
		}

		workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
		workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])

		// Both leader and workers must be ready/updated for the replica to count.
		states[idx] = replicaState{
			ready:   leaderReady && workersReady,
			updated: leaderUpdated && workersUpdated,
		}
	}

	return states, nil
}
607

608
func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
×
609
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)
×
610

×
611
        // Update up to rollingStep replicas at once.
×
612
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)
×
613

×
614
        // rollingStepPartition calculation above disregards the state of replicas with idx<rollingStepPartition.
×
615
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
×
616
        var unavailable int32
×
617
        for idx := 0; idx < int(rollingStepPartition); idx++ {
×
618
                if !states[idx].ready {
×
619
                        unavailable++
×
620
                }
×
621
        }
622
        var partition = rollingStepPartition + unavailable
×
623

×
624
        // Reduce the partition if replicas are continuously not ready. It is safe since updating these replicas does not impact
×
625
        // the availability of the LWS. This is important to prevent update from getting stuck in case maxUnavailable is already violated
×
626
        // (for example, all replicas are not ready when rolling update is started).
×
627
        // Note that we never drop the partition below rolliingStepPartition.
×
628
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
×
629
                if !states[idx].ready || states[idx].updated {
×
630
                        partition = idx
×
631
                } else {
×
632
                        break
×
633
                }
634
        }
635

636
        // That means Partition moves in one direction to make it simple.
637
        return min(partition, currentPartition)
×
638
}
639

640
func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
×
641
        var unreadyCount int32
×
642
        for idx := int32(0); idx < lwsReplicas; idx++ {
×
643
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
×
644
                        unreadyCount++
×
645
                }
×
646
        }
647
        return unreadyCount
×
648
}
649

650
func calculateContinuousReadyReplicas(states []replicaState) int32 {
×
651
        // Count ready replicas at tail (from last index down)
×
652
        var continuousReadyCount int32
×
653
        for idx := len(states) - 1; idx >= 0; idx-- {
×
654
                if !states[idx].ready || !states[idx].updated {
×
655
                        break
×
656
                }
657
                continuousReadyCount++
×
658
        }
659
        return continuousReadyCount
×
660
}
661

662
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
×
663
        sts := &appsv1.StatefulSet{}
×
664
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
×
665
        if err != nil {
×
666
                if apierrors.IsNotFound(err) {
×
667
                        return nil, nil
×
668
                }
×
669
                return nil, err
×
670
        }
671
        return sts, nil
×
672
}
673

674
func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder record.EventRecorder) (*appsv1.ControllerRevision, error) {
×
675
        revisionKey := ""
×
676
        if sts != nil {
×
677
                // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where
×
678
                // the revisionKey was used to detect update instead of controller revision.
×
679
                revisionKey = revisionutils.GetRevisionKey(sts)
×
680
        }
×
681
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
×
682
                return stsRevision, err
×
683
        }
×
684
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
×
685
        if err != nil {
×
686
                return nil, err
×
687
        }
×
688
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
×
689
        if err == nil {
×
690
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
691
                if revisionKey != "" {
×
692
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
693
                }
×
694
                recorder.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, message)
×
695
        }
696
        return newRevision, err
×
697
}
698

699
func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
×
700
        if sts == nil {
×
701
                return nil, nil
×
702
        }
×
703

704
        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
×
705
        if err != nil {
×
706
                return nil, err
×
707
        }
×
708

709
        if !revisionutils.EqualRevision(currentRevision, revision) {
×
710
                return currentRevision, nil
×
711
        }
×
712

713
        return nil, nil
×
714
}
715

716
// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet.
// The pod template comes from LeaderTemplate when set, otherwise from
// WorkerTemplate; it is decorated with LWS bookkeeping labels/annotations and
// wrapped in a server-side-apply StatefulSet configuration with the given
// rolling-update partition, replica count, and revision key.
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
	var podTemplateSpec corev1.PodTemplateSpec
	if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
		podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
	} else {
		podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
	}
	// construct pod template spec configuration: round-trip through
	// unstructured to convert the typed PodTemplateSpec into an apply
	// configuration (there is no direct typed conversion).
	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
	if err != nil {
		return nil, err
	}
	var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
	if err != nil {
		return nil, err
	}

	// Leader pods are always worker index 0 within their group.
	podTemplateApplyConfiguration.WithLabels(map[string]string{
		leaderworkerset.WorkerIndexLabelKey: "0",
		leaderworkerset.SetNameLabelKey:     lws.Name,
		leaderworkerset.RevisionKey:         revisionKey,
	})
	podAnnotations := make(map[string]string)
	// NOTE(review): Size is dereferenced without a nil check — presumably
	// guaranteed non-nil by defaulting; confirm against the webhook.
	podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
	if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
		podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
	}
	// Propagate sub-group placement settings onto the pod template so that
	// downstream controllers/webhooks can act on them.
	if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
		podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = (string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type))
		podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
		if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
			podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
		}
	}

	// NOTE(review): SubdomainPolicy is dereferenced whenever NetworkConfig is
	// non-nil — assumes defaulting always populates it; TODO confirm.
	if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
		podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
	}

	podTemplateApplyConfiguration.WithAnnotations(podAnnotations)

	// construct statefulset apply configuration. Parallel pod management is
	// used so leader pods start without waiting on ordinal predecessors.
	// NOTE(review): RollingUpdateConfiguration is dereferenced without a nil
	// check — presumably defaulted; verify.
	statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
		WithSpec(appsapplyv1.StatefulSetSpec().
			WithServiceName(lws.Name).
			WithReplicas(replicas).
			WithPodManagementPolicy(appsv1.ParallelPodManagement).
			WithTemplate(&podTemplateApplyConfiguration).
			WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
				appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable).WithPartition(partition),
			)).
			WithSelector(metaapplyv1.LabelSelector().
				WithMatchLabels(map[string]string{
					leaderworkerset.SetNameLabelKey:     lws.Name,
					leaderworkerset.WorkerIndexLabelKey: "0",
				}))).
		WithLabels(map[string]string{
			leaderworkerset.SetNameLabelKey: lws.Name,
			leaderworkerset.RevisionKey:     revisionKey,
		}).
		WithAnnotations(map[string]string{
			leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
		})

	// Attach PVC templates only when the LWS declares any.
	pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(lws)
	if len(pvcApplyConfiguration) > 0 {
		statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
	}

	// Forward the PVC retention policy verbatim when configured.
	if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
		pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
			WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
			WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
		}
		statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
	}
	return statefulSetConfig, nil
}
796

797
func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) metav1.Condition {
×
798
        var condtype, reason, message string
×
799
        switch conditionType {
×
800
        case leaderworkerset.LeaderWorkerSetAvailable:
×
801
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
×
802
                reason = "AllGroupsReady"
×
803
                message = "All replicas are ready"
×
804
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
×
805
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
×
806
                reason = GroupsUpdating
×
807
                message = "Rolling Upgrade is in progress"
×
808
        default:
×
809
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
×
810
                reason = GroupsProgressing
×
811
                message = "Replicas are progressing"
×
812
        }
813

814
        condition := metav1.Condition{
×
815
                Type:               condtype,
×
816
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
×
817
                LastTransitionTime: metav1.Now(),
×
818
                Reason:             reason,
×
819
                Message:            message,
×
820
        }
×
821
        return condition
×
822
}
823

824
func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
×
825
        shouldUpdate := false
×
826
        for _, condition := range conditions {
×
827
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
×
828
        }
×
829

830
        return shouldUpdate
×
831
}
832

833
func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
6✔
834
        newCondition.LastTransitionTime = metav1.Now()
6✔
835
        found := false
6✔
836
        shouldUpdate := false
6✔
837

6✔
838
        // Precondition: newCondition has status true.
6✔
839
        for i, curCondition := range lws.Status.Conditions {
11✔
840
                if newCondition.Type == curCondition.Type {
7✔
841
                        if newCondition.Status != curCondition.Status {
3✔
842
                                // the conditions match but one is true and one is false. Update the stored condition
1✔
843
                                // with the new condition.
1✔
844
                                lws.Status.Conditions[i] = newCondition
1✔
845
                                shouldUpdate = true
1✔
846
                        }
1✔
847
                        // if both are true or both are false, do nothing.
848
                        found = true
2✔
849
                } else {
3✔
850
                        // if the conditions are not of the same type, do nothing unless one is Progressing and one is
3✔
851
                        // Available and both are true. Must be mutually exclusive.
3✔
852
                        if exclusiveConditionTypes(curCondition, newCondition) &&
3✔
853
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
4✔
854
                                // Progressing is true and Available is true. Prevent this.
1✔
855
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
1✔
856
                                shouldUpdate = true
1✔
857
                        }
1✔
858
                }
859
        }
860
        // condition doesn't exist, update only if the status is true
861
        if newCondition.Status == metav1.ConditionTrue && !found {
9✔
862
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
3✔
863
                shouldUpdate = true
3✔
864
        }
3✔
865
        return shouldUpdate
6✔
866
}
867

868
func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
10✔
869
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
10✔
870
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
15✔
871
                return true
5✔
872
        }
5✔
873

874
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
5✔
875
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
7✔
876
                return true
2✔
877
        }
2✔
878

879
        return false
3✔
880
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc