• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 25123894484

29 Apr 2026 05:29PM UTC coverage: 44.419% (-0.3%) from 44.734%
25123894484

push

github

web-flow
add status.observedGeneration field for lws (#807)

* update condition metadata during exclusive state transitions

* update condition metadata during exclusive state transitions

* update condition metadata during exclusive state transitions

* update condition metadata during exclusive state transitions

* update condition metadata during exclusive state transitions

* update generated CRDs and manifests

2 of 19 new or added lines in 1 file covered. (10.53%)

4 existing lines in 1 file now uncovered.

1182 of 2661 relevant lines covered (44.42%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.04
/pkg/controllers/leaderworkerset_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23
        "time"
24

25
        appsv1 "k8s.io/api/apps/v1"
26
        corev1 "k8s.io/api/core/v1"
27
        apierrors "k8s.io/apimachinery/pkg/api/errors"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/apimachinery/pkg/util/intstr"
33
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
34
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
35
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
36
        "k8s.io/client-go/tools/events"
37
        "k8s.io/klog/v2"
38
        "k8s.io/utils/lru"
39
        "k8s.io/utils/ptr"
40
        ctrl "sigs.k8s.io/controller-runtime"
41
        "sigs.k8s.io/controller-runtime/pkg/client"
42
        "sigs.k8s.io/controller-runtime/pkg/handler"
43
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
44

45
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
46
        "sigs.k8s.io/lws/pkg/utils"
47
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
48
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
49
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
50
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
51
)
52

53
// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object
54
type LeaderWorkerSetReconciler struct {
55
        client.Client
56
        Scheme *runtime.Scheme
57
        Record events.EventRecorder
58

59
        revisionEqualityCache *lru.Cache
60
}
61

62
var (
63
        apiGVStr = leaderworkerset.GroupVersion.String()
64
)
65

66
const (
67
        lwsOwnerKey  = ".metadata.controller"
68
        fieldManager = "lws"
69
)
70

71
const (
72
        // FailedCreate Event reason used when a resource creation fails.
73
        // The event uses the error(s) as the reason.
74
        FailedCreate      = "FailedCreate"
75
        GroupsProgressing = "GroupsProgressing"
76
        GroupsUpdating    = "GroupsUpdating"
77
        CreatingRevision  = "CreatingRevision"
78

79
        // Event actions
80
        Create = "Create"
81
        Update = "Update"
82
        Delete = "Delete"
83
)
84

85
// maxRevisionEqualityCacheEntries is the cache size for semantic revision equality results.
86
const maxRevisionEqualityCacheEntries = 10_000
87

88
func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record events.EventRecorder) *LeaderWorkerSetReconciler {
×
89
        return &LeaderWorkerSetReconciler{
×
90
                Client:                client,
×
91
                Scheme:                scheme,
×
92
                Record:                record,
×
93
                revisionEqualityCache: lru.New(maxRevisionEqualityCacheEntries),
×
94
        }
×
95
}
×
96

97
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch;get;list
98
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch;get;list
99
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
100
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
101
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
102
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
103
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
104
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
105
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
106
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
107
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
108
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update
109

110
func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
×
111
        // Get leaderworkerset object
×
112
        lws := &leaderworkerset.LeaderWorkerSet{}
×
113
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
×
114
                return ctrl.Result{}, client.IgnoreNotFound(err)
×
115
        }
×
116

117
        if lws.DeletionTimestamp != nil {
×
118
                return ctrl.Result{}, nil
×
119
        }
×
120

121
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
×
122
        ctx = ctrl.LoggerInto(ctx, log)
×
123

×
124
        leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
×
125
        if err != nil {
×
126
                log.Error(err, "Fetching leader statefulset")
×
127
                return ctrl.Result{}, err
×
128
        }
×
129

130
        if leaderSts != nil && leaderSts.DeletionTimestamp != nil {
×
131
                return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
×
132
        }
×
133

134
        // Handles two cases:
135
        // Case 1: Upgrading the LWS controller from a version that doesn't support controller revision
136
        // Case 2: Creating the controller revision for a newly created LWS object
137
        revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
×
138
        if err != nil {
×
139
                log.Error(err, "Creating controller revision")
×
140
                return ctrl.Result{}, err
×
141
        }
×
142

143
        updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
×
144
        if err != nil {
×
145
                log.Error(err, "Validating if LWS has been updated")
×
146
                return ctrl.Result{}, err
×
147
        }
×
148
        lwsUpdated := updatedRevision != nil
×
149
        if lwsUpdated {
×
150
                revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
×
151
                if err != nil {
×
152
                        log.Error(err, "Creating revision for updated LWS")
×
153
                        return ctrl.Result{}, err
×
154
                }
×
155
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, CreatingRevision, Create, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
×
156
        }
157

158
        partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
×
159
        if err != nil {
×
160
                log.Error(err, "Rolling partition error")
×
161
                return ctrl.Result{}, err
×
162
        }
×
163

164
        if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
×
165
                if leaderSts == nil {
×
166
                        r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
×
167
                }
×
168
                return ctrl.Result{}, err
×
169
        }
170

171
        if leaderSts == nil {
×
172
                // An event is logged to track sts creation.
×
173
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created leader statefulset %s", lws.Name))
×
174
        } else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
×
175
                // An event is logged to track update progress.
×
176
                oldPartition := *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition
×
177
                var updateMsg string
×
178
                if oldPartition-1 == partition {
×
179
                        updateMsg = fmt.Sprintf("Updating replica %d", partition)
×
180
                } else {
×
181
                        updateMsg = fmt.Sprintf("Updating replicas %d to %d (inclusive)", partition, oldPartition-1)
×
182
                }
×
183
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsUpdating, Update, updateMsg)
×
184
        }
185

186
        // Create headless service if it does not exist.
187
        if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
×
188
                log.Error(err, "Creating headless service.")
×
189
                r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create headless service for error: %v", err))
×
190
                return ctrl.Result{}, err
×
191
        }
×
192

193
        updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
×
194
        if err != nil {
×
195
                if apierrors.IsConflict(err) {
×
196
                        return ctrl.Result{Requeue: true}, nil
×
197
                }
×
198
                return ctrl.Result{}, err
×
199
        }
200

201
        if updateDone {
×
202
                if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
×
203
                        return ctrl.Result{}, err
×
204
                }
×
205
        }
206
        log.V(2).Info("Leader Reconcile completed.")
×
207
        return ctrl.Result{}, nil
×
208
}
209

210
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
×
211
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
×
212
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
×
213
                        return err
×
214
                }
×
215
                return nil
×
216
        }
217
        return nil
×
218
}
219

220
// SetupWithManager sets up the controller with the Manager.
221
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
222
        return ctrl.NewControllerManagedBy(mgr).
×
223
                For(&leaderworkerset.LeaderWorkerSet{}).
×
224
                Owns(&appsv1.StatefulSet{}).
×
225
                Owns(&corev1.Service{}).
×
226
                Watches(&appsv1.StatefulSet{},
×
227
                        handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
×
228
                                return []reconcile.Request{
×
229
                                        {NamespacedName: types.NamespacedName{
×
230
                                                Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
×
231
                                                Namespace: a.GetNamespace(),
×
232
                                        }},
×
233
                                }
×
234
                        })).
×
235
                Complete(r)
236
}
237

238
func SetupIndexes(indexer client.FieldIndexer) error {
×
239
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
×
240
                // grab the statefulSet object, extract the owner...
×
241
                statefulSet := rawObj.(*appsv1.StatefulSet)
×
242
                owner := metav1.GetControllerOf(statefulSet)
×
243
                if owner == nil {
×
244
                        return nil
×
245
                }
×
246
                // ...make sure it's a LeaderWorkerSet...
247
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
×
248
                        return nil
×
249
                }
×
250
                // ...and if so, return it
251
                return []string{owner.Name}
×
252
        })
253
}
254

255
// Rolling update will always wait for the former replica to be ready then process the next one,
256
// we didn't consider rollout strategy type here since we only support rollingUpdate now,
257
// once we have more policies, we should update the logic here.
258
// Possible scenarios for Partition:
259
//   - When sts is under creation, partition is always 0 because pods are created in parallel, rolling update is not relevant here.
260
//   - When sts is in rolling update, the partition will start from the last index to the index 0 processing in maxUnavailable step.
261
//   - When sts is in rolling update, and Replicas increases, we'll delay the rolling update until the scaling up is done,
262
//     Partition will not change, new replicas are created using the new template from the get go.
263
//   - When sts is rolling update, and Replicas decreases, the partition will not change until new Replicas < Partition,
264
//     in which case Partition will be reset to the new Replicas value.
265
//   - When sts is ready for a rolling update and Replicas increases at the same time, we'll delay the rolling update until
266
//     the scaling up is done.
267
//   - When sts is ready for a rolling update and Replicas decreases at the same time, we'll start the rolling update
268
//     together with scaling down.
269
//
270
// At rest, Partition should always be zero.
271
//
272
// For Replicas:
273
//   - When rolling update, Replicas is equal to (spec.Replicas+maxSurge)
274
//   - Otherwise, Replicas is equal to spec.Replicas
275
//   - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
276
//     we should reclaim the extra replicas gradually to accommodate for the new replicas.
277
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
3✔
278
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
3✔
279
        ctx = ctrl.LoggerInto(ctx, log)
3✔
280
        lwsReplicas := *lws.Spec.Replicas
3✔
281

3✔
282
        defer func() {
6✔
283
                // Limit the replicas with less than lwsPartition will not be updated.
3✔
284
                stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
3✔
285
        }()
3✔
286

287
        // Case 1:
288
        // If sts not created yet, all partitions should be updated,
289
        // replicas should not change.
290
        if sts == nil {
3✔
291
                return 0, lwsReplicas, nil
×
292
        }
×
293

294
        stsReplicas := *sts.Spec.Replicas
3✔
295
        maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
3✔
296
        if err != nil {
3✔
297
                return 0, 0, err
×
298
        }
×
299
        maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
3✔
300
        if err != nil {
3✔
301
                return 0, 0, err
×
302
        }
×
303
        // No need to burst more than the replicas.
304
        if maxSurge > int(lwsReplicas) {
3✔
305
                maxSurge = int(lwsReplicas)
×
306
        }
×
307
        burstReplicas := lwsReplicas + int32(maxSurge)
3✔
308

3✔
309
        // wantReplicas calculates the final replicas if needed.
3✔
310
        wantReplicas := func(unreadyReplicas int32) int32 {
4✔
311
                finalReplicas := calculateRollingUpdateReplicas(lwsReplicas, int32(maxSurge), int32(maxUnavailable), unreadyReplicas)
1✔
312
                if finalReplicas == stsReplicas-1 {
1✔
313
                        r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
×
314
                } else if finalReplicas < stsReplicas {
1✔
315
                        r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replicas from %s-%d to %s-%d", lws.Name, finalReplicas, lws.Name, stsReplicas-1))
×
316
                }
×
317
                return finalReplicas
1✔
318
        }
319

320
        // Case 2:
321
        // Indicates a new rolling update here.
322
        if leaderWorkerSetUpdated {
5✔
323
                // Processing scaling up/down first prior to rolling update.
2✔
324
                partition := min(lwsReplicas, stsReplicas)
2✔
325
                if stsReplicas < lwsReplicas {
3✔
326
                        return partition, lwsReplicas, nil
1✔
327
                }
1✔
328
                return partition, wantReplicas(lwsReplicas), nil
1✔
329
        }
330

331
        partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
1✔
332
        rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
1✔
333
        // Case 3:
1✔
334
        // In normal cases, return the values directly.
1✔
335
        if rollingUpdateCompleted {
1✔
336
                return 0, lwsReplicas, nil
×
337
        }
×
338
        if stsReplicas < lwsReplicas {
2✔
339
                return partition, lwsReplicas, nil
1✔
340
        }
1✔
341

342
        states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
×
343
        if err != nil {
×
344
                return 0, 0, err
×
345
        }
×
346
        lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)
×
347

×
348
        originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
×
349
        if err != nil {
×
350
                return 0, 0, err
×
351
        }
×
352
        replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
×
353
        // Case 4:
×
354
        // Replicas changed during rolling update.
×
355
        if replicasUpdated {
×
356
                partition := min(partition, burstReplicas)
×
357
                return partition, wantReplicas(lwsUnreadyReplicas), nil
×
358
        }
×
359

360
        // Case 5:
361
        // Calculating the Partition during rolling update, no leaderWorkerSet updates happens.
362

363
        rollingStep := maxUnavailable
×
364
        // Make sure that we always respect the maxUnavailable, or
×
365
        // we'll violate it when reclaiming bursted replicas.
×
366
        rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))
×
367

×
368
        partition = rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition)
×
369
        return partition, wantReplicas(lwsUnreadyReplicas), nil
×
370
}
371

372
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
×
373
        log := ctrl.LoggerFrom(ctx)
×
374

×
375
        // construct the statefulset apply configuration
×
376
        leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
×
377
        if err != nil {
×
378
                log.Error(err, "Constructing StatefulSet apply configuration.")
×
379
                return err
×
380
        }
×
381
        if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
×
382
                log.Error(err, "Setting controller reference.")
×
383
                return err
×
384
        }
×
385
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
×
386
        if err != nil {
×
387
                log.Error(err, "Converting StatefulSet configuration to json.")
×
388
                return err
×
389
        }
×
390
        patch := &unstructured.Unstructured{
×
391
                Object: obj,
×
392
        }
×
393
        // Use server side apply and add fieldmanager to the lws owned fields
×
394
        // If there are conflicts in the fields owned by the lws controller, lws will obtain the ownership and force override
×
395
        // these fields to the ones desired by the lws controller
×
396
        // TODO b/316776287 add E2E test for SSA
×
397
        // TODO: Deprecated: Use client.Client.Apply() and client.Client.SubResource("subrsource").Apply() instead.
×
398
        err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{ //nolint
×
399
                FieldManager: fieldManager,
×
400
                Force:        ptr.To[bool](true),
×
401
        })
×
402
        if err != nil {
×
403
                log.Error(err, "Using server side apply to update leader statefulset")
×
404
                return err
×
405
        }
×
406

407
        return nil
×
408
}
409

410
// updates the condition of the leaderworkerset to either Progressing or Available.
411
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
×
412
        log := ctrl.LoggerFrom(ctx)
×
413
        podSelector := client.MatchingLabels(map[string]string{
×
414
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
415
                leaderworkerset.WorkerIndexLabelKey: "0",
×
416
        })
×
417
        leaderPodList := &corev1.PodList{}
×
418
        if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
419
                log.Error(err, "Fetching leaderPods")
×
420
                return false, false, err
×
421
        }
×
422

423
        updateStatus := false
×
424
        readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
×
425
        partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
×
426
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
427
        lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition
×
428

×
429
        // Iterate through all leaderPods.
×
430
        for _, pod := range leaderPodList.Items {
×
431
                index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
432
                if err != nil {
×
433
                        return false, false, err
×
434
                }
×
435

436
                var sts appsv1.StatefulSet
×
437
                if !noWorkerSts {
×
438
                        if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
×
439
                                if client.IgnoreNotFound(err) != nil {
×
440
                                        log.Error(err, "Fetching worker statefulSet")
×
441
                                        return false, false, err
×
442
                                }
×
443
                                continue
×
444
                        }
445
                }
446

447
                if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
448
                        partitionedCurrentNonBurstCount++
×
449
                }
×
450

451
                var ready, updated bool
×
452
                if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
×
453
                        ready = true
×
454
                        readyCount++
×
455
                }
×
456
                if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
×
457
                        updated = true
×
458
                        updatedCount++
×
459
                        if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
460
                                // Bursted replicas do not count when determining if rollingUpdate has been completed.
×
461
                                partitionedUpdatedNonBurstCount++
×
462
                        }
×
463
                }
464

465
                if index < int(*lws.Spec.Replicas) {
×
466
                        if ready {
×
467
                                readyNonBurstWorkerCount++
×
468
                        }
×
469
                        if index >= int(lwsPartition) && ready && updated {
×
470
                                partitionedUpdatedAndReadyCount++
×
471
                        }
×
472
                }
473
        }
474

475
        if lws.Status.ReadyReplicas != int32(readyCount) {
×
476
                lws.Status.ReadyReplicas = int32(readyCount)
×
477
                updateStatus = true
×
478
        }
×
479

480
        if lws.Status.UpdatedReplicas != int32(updatedCount) {
×
481
                lws.Status.UpdatedReplicas = int32(updatedCount)
×
482
                updateStatus = true
×
483
        }
×
484

485
        var conditions []metav1.Condition
×
486
        if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
×
487
                // upgradeInProgress is true when the upgrade replicas is smaller than the expected
×
488
                // number of total replicas not including the burst replicas
×
NEW
489
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress, lws))
×
NEW
490
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing, lws))
×
491
        } else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
×
NEW
492
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable, lws))
×
493
        } else {
×
NEW
494
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing, lws))
×
495
        }
×
496

497
        // updateDone is true when all replicas are updated and ready
498
        updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)
×
499

×
500
        updateCondition := setConditions(lws, conditions)
×
501
        // if condition changed, record events
×
502
        if updateCondition {
×
503
                r.Record.Eventf(lws, nil, corev1.EventTypeNormal, conditions[0].Reason, Update, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
×
504
        }
×
505
        return updateStatus || updateCondition, updateDone, nil
×
506
}
507

508
// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
509
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
×
510
        updateStatus := false
×
511
        log := ctrl.LoggerFrom(ctx)
×
512

×
513
        // Retrieve the leader StatefulSet.
×
514
        sts := &appsv1.StatefulSet{}
×
515
        if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
×
516
                log.Error(err, "Error retrieving leader StatefulSet")
×
517
                return false, err
×
518
        }
×
519

520
        // retrieve the current number of replicas -- the number of leaders
521
        replicas := int(sts.Status.Replicas)
×
522
        if lws.Status.Replicas != int32(replicas) {
×
523
                lws.Status.Replicas = int32(replicas)
×
524
                updateStatus = true
×
525
        }
×
526

NEW
527
        if lws.Status.ObservedGeneration != lws.Generation {
×
NEW
528
                lws.Status.ObservedGeneration = lws.Generation
×
NEW
529
                updateStatus = true
×
NEW
530
        }
×
531

532
        if lws.Status.HPAPodSelector == "" {
×
533
                labelSelector := &metav1.LabelSelector{
×
534
                        MatchLabels: map[string]string{
×
535
                                leaderworkerset.SetNameLabelKey:     lws.Name,
×
536
                                leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
×
537
                        },
×
538
                }
×
539
                selector, err := metav1.LabelSelectorAsSelector(labelSelector)
×
540
                if err != nil {
×
541
                        log.Error(err, "Converting label selector to selector")
×
542
                        return false, err
×
543
                }
×
544

545
                lws.Status.HPAPodSelector = selector.String()
×
546
                updateStatus = true
×
547
        }
548

549
        // check if an update is needed
550
        updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
×
551
        if err != nil {
×
552
                return false, err
×
553
        }
×
554

555
        if updateStatus || updateConditions {
×
556
                if err := r.Status().Update(ctx, lws); err != nil {
×
557
                        if !apierrors.IsConflict(err) {
×
558
                                log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
×
559
                        }
×
560
                        return false, err
×
561
                }
562
        }
563
        return updateDone, nil
×
564
}
565

566
type replicaState struct {
567
        // ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
568
        ready bool
569
        // updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
570
        updated bool
571
}
572

573
func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
×
574
        states := make([]replicaState, stsReplicas)
×
575

×
576
        podSelector := client.MatchingLabels(map[string]string{
×
577
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
578
                leaderworkerset.WorkerIndexLabelKey: "0",
×
579
        })
×
580
        var leaderPodList corev1.PodList
×
581
        if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
582
                return nil, err
×
583
        }
×
584

585
        // Get a sorted leader pod list matches with the following sorted statefulsets one by one, which means
586
        // the leader pod and the corresponding worker statefulset has the same index.
587
        sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
×
588
                return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
589
        }, leaderPodList.Items, int(stsReplicas))
×
590

591
        stsSelector := client.MatchingLabels(map[string]string{
×
592
                leaderworkerset.SetNameLabelKey: lws.Name,
×
593
        })
×
594
        var stsList appsv1.StatefulSetList
×
595
        if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
×
596
                return nil, err
×
597
        }
×
598
        sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
×
599
                return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
×
600
        }, stsList.Items, int(stsReplicas))
×
601

602
        // Once size==1, no worker statefulSets will be created.
603
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
604

×
605
        for idx := int32(0); idx < stsReplicas; idx++ {
×
606
                nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
×
607
                // It can happen that the leader pod or the worker statefulset hasn't created yet
×
608
                // or under rebuilding, which also indicates not ready.
×
609
                if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
×
610
                        states[idx] = replicaState{
×
611
                                ready:   false,
×
612
                                updated: false,
×
613
                        }
×
614
                        continue
×
615
                }
616

617
                leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
×
618
                leaderReady := podutils.PodRunningAndReady(sortedPods[idx])
×
619

×
620
                if noWorkerSts {
×
621
                        states[idx] = replicaState{
×
622
                                ready:   leaderReady,
×
623
                                updated: leaderUpdated,
×
624
                        }
×
625
                        continue
×
626
                }
627

628
                workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
×
629
                workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])
×
630

×
631
                states[idx] = replicaState{
×
632
                        ready:   leaderReady && workersReady,
×
633
                        updated: leaderUpdated && workersUpdated,
×
634
                }
×
635
        }
636

637
        return states, nil
×
638
}
639

640
func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
×
641
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)
×
642

×
643
        // Update up to rollingStep replicas at once.
×
644
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)
×
645

×
646
        // rollingStepPartition calculation above disregards the state of replicas with idx<rollingStepPartition.
×
647
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
×
648
        var unavailable int32
×
649
        for idx := 0; idx < int(rollingStepPartition); idx++ {
×
650
                if !states[idx].ready {
×
651
                        unavailable++
×
652
                }
×
653
        }
654
        var partition = rollingStepPartition + unavailable
×
655

×
656
        // Reduce the partition if replicas are continuously not ready. It is safe since updating these replicas does not impact
×
657
        // the availability of the LWS. This is important to prevent update from getting stuck in case maxUnavailable is already violated
×
658
        // (for example, all replicas are not ready when rolling update is started).
×
659
        // Note that we never drop the partition below rolliingStepPartition.
×
660
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
×
661
                if !states[idx].ready || states[idx].updated {
×
662
                        partition = idx
×
663
                } else {
×
664
                        break
×
665
                }
666
        }
667

668
        // That means Partition moves in one direction to make it simple.
669
        return min(partition, currentPartition)
×
670
}
671

672
func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
×
673
        var unreadyCount int32
×
674
        for idx := int32(0); idx < lwsReplicas; idx++ {
×
675
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
×
676
                        unreadyCount++
×
677
                }
×
678
        }
679
        return unreadyCount
×
680
}
681

682
func calculateRollingUpdateReplicas(lwsReplicas int32, maxSurge int32, maxUnavailable int32, unreadyReplicas int32) int32 {
7✔
683
        burstReplicas := lwsReplicas + maxSurge
7✔
684
        if unreadyReplicas <= maxSurge {
13✔
685
                // Keep enough surge replicas to cover any desired replicas that are still
6✔
686
                // unavailable beyond the configured maxUnavailable budget. Once the
6✔
687
                // remaining unready desired replicas fit inside the budget, we can reclaim
6✔
688
                // surge capacity gradually.
6✔
689
                requiredSurgeReplicas := utils.NonZeroValue(unreadyReplicas - maxUnavailable)
6✔
690
                return lwsReplicas + requiredSurgeReplicas
6✔
691
        }
6✔
692
        return burstReplicas
1✔
693
}
694

695
func calculateContinuousReadyReplicas(states []replicaState) int32 {
×
696
        // Count ready replicas at tail (from last index down)
×
697
        var continuousReadyCount int32
×
698
        for idx := len(states) - 1; idx >= 0; idx-- {
×
699
                if !states[idx].ready || !states[idx].updated {
×
700
                        break
×
701
                }
702
                continuousReadyCount++
×
703
        }
704
        return continuousReadyCount
×
705
}
706

707
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
×
708
        sts := &appsv1.StatefulSet{}
×
709
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
×
710
        if err != nil {
×
711
                if apierrors.IsNotFound(err) {
×
712
                        return nil, nil
×
713
                }
×
714
                return nil, err
×
715
        }
716
        return sts, nil
×
717
}
718

719
func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder events.EventRecorder) (*appsv1.ControllerRevision, error) {
×
720
        revisionKey := ""
×
721
        if sts != nil {
×
722
                // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where
×
723
                // the revisionKey was used to detect update instead of controller revision.
×
724
                revisionKey = revisionutils.GetRevisionKey(sts)
×
725
        }
×
726
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
×
727
                return stsRevision, err
×
728
        }
×
729
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
×
730
        if err != nil {
×
731
                return nil, err
×
732
        }
×
733
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
×
734
        if err == nil {
×
735
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
736
                if revisionKey != "" {
×
737
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
738
                }
×
739
                recorder.Eventf(lws, newRevision, corev1.EventTypeNormal, CreatingRevision, Create, message)
×
740
        }
741
        return newRevision, err
×
742
}
743

744
func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
4✔
745
        if sts == nil {
5✔
746
                return nil, nil
1✔
747
        }
1✔
748

749
        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
3✔
750
        if err != nil {
3✔
751
                return nil, err
×
752
        }
×
753

754
        if !revisionutils.EqualRevision(currentRevision, revision) {
5✔
755
                // If raw bytes differ but the revision is semantically equivalent, avoid triggering a spurious rolling update.
2✔
756
                if revisionutils.SetMatchesRevision(lws, currentRevision, revision, r.revisionEqualityCache) {
3✔
757
                        return nil, nil
1✔
758
                }
1✔
759
                return currentRevision, nil
1✔
760
        }
761

762
        return nil, nil
1✔
763
}
764

765
// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
766
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
9✔
767
        var podTemplateSpec corev1.PodTemplateSpec
9✔
768
        if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
11✔
769
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
2✔
770
        } else {
9✔
771
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
7✔
772
        }
7✔
773
        // construct pod template spec configuration
774
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
9✔
775
        if err != nil {
9✔
776
                return nil, err
×
777
        }
×
778
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
9✔
779
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
9✔
780
        if err != nil {
9✔
781
                return nil, err
×
782
        }
×
783

784
        podTemplateApplyConfiguration.WithLabels(map[string]string{
9✔
785
                leaderworkerset.WorkerIndexLabelKey: "0",
9✔
786
                leaderworkerset.SetNameLabelKey:     lws.Name,
9✔
787
                leaderworkerset.RevisionKey:         revisionKey,
9✔
788
        })
9✔
789
        podAnnotations := make(map[string]string)
9✔
790
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
9✔
791
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
11✔
792
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
2✔
793
        }
2✔
794
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
10✔
795
                podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = (string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type))
1✔
796
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
1✔
797
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
2✔
798
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
1✔
799
                }
1✔
800
        }
801

802
        if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
9✔
803
                podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
×
804
        }
×
805

806
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
9✔
807

9✔
808
        lwsReplicas := int(*lws.Spec.Replicas)
9✔
809
        lwsMaxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, lwsReplicas, false)
9✔
810
        if err != nil {
9✔
811
                return nil, err
×
812
        }
×
813
        lwsMaxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, lwsReplicas, true)
9✔
814
        if err != nil {
9✔
815
                return nil, err
×
816
        }
×
817
        if lwsMaxSurge > lwsReplicas {
10✔
818
                lwsMaxSurge = lwsReplicas
1✔
819
        }
1✔
820
        stsMaxUnavailableInt := int32(lwsMaxUnavailable + lwsMaxSurge)
9✔
821
        // lwsMaxUnavailable=0 and lwsMaxSurge=0 together should be blocked by webhook,
9✔
822
        // but just in case, we'll make sure that stsMaxUnavailable is at least 1.
9✔
823
        // This also handles the case when lws.Spec.Replicas is 0.
9✔
824
        if stsMaxUnavailableInt < 1 {
10✔
825
                stsMaxUnavailableInt = 1
1✔
826
        }
1✔
827
        stsMaxUnavailable := intstr.FromInt32(stsMaxUnavailableInt)
9✔
828

9✔
829
        // construct statefulset apply configuration
9✔
830
        statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
9✔
831
                WithSpec(appsapplyv1.StatefulSetSpec().
9✔
832
                        WithServiceName(lws.Name).
9✔
833
                        WithReplicas(replicas).
9✔
834
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
9✔
835
                        WithTemplate(&podTemplateApplyConfiguration).
9✔
836
                        WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
9✔
837
                                appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(stsMaxUnavailable).WithPartition(partition),
9✔
838
                        )).
9✔
839
                        WithSelector(metaapplyv1.LabelSelector().
9✔
840
                                WithMatchLabels(map[string]string{
9✔
841
                                        leaderworkerset.SetNameLabelKey:     lws.Name,
9✔
842
                                        leaderworkerset.WorkerIndexLabelKey: "0",
9✔
843
                                }))).
9✔
844
                WithLabels(map[string]string{
9✔
845
                        leaderworkerset.SetNameLabelKey: lws.Name,
9✔
846
                        leaderworkerset.RevisionKey:     revisionKey,
9✔
847
                }).
9✔
848
                WithAnnotations(map[string]string{
9✔
849
                        leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
9✔
850
                })
9✔
851

9✔
852
        pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(lws)
9✔
853
        if len(pvcApplyConfiguration) > 0 {
10✔
854
                statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
1✔
855
        }
1✔
856

857
        if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
10✔
858
                pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
1✔
859
                        WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
1✔
860
                        WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
1✔
861
                }
1✔
862
                statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
1✔
863
        }
1✔
864
        return statefulSetConfig, nil
9✔
865
}
866

NEW
867
func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType, lws *leaderworkerset.LeaderWorkerSet) metav1.Condition {
×
868
        var condtype, reason, message string
×
869
        switch conditionType {
×
870
        case leaderworkerset.LeaderWorkerSetAvailable:
×
871
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
×
872
                reason = "AllGroupsReady"
×
873
                message = "All replicas are ready"
×
874
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
×
875
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
×
876
                reason = GroupsUpdating
×
877
                message = "Rolling Upgrade is in progress"
×
878
        default:
×
879
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
×
880
                reason = GroupsProgressing
×
881
                message = "Replicas are progressing"
×
882
        }
883

884
        condition := metav1.Condition{
×
885
                Type:               condtype,
×
886
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
×
887
                LastTransitionTime: metav1.Now(),
×
NEW
888
                ObservedGeneration: lws.Generation,
×
889
                Reason:             reason,
×
890
                Message:            message,
×
891
        }
×
892
        return condition
×
893
}
894

895
func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
×
896
        shouldUpdate := false
×
897
        for _, condition := range conditions {
×
898
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
×
899
        }
×
900

NEW
901
        for i := range lws.Status.Conditions {
×
NEW
902
                if lws.Status.Conditions[i].ObservedGeneration != lws.Generation {
×
NEW
903
                        lws.Status.Conditions[i].ObservedGeneration = lws.Generation
×
NEW
904
                        shouldUpdate = true
×
NEW
905
                }
×
906
        }
907

UNCOV
908
        return shouldUpdate
×
909
}
910

911
func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
5✔
912
        newCondition.LastTransitionTime = metav1.Now()
5✔
913
        found := false
5✔
914
        shouldUpdate := false
5✔
915

5✔
916
        // Precondition: newCondition has status true.
5✔
917
        for i, curCondition := range lws.Status.Conditions {
9✔
918
                if newCondition.Type == curCondition.Type {
7✔
919
                        if newCondition.Status != curCondition.Status ||
3✔
920
                                newCondition.ObservedGeneration != curCondition.ObservedGeneration {
5✔
921
                                // the conditions match but one is true and one is false. Update the stored condition
2✔
922
                                // with the new condition.
2✔
923
                                lws.Status.Conditions[i] = newCondition
2✔
924
                                shouldUpdate = true
2✔
925
                        }
2✔
926
                        // if both are true or both are false, do nothing.
927
                        found = true
3✔
928
                } else {
1✔
929
                        // if the conditions are not of the same type, do nothing unless one is Progressing and one is
1✔
930
                        // Available and both are true. Must be mutually exclusive.
1✔
931
                        if exclusiveConditionTypes(curCondition, newCondition) &&
1✔
932
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
1✔
UNCOV
933
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
×
NEW
934
                                lws.Status.Conditions[i].LastTransitionTime = metav1.Now()
×
NEW
935
                                lws.Status.Conditions[i].ObservedGeneration = newCondition.ObservedGeneration
×
UNCOV
936
                                shouldUpdate = true
×
UNCOV
937
                        }
×
938
                }
939
        }
940
        // condition doesn't exist, update only if the status is true
941
        if newCondition.Status == metav1.ConditionTrue && !found {
7✔
942
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
2✔
943
                shouldUpdate = true
2✔
944
        }
2✔
945
        return shouldUpdate
5✔
946
}
947

948
func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
8✔
949
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
8✔
950
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
11✔
951
                return true
3✔
952
        }
3✔
953

954
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
5✔
955
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
7✔
956
                return true
2✔
957
        }
2✔
958

959
        return false
3✔
960
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc