• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 17861757619

19 Sep 2025 02:49PM UTC coverage: 35.096% (+1.3%) from 33.767%
17861757619

Pull #649

github

andyzhangx
doc: add example in website
Pull Request #649: doc: add volumeClaimTemplates example

803 of 2288 relevant lines covered (35.1%)

2.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.3
/pkg/controllers/leaderworkerset_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23

24
        appsv1 "k8s.io/api/apps/v1"
25
        corev1 "k8s.io/api/core/v1"
26
        apierrors "k8s.io/apimachinery/pkg/api/errors"
27
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29
        "k8s.io/apimachinery/pkg/runtime"
30
        "k8s.io/apimachinery/pkg/types"
31
        "k8s.io/apimachinery/pkg/util/intstr"
32
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
33
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
34
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
35
        "k8s.io/client-go/tools/record"
36
        "k8s.io/klog/v2"
37
        "k8s.io/utils/ptr"
38
        ctrl "sigs.k8s.io/controller-runtime"
39
        "sigs.k8s.io/controller-runtime/pkg/client"
40
        "sigs.k8s.io/controller-runtime/pkg/handler"
41
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
42

43
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
44
        "sigs.k8s.io/lws/pkg/utils"
45
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
46
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
47
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
48
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
49
)
50

51
// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object
52
type LeaderWorkerSetReconciler struct {
53
        client.Client
54
        Scheme *runtime.Scheme
55
        Record record.EventRecorder
56
}
57

58
var (
59
        apiGVStr = leaderworkerset.GroupVersion.String()
60
)
61

62
const (
63
        lwsOwnerKey  = ".metadata.controller"
64
        fieldManager = "lws"
65
)
66

67
const (
68
        // FailedCreate Event reason used when a resource creation fails.
69
        // The event uses the error(s) as the reason.
70
        FailedCreate      = "FailedCreate"
71
        GroupsProgressing = "GroupsProgressing"
72
        GroupsUpdating    = "GroupsUpdating"
73
        CreatingRevision  = "CreatingRevision"
74
)
75

76
func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *LeaderWorkerSetReconciler {
×
77
        return &LeaderWorkerSetReconciler{
×
78
                Client: client,
×
79
                Scheme: scheme,
×
80
                Record: record,
×
81
        }
×
82
}
×
83

84
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
85
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
86
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
87
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
88
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
89
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
90
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
91
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
92
//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
93
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
94
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
95
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update
96

97
func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
×
98
        // Get leaderworkerset object
×
99
        lws := &leaderworkerset.LeaderWorkerSet{}
×
100
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
×
101
                return ctrl.Result{}, client.IgnoreNotFound(err)
×
102
        }
×
103

104
        if lws.DeletionTimestamp != nil {
×
105
                return ctrl.Result{}, nil
×
106
        }
×
107

108
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
×
109
        ctx = ctrl.LoggerInto(ctx, log)
×
110

×
111
        leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
×
112
        if err != nil {
×
113
                log.Error(err, "Fetching leader statefulset")
×
114
                return ctrl.Result{}, err
×
115
        }
×
116

117
        // Handles two cases:
118
        // Case 1: Upgrading the LWS controller from a version that doesn't support controller revision
119
        // Case 2: Creating the controller revision for a newly created LWS object
120
        revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
×
121
        if err != nil {
×
122
                log.Error(err, "Creating controller revision")
×
123
                return ctrl.Result{}, err
×
124
        }
×
125

126
        updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
×
127
        if err != nil {
×
128
                log.Error(err, "Validating if LWS has been updated")
×
129
                return ctrl.Result{}, err
×
130
        }
×
131
        lwsUpdated := updatedRevision != nil
×
132
        if lwsUpdated {
×
133
                revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
×
134
                if err != nil {
×
135
                        log.Error(err, "Creating revision for updated LWS")
×
136
                        return ctrl.Result{}, err
×
137
                }
×
138
                r.Record.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
×
139
        }
140

141
        partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
×
142
        if err != nil {
×
143
                log.Error(err, "Rolling partition error")
×
144
                return ctrl.Result{}, err
×
145
        }
×
146

147
        if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
×
148
                if leaderSts == nil {
×
149
                        r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
×
150
                }
×
151
                return ctrl.Result{}, err
×
152
        }
153

154
        if leaderSts == nil {
×
155
                // An event is logged to track sts creation.
×
156
                r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("Created leader statefulset %s", lws.Name))
×
157
        } else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
×
158
                // An event is logged to track update progress.
×
159
                r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsUpdating, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
×
160
        }
×
161

162
        // Create headless service if it does not exist.
163
        if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
×
164
                log.Error(err, "Creating headless service.")
×
165
                r.Record.Eventf(lws, corev1.EventTypeWarning, FailedCreate,
×
166
                        fmt.Sprintf("Failed to create headless service for error: %v", err))
×
167
                return ctrl.Result{}, err
×
168
        }
×
169

170
        updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
×
171
        if err != nil {
×
172
                if apierrors.IsConflict(err) {
×
173
                        return ctrl.Result{Requeue: true}, nil
×
174
                }
×
175
                return ctrl.Result{}, err
×
176
        }
177

178
        if updateDone {
×
179
                if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
×
180
                        return ctrl.Result{}, err
×
181
                }
×
182
        }
183
        log.V(2).Info("Leader Reconcile completed.")
×
184
        return ctrl.Result{}, nil
×
185
}
186

187
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
×
188
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
×
189
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
×
190
                        return err
×
191
                }
×
192
                return nil
×
193
        }
194
        return nil
×
195
}
196

197
// SetupWithManager sets up the controller with the Manager.
198
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
199
        return ctrl.NewControllerManagedBy(mgr).
×
200
                For(&leaderworkerset.LeaderWorkerSet{}).
×
201
                Owns(&appsv1.StatefulSet{}).
×
202
                Owns(&corev1.Service{}).
×
203
                Watches(&appsv1.StatefulSet{},
×
204
                        handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
×
205
                                return []reconcile.Request{
×
206
                                        {NamespacedName: types.NamespacedName{
×
207
                                                Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
×
208
                                                Namespace: a.GetNamespace(),
×
209
                                        }},
×
210
                                }
×
211
                        })).
×
212
                Complete(r)
213
}
214

215
func SetupIndexes(indexer client.FieldIndexer) error {
×
216
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
×
217
                // grab the statefulSet object, extract the owner...
×
218
                statefulSet := rawObj.(*appsv1.StatefulSet)
×
219
                owner := metav1.GetControllerOf(statefulSet)
×
220
                if owner == nil {
×
221
                        return nil
×
222
                }
×
223
                // ...make sure it's a LeaderWorkerSet...
224
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
×
225
                        return nil
×
226
                }
×
227
                // ...and if so, return it
228
                return []string{owner.Name}
×
229
        })
230
}
231

232
// Rolling update will always wait for the former replica to be ready then process the next one,
233
// we didn't consider rollout strategy type here since we only support rollingUpdate now,
234
// once we have more policies, we should update the logic here.
235
// Possible scenarios for Partition:
236
//   - When sts is under creation, partition is always 0 because pods are created in parallel, rolling update is not relevant here.
237
//   - When sts is in rolling update, the partition will start from the last index to the index 0 processing in maxUnavailable step.
238
//   - When sts is in rolling update, and Replicas increases, we'll delay the rolling update until the scaling up is done,
239
//     Partition will not change, new replicas are created using the new template from the get go.
240
//   - When sts is rolling update, and Replicas decreases, the partition will not change until new Replicas < Partition,
241
//     in which case Partition will be reset to the new Replicas value.
242
//   - When sts is ready for a rolling update and Replicas increases at the same time, we'll delay the rolling update until
243
//     the scaling up is done.
244
//   - When sts is ready for a rolling update and Replicas decreases at the same time, we'll start the rolling update
245
//     together with scaling down.
246
//
247
// At rest, Partition should always be zero.
248
//
249
// For Replicas:
250
//   - When rolling update, Replicas is equal to (spec.Replicas+maxSurge)
251
//   - Otherwise, Replicas is equal to spec.Replicas
252
//   - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
253
//     we should reclaim the extra replicas gradually to accommodate for the new replicas.
254
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
×
255
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
×
256
        ctx = ctrl.LoggerInto(ctx, log)
×
257
        lwsReplicas := *lws.Spec.Replicas
×
258

×
259
        defer func() {
×
260
                // Limit the replicas with less than lwsPartition will not be updated.
×
261
                stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
×
262
        }()
×
263

264
        // Case 1:
265
        // If sts not created yet, all partitions should be updated,
266
        // replicas should not change.
267
        if sts == nil {
×
268
                return 0, lwsReplicas, nil
×
269
        }
×
270

271
        stsReplicas := *sts.Spec.Replicas
×
272
        maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
×
273
        if err != nil {
×
274
                return 0, 0, err
×
275
        }
×
276
        // No need to burst more than the replicas.
277
        if maxSurge > int(lwsReplicas) {
×
278
                maxSurge = int(lwsReplicas)
×
279
        }
×
280
        burstReplicas := lwsReplicas + int32(maxSurge)
×
281

×
282
        // wantReplicas calculates the final replicas if needed.
×
283
        wantReplicas := func(unreadyReplicas int32) int32 {
×
284
                if unreadyReplicas <= int32(maxSurge) {
×
285
                        // When we have n unready replicas and n bursted replicas, we should
×
286
                        // start to release the burst replica gradually for the accommodation of
×
287
                        // the unready ones.
×
288
                        finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
×
289
                        r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
×
290
                        return finalReplicas
×
291
                }
×
292
                return burstReplicas
×
293
        }
294

295
        // Case 2:
296
        // Indicates a new rolling update here.
297
        if leaderWorkerSetUpdated {
×
298
                // Processing scaling up/down first prior to rolling update.
×
299
                return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
×
300
        }
×
301

302
        partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
×
303
        rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
×
304
        // Case 3:
×
305
        // In normal cases, return the values directly.
×
306
        if rollingUpdateCompleted {
×
307
                return 0, lwsReplicas, nil
×
308
        }
×
309

310
        states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
×
311
        if err != nil {
×
312
                return 0, 0, err
×
313
        }
×
314
        lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)
×
315

×
316
        originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
×
317
        if err != nil {
×
318
                return 0, 0, err
×
319
        }
×
320
        replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
×
321
        // Case 4:
×
322
        // Replicas changed during rolling update.
×
323
        if replicasUpdated {
×
324
                return min(partition, burstReplicas), wantReplicas(lwsUnreadyReplicas), nil
×
325
        }
×
326

327
        // Case 5:
328
        // Calculating the Partition during rolling update, no leaderWorkerSet updates happens.
329

330
        rollingStep, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
×
331
        if err != nil {
×
332
                return 0, 0, err
×
333
        }
×
334
        // Make sure that we always respect the maxUnavailable, or
335
        // we'll violate it when reclaiming bursted replicas.
336
        rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))
×
337

×
338
        return rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition), wantReplicas(lwsUnreadyReplicas), nil
×
339
}
340

341
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
×
342
        log := ctrl.LoggerFrom(ctx)
×
343

×
344
        // construct the statefulset apply configuration
×
345
        leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
×
346
        if err != nil {
×
347
                log.Error(err, "Constructing StatefulSet apply configuration.")
×
348
                return err
×
349
        }
×
350
        if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
×
351
                log.Error(err, "Setting controller reference.")
×
352
                return err
×
353
        }
×
354
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
×
355
        if err != nil {
×
356
                log.Error(err, "Converting StatefulSet configuration to json.")
×
357
                return err
×
358
        }
×
359
        patch := &unstructured.Unstructured{
×
360
                Object: obj,
×
361
        }
×
362
        // Use server side apply and add fieldmanager to the lws owned fields
×
363
        // If there are conflicts in the fields owned by the lws controller, lws will obtain the ownership and force override
×
364
        // these fields to the ones desired by the lws controller
×
365
        // TODO b/316776287 add E2E test for SSA
×
366
        err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{
×
367
                FieldManager: fieldManager,
×
368
                Force:        ptr.To[bool](true),
×
369
        })
×
370
        if err != nil {
×
371
                log.Error(err, "Using server side apply to update leader statefulset")
×
372
                return err
×
373
        }
×
374

375
        return nil
×
376
}
377

378
// updates the condition of the leaderworkerset to either Progressing or Available.
379
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
×
380
        log := ctrl.LoggerFrom(ctx)
×
381
        podSelector := client.MatchingLabels(map[string]string{
×
382
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
383
                leaderworkerset.WorkerIndexLabelKey: "0",
×
384
        })
×
385
        leaderPodList := &corev1.PodList{}
×
386
        if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
387
                log.Error(err, "Fetching leaderPods")
×
388
                return false, false, err
×
389
        }
×
390

391
        updateStatus := false
×
392
        readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
×
393
        partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
×
394
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
395
        lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition
×
396

×
397
        // Iterate through all leaderPods.
×
398
        for _, pod := range leaderPodList.Items {
×
399
                index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
400
                if err != nil {
×
401
                        return false, false, err
×
402
                }
×
403

404
                var sts appsv1.StatefulSet
×
405
                if !noWorkerSts {
×
406
                        if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
×
407
                                if client.IgnoreNotFound(err) != nil {
×
408
                                        log.Error(err, "Fetching worker statefulSet")
×
409
                                        return false, false, err
×
410
                                }
×
411
                                continue
×
412
                        }
413
                }
414

415
                if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
416
                        partitionedCurrentNonBurstCount++
×
417
                }
×
418

419
                var ready, updated bool
×
420
                if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
×
421
                        ready = true
×
422
                        readyCount++
×
423
                }
×
424
                if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
×
425
                        updated = true
×
426
                        updatedCount++
×
427
                        if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
428
                                // Bursted replicas do not count when determining if rollingUpdate has been completed.
×
429
                                partitionedUpdatedNonBurstCount++
×
430
                        }
×
431
                }
432

433
                if index < int(*lws.Spec.Replicas) {
×
434
                        if ready {
×
435
                                readyNonBurstWorkerCount++
×
436
                        }
×
437
                        if index >= int(lwsPartition) && ready && updated {
×
438
                                partitionedUpdatedAndReadyCount++
×
439
                        }
×
440
                }
441
        }
442

443
        if lws.Status.ReadyReplicas != int32(readyCount) {
×
444
                lws.Status.ReadyReplicas = int32(readyCount)
×
445
                updateStatus = true
×
446
        }
×
447

448
        if lws.Status.UpdatedReplicas != int32(updatedCount) {
×
449
                lws.Status.UpdatedReplicas = int32(updatedCount)
×
450
                updateStatus = true
×
451
        }
×
452

453
        var conditions []metav1.Condition
×
454
        if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
×
455
                // upgradeInProgress is true when the upgrade replicas is smaller than the expected
×
456
                // number of total replicas not including the burst replicas
×
457
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
×
458
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
×
459
        } else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
×
460
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
×
461
        } else {
×
462
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
×
463
        }
×
464

465
        // updateDone is true when all replicas are updated and ready
466
        updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)
×
467

×
468
        updateCondition := setConditions(lws, conditions)
×
469
        // if condition changed, record events
×
470
        if updateCondition {
×
471
                r.Record.Eventf(lws, corev1.EventTypeNormal, conditions[0].Reason, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
×
472
        }
×
473
        return updateStatus || updateCondition, updateDone, nil
×
474
}
475

476
// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
477
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
×
478
        updateStatus := false
×
479
        log := ctrl.LoggerFrom(ctx)
×
480

×
481
        // Retrieve the leader StatefulSet.
×
482
        sts := &appsv1.StatefulSet{}
×
483
        if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
×
484
                log.Error(err, "Error retrieving leader StatefulSet")
×
485
                return false, err
×
486
        }
×
487

488
        // retrieve the current number of replicas -- the number of leaders
489
        replicas := int(sts.Status.Replicas)
×
490
        if lws.Status.Replicas != int32(replicas) {
×
491
                lws.Status.Replicas = int32(replicas)
×
492
                updateStatus = true
×
493
        }
×
494

495
        if lws.Status.HPAPodSelector == "" {
×
496
                labelSelector := &metav1.LabelSelector{
×
497
                        MatchLabels: map[string]string{
×
498
                                leaderworkerset.SetNameLabelKey:     lws.Name,
×
499
                                leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
×
500
                        },
×
501
                }
×
502
                selector, err := metav1.LabelSelectorAsSelector(labelSelector)
×
503
                if err != nil {
×
504
                        log.Error(err, "Converting label selector to selector")
×
505
                        return false, err
×
506
                }
×
507

508
                lws.Status.HPAPodSelector = selector.String()
×
509
                updateStatus = true
×
510
        }
511

512
        // check if an update is needed
513
        updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
×
514
        if err != nil {
×
515
                return false, err
×
516
        }
×
517

518
        if updateStatus || updateConditions {
×
519
                if err := r.Status().Update(ctx, lws); err != nil {
×
520
                        if !apierrors.IsConflict(err) {
×
521
                                log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
×
522
                        }
×
523
                        return false, err
×
524
                }
525
        }
526
        return updateDone, nil
×
527
}
528

529
type replicaState struct {
530
        // ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
531
        ready bool
532
        // updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
533
        updated bool
534
}
535

536
func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
×
537
        states := make([]replicaState, stsReplicas)
×
538

×
539
        podSelector := client.MatchingLabels(map[string]string{
×
540
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
541
                leaderworkerset.WorkerIndexLabelKey: "0",
×
542
        })
×
543
        var leaderPodList corev1.PodList
×
544
        if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
545
                return nil, err
×
546
        }
×
547

548
        // Get a sorted leader pod list matches with the following sorted statefulsets one by one, which means
549
        // the leader pod and the corresponding worker statefulset has the same index.
550
        sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
×
551
                return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
552
        }, leaderPodList.Items, int(stsReplicas))
×
553

554
        stsSelector := client.MatchingLabels(map[string]string{
×
555
                leaderworkerset.SetNameLabelKey: lws.Name,
×
556
        })
×
557
        var stsList appsv1.StatefulSetList
×
558
        if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
×
559
                return nil, err
×
560
        }
×
561
        sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
×
562
                return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
×
563
        }, stsList.Items, int(stsReplicas))
×
564

565
        // Once size==1, no worker statefulSets will be created.
566
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
567

×
568
        for idx := int32(0); idx < stsReplicas; idx++ {
×
569
                nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
×
570
                // It can happen that the leader pod or the worker statefulset hasn't created yet
×
571
                // or under rebuilding, which also indicates not ready.
×
572
                if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
×
573
                        states[idx] = replicaState{
×
574
                                ready:   false,
×
575
                                updated: false,
×
576
                        }
×
577
                        continue
×
578
                }
579

580
                leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
×
581
                leaderReady := podutils.PodRunningAndReady(sortedPods[idx])
×
582

×
583
                if noWorkerSts {
×
584
                        states[idx] = replicaState{
×
585
                                ready:   leaderReady,
×
586
                                updated: leaderUpdated,
×
587
                        }
×
588
                        continue
×
589
                }
590

591
                workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
×
592
                workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])
×
593

×
594
                states[idx] = replicaState{
×
595
                        ready:   leaderReady && workersReady,
×
596
                        updated: leaderUpdated && workersUpdated,
×
597
                }
×
598
        }
599

600
        return states, nil
×
601
}
602

603
func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
×
604
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)
×
605

×
606
        // Update up to rollingStep replicas at once.
×
607
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)
×
608

×
609
        // rollingStepPartition calculation above disregards the state of replicas with idx<rollingStepPartition.
×
610
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
×
611
        var unavailable int32
×
612
        for idx := 0; idx < int(rollingStepPartition); idx++ {
×
613
                if !states[idx].ready {
×
614
                        unavailable++
×
615
                }
×
616
        }
617
        var partition = rollingStepPartition + unavailable
×
618

×
619
        // Reduce the partition if replicas are continously not ready. It is safe since updating these replicas does not impact
×
620
        // the availability of the LWS. This is important to prevent update from getting stuck in case maxUnavailable is already violated
×
621
        // (for example, all replicas are not ready when rolling update is started).
×
622
        // Note that we never drop the partition below rolliingStepPartition.
×
623
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
×
624
                if !states[idx].ready || states[idx].updated {
×
625
                        partition = idx
×
626
                } else {
×
627
                        break
×
628
                }
629
        }
630

631
        // That means Partition moves in one direction to make it simple.
632
        return min(partition, currentPartition)
×
633
}
634

635
func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
×
636
        var unreadyCount int32
×
637
        for idx := int32(0); idx < lwsReplicas; idx++ {
×
638
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
×
639
                        unreadyCount++
×
640
                }
×
641
        }
642
        return unreadyCount
×
643
}
644

645
func calculateContinuousReadyReplicas(states []replicaState) int32 {
×
646
        // Count ready replicas at tail (from last index down)
×
647
        var continuousReadyCount int32
×
648
        for idx := len(states) - 1; idx >= 0; idx-- {
×
649
                if !states[idx].ready || !states[idx].updated {
×
650
                        break
×
651
                }
652
                continuousReadyCount++
×
653
        }
654
        return continuousReadyCount
×
655
}
656

657
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
×
658
        sts := &appsv1.StatefulSet{}
×
659
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
×
660
        if err != nil {
×
661
                if apierrors.IsNotFound(err) {
×
662
                        return nil, nil
×
663
                }
×
664
                return nil, err
×
665
        }
666
        return sts, nil
×
667
}
668

669
func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder record.EventRecorder) (*appsv1.ControllerRevision, error) {
×
670
        revisionKey := ""
×
671
        if sts != nil {
×
672
                // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where
×
673
                // the revisionKey was used to detect update instead of controller revision.
×
674
                revisionKey = revisionutils.GetRevisionKey(sts)
×
675
        }
×
676
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
×
677
                return stsRevision, err
×
678
        }
×
679
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
×
680
        if err != nil {
×
681
                return nil, err
×
682
        }
×
683
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
×
684
        if err == nil {
×
685
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
686
                if revisionKey != "" {
×
687
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
688
                }
×
689
                recorder.Eventf(lws, corev1.EventTypeNormal, CreatingRevision, message)
×
690
        }
691
        return newRevision, err
×
692
}
693

694
func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
×
695
        if sts == nil {
×
696
                return nil, nil
×
697
        }
×
698

699
        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
×
700
        if err != nil {
×
701
                return nil, err
×
702
        }
×
703

704
        if !revisionutils.EqualRevision(currentRevision, revision) {
×
705
                return currentRevision, nil
×
706
        }
×
707

708
        return nil, nil
×
709
}
710

711
// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
712
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
6✔
713
        var podTemplateSpec corev1.PodTemplateSpec
6✔
714
        if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
8✔
715
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
2✔
716
        } else {
6✔
717
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
4✔
718
        }
4✔
719
        // construct pod template spec configuration
720
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
6✔
721
        if err != nil {
6✔
722
                return nil, err
×
723
        }
×
724
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
6✔
725
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
6✔
726
        if err != nil {
6✔
727
                return nil, err
×
728
        }
×
729

730
        podTemplateApplyConfiguration.WithLabels(map[string]string{
6✔
731
                leaderworkerset.WorkerIndexLabelKey: "0",
6✔
732
                leaderworkerset.SetNameLabelKey:     lws.Name,
6✔
733
                leaderworkerset.RevisionKey:         revisionKey,
6✔
734
        })
6✔
735
        podAnnotations := make(map[string]string)
6✔
736
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
6✔
737
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
8✔
738
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
2✔
739
        }
2✔
740
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
7✔
741
                podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = (string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type))
1✔
742
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
1✔
743
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
2✔
744
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
1✔
745
                }
1✔
746
        }
747

748
        if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
6✔
749
                podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
×
750
        }
×
751

752
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
6✔
753

6✔
754
        // construct statefulset apply configuration
6✔
755
        statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
6✔
756
                WithSpec(appsapplyv1.StatefulSetSpec().
6✔
757
                        WithServiceName(lws.Name).
6✔
758
                        WithReplicas(replicas).
6✔
759
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
6✔
760
                        WithTemplate(&podTemplateApplyConfiguration).
6✔
761
                        WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
6✔
762
                                appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable).WithPartition(partition),
6✔
763
                        )).
6✔
764
                        WithSelector(metaapplyv1.LabelSelector().
6✔
765
                                WithMatchLabels(map[string]string{
6✔
766
                                        leaderworkerset.SetNameLabelKey:     lws.Name,
6✔
767
                                        leaderworkerset.WorkerIndexLabelKey: "0",
6✔
768
                                }))).
6✔
769
                WithLabels(map[string]string{
6✔
770
                        leaderworkerset.SetNameLabelKey: lws.Name,
6✔
771
                        leaderworkerset.RevisionKey:     revisionKey,
6✔
772
                }).
6✔
773
                WithAnnotations(map[string]string{
6✔
774
                        leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
6✔
775
                })
6✔
776
        return statefulSetConfig, nil
6✔
777
}
6✔
778

7✔
779
func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) metav1.Condition {
1✔
780
        var condtype, reason, message string
1✔
781
        switch conditionType {
782
        case leaderworkerset.LeaderWorkerSetAvailable:
7✔
783
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
1✔
784
                reason = "AllGroupsReady"
1✔
785
                message = "All replicas are ready"
1✔
786
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
1✔
787
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
1✔
788
                reason = GroupsUpdating
1✔
789
                message = "Rolling Upgrade is in progress"
6✔
790
        default:
791
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
792
                reason = GroupsProgressing
×
793
                message = "Replicas are progressing"
×
794
        }
×
795

×
796
        condition := metav1.Condition{
×
797
                Type:               condtype,
×
798
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
×
799
                LastTransitionTime: metav1.Now(),
×
800
                Reason:             reason,
×
801
                Message:            message,
×
802
        }
×
803
        return condition
×
804
}
×
805

×
806
func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
×
807
        shouldUpdate := false
808
        for _, condition := range conditions {
809
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
×
810
        }
×
811

×
812
        return shouldUpdate
×
813
}
×
814

×
815
func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
×
816
        newCondition.LastTransitionTime = metav1.Now()
×
817
        found := false
818
        shouldUpdate := false
819

×
820
        // Precondition: newCondition has status true.
×
821
        for i, curCondition := range lws.Status.Conditions {
×
822
                if newCondition.Type == curCondition.Type {
×
823
                        if newCondition.Status != curCondition.Status {
×
824
                                // the conditions match but one is true and one is false. Update the stored condition
825
                                // with the new condition.
×
826
                                lws.Status.Conditions[i] = newCondition
827
                                shouldUpdate = true
828
                        }
6✔
829
                        // if both are true or both are false, do nothing.
6✔
830
                        found = true
6✔
831
                } else {
6✔
832
                        // if the conditions are not of the same type, do nothing unless one is Progressing and one is
6✔
833
                        // Available and both are true. Must be mutually exclusive.
6✔
834
                        if exclusiveConditionTypes(curCondition, newCondition) &&
11✔
835
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
7✔
836
                                // Progressing is true and Available is true. Prevent this.
3✔
837
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
1✔
838
                                shouldUpdate = true
1✔
839
                        }
1✔
840
                }
1✔
841
        }
1✔
842
        // condition doesn't exist, update only if the status is true
843
        if newCondition.Status == metav1.ConditionTrue && !found {
2✔
844
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
3✔
845
                shouldUpdate = true
3✔
846
        }
3✔
847
        return shouldUpdate
3✔
848
}
4✔
849

1✔
850
func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
1✔
851
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
1✔
852
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
1✔
853
                return true
854
        }
855

856
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
9✔
857
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
3✔
858
                return true
3✔
859
        }
3✔
860

6✔
861
        return false
862
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc