• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 26164378256

20 May 2026 01:03PM UTC coverage: 44.386% (-0.03%) from 44.419%
26164378256

Pull #851

github

Mostafahassen1
feat(lws): improve feedback for malformed LWS resources
Pull Request #851: fix: surface error message in FailedCreate events for leader and worker statefulsets

0 of 4 new or added lines in 2 files covered. (0.0%)

80 existing lines in 1 file now uncovered.

1182 of 2663 relevant lines covered (44.39%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.96
/pkg/controllers/leaderworkerset_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "fmt"
22
        "strconv"
23
        "time"
24

25
        appsv1 "k8s.io/api/apps/v1"
26
        corev1 "k8s.io/api/core/v1"
27
        apierrors "k8s.io/apimachinery/pkg/api/errors"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        "k8s.io/apimachinery/pkg/util/intstr"
33
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
34
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
35
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
36
        "k8s.io/client-go/tools/events"
37
        "k8s.io/klog/v2"
38
        "k8s.io/utils/lru"
39
        "k8s.io/utils/ptr"
40
        ctrl "sigs.k8s.io/controller-runtime"
41
        "sigs.k8s.io/controller-runtime/pkg/client"
42
        "sigs.k8s.io/controller-runtime/pkg/handler"
43
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
44

45
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
46
        "sigs.k8s.io/lws/pkg/utils"
47
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
48
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
49
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
50
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
51
)
52

53
// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object
54
type LeaderWorkerSetReconciler struct {
55
        client.Client
56
        Scheme *runtime.Scheme
57
        Record events.EventRecorder
58

59
        revisionEqualityCache *lru.Cache
60
}
61

62
var (
63
        apiGVStr = leaderworkerset.GroupVersion.String()
64
)
65

66
const (
67
        lwsOwnerKey  = ".metadata.controller"
68
        fieldManager = "lws"
69
)
70

71
const (
72
        // FailedCreate Event reason used when a resource creation fails.
73
        // The event uses the error(s) as the reason.
74
        FailedCreate      = "FailedCreate"
75
        GroupsProgressing = "GroupsProgressing"
76
        GroupsUpdating    = "GroupsUpdating"
77
        CreatingRevision  = "CreatingRevision"
78

79
        // Event actions
80
        Create = "Create"
81
        Update = "Update"
82
        Delete = "Delete"
83
)
84

85
// maxRevisionEqualityCacheEntries is the cache size for semantic revision equality results.
86
const maxRevisionEqualityCacheEntries = 10_000
87

UNCOV
88
func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record events.EventRecorder) *LeaderWorkerSetReconciler {
×
89
        return &LeaderWorkerSetReconciler{
×
90
                Client:                client,
×
91
                Scheme:                scheme,
×
92
                Record:                record,
×
93
                revisionEqualityCache: lru.New(maxRevisionEqualityCacheEntries),
×
94
        }
×
95
}
×
96

97
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch;get;list
98
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch;get;list
99
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
100
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
101
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
102
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
103
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
104
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
105
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
106
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
107
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
108
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update
109

UNCOV
110
func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
×
111
        // Get leaderworkerset object
×
112
        lws := &leaderworkerset.LeaderWorkerSet{}
×
113
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
×
114
                return ctrl.Result{}, client.IgnoreNotFound(err)
×
115
        }
×
116

UNCOV
117
        if lws.DeletionTimestamp != nil {
×
118
                return ctrl.Result{}, nil
×
119
        }
×
120

UNCOV
121
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
×
122
        ctx = ctrl.LoggerInto(ctx, log)
×
123

×
124
        leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
×
125
        if err != nil {
×
126
                log.Error(err, "Fetching leader statefulset")
×
127
                return ctrl.Result{}, err
×
128
        }
×
129

UNCOV
130
        if leaderSts != nil && leaderSts.DeletionTimestamp != nil {
×
131
                return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
×
132
        }
×
133

134
        // Handles two cases:
135
        // Case 1: Upgrading the LWS controller from a version that doesn't support controller revision
136
        // Case 2: Creating the controller revision for a newly created LWS object
UNCOV
137
        revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
×
138
        if err != nil {
×
139
                log.Error(err, "Creating controller revision")
×
140
                return ctrl.Result{}, err
×
141
        }
×
142

UNCOV
143
        updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
×
144
        if err != nil {
×
145
                log.Error(err, "Validating if LWS has been updated")
×
146
                return ctrl.Result{}, err
×
147
        }
×
148
        lwsUpdated := updatedRevision != nil
×
149
        if lwsUpdated {
×
150
                revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
×
151
                if err != nil {
×
152
                        log.Error(err, "Creating revision for updated LWS")
×
153
                        return ctrl.Result{}, err
×
154
                }
×
155
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, CreatingRevision, Create, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
×
156
        }
157

UNCOV
158
        partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
×
159
        if err != nil {
×
160
                log.Error(err, "Rolling partition error")
×
161
                return ctrl.Result{}, err
×
162
        }
×
163

UNCOV
164
        if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
×
165
                if leaderSts == nil {
×
NEW
166
                        r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create leader statefulset %s: %v", lws.Name, err))
×
NEW
167
                } else {
×
NEW
168
                        r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Update, fmt.Sprintf("Failed to update leader statefulset %s: %v", lws.Name, err))
×
169
                }
×
170
                return ctrl.Result{}, err
×
171
        }
172

UNCOV
173
        if leaderSts == nil {
×
174
                // An event is logged to track sts creation.
×
175
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created leader statefulset %s", lws.Name))
×
176
        } else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
×
177
                // An event is logged to track update progress.
×
178
                oldPartition := *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition
×
179
                var updateMsg string
×
180
                if oldPartition-1 == partition {
×
181
                        updateMsg = fmt.Sprintf("Updating replica %d", partition)
×
182
                } else {
×
183
                        updateMsg = fmt.Sprintf("Updating replicas %d to %d (inclusive)", partition, oldPartition-1)
×
184
                }
×
185
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsUpdating, Update, updateMsg)
×
186
        }
187

188
        // Create headless service if it does not exist.
UNCOV
189
        if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
×
190
                log.Error(err, "Creating headless service.")
×
191
                r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create headless service for error: %v", err))
×
192
                return ctrl.Result{}, err
×
193
        }
×
194

UNCOV
195
        updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
×
196
        if err != nil {
×
197
                if apierrors.IsConflict(err) {
×
198
                        return ctrl.Result{Requeue: true}, nil
×
199
                }
×
200
                return ctrl.Result{}, err
×
201
        }
202

UNCOV
203
        if updateDone {
×
204
                if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
×
205
                        return ctrl.Result{}, err
×
206
                }
×
207
        }
UNCOV
208
        log.V(2).Info("Leader Reconcile completed.")
×
209
        return ctrl.Result{}, nil
×
210
}
211

UNCOV
212
func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
×
213
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
×
214
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
×
215
                        return err
×
216
                }
×
217
                return nil
×
218
        }
UNCOV
219
        return nil
×
220
}
221

222
// SetupWithManager sets up the controller with the Manager.
UNCOV
223
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
224
        return ctrl.NewControllerManagedBy(mgr).
×
225
                For(&leaderworkerset.LeaderWorkerSet{}).
×
226
                Owns(&appsv1.StatefulSet{}).
×
227
                Owns(&corev1.Service{}).
×
228
                Watches(&appsv1.StatefulSet{},
×
229
                        handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
×
230
                                return []reconcile.Request{
×
231
                                        {NamespacedName: types.NamespacedName{
×
232
                                                Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
×
233
                                                Namespace: a.GetNamespace(),
×
234
                                        }},
×
235
                                }
×
236
                        })).
×
237
                Complete(r)
238
}
239

UNCOV
240
func SetupIndexes(indexer client.FieldIndexer) error {
×
241
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
×
242
                // grab the statefulSet object, extract the owner...
×
243
                statefulSet := rawObj.(*appsv1.StatefulSet)
×
244
                owner := metav1.GetControllerOf(statefulSet)
×
245
                if owner == nil {
×
246
                        return nil
×
247
                }
×
248
                // ...make sure it's a LeaderWorkerSet...
UNCOV
249
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
×
250
                        return nil
×
251
                }
×
252
                // ...and if so, return it
UNCOV
253
                return []string{owner.Name}
×
254
        })
255
}
256

257
// Rolling update will always wait for the former replica to be ready then process the next one,
258
// we didn't consider rollout strategy type here since we only support rollingUpdate now,
259
// once we have more policies, we should update the logic here.
260
// Possible scenarios for Partition:
261
//   - When sts is under creation, partition is always 0 because pods are created in parallel, rolling update is not relevant here.
262
//   - When sts is in rolling update, the partition will start from the last index to the index 0 processing in maxUnavailable step.
263
//   - When sts is in rolling update, and Replicas increases, we'll delay the rolling update until the scaling up is done,
264
//     Partition will not change, new replicas are created using the new template from the get go.
265
//   - When sts is rolling update, and Replicas decreases, the partition will not change until new Replicas < Partition,
266
//     in which case Partition will be reset to the new Replicas value.
267
//   - When sts is ready for a rolling update and Replicas increases at the same time, we'll delay the rolling update until
268
//     the scaling up is done.
269
//   - When sts is ready for a rolling update and Replicas decreases at the same time, we'll start the rolling update
270
//     together with scaling down.
271
//
272
// At rest, Partition should always be zero.
273
//
274
// For Replicas:
275
//   - When rolling update, Replicas is equal to (spec.Replicas+maxSurge)
276
//   - Otherwise, Replicas is equal to spec.Replicas
277
//   - One exception here is when unready replicas of leaderWorkerSet is equal to MaxSurge,
278
//     we should reclaim the extra replicas gradually to accommodate for the new replicas.
279
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
3✔
280
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
3✔
281
        ctx = ctrl.LoggerInto(ctx, log)
3✔
282
        lwsReplicas := *lws.Spec.Replicas
3✔
283

3✔
284
        defer func() {
6✔
285
                // Limit the replicas with less than lwsPartition will not be updated.
3✔
286
                stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
3✔
287
        }()
3✔
288

289
        // Case 1:
290
        // If sts not created yet, all partitions should be updated,
291
        // replicas should not change.
292
        if sts == nil {
3✔
UNCOV
293
                return 0, lwsReplicas, nil
×
294
        }
×
295

296
        stsReplicas := *sts.Spec.Replicas
3✔
297
        maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
3✔
298
        if err != nil {
3✔
UNCOV
299
                return 0, 0, err
×
300
        }
×
301
        maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
3✔
302
        if err != nil {
3✔
UNCOV
303
                return 0, 0, err
×
304
        }
×
305
        // No need to burst more than the replicas.
306
        if maxSurge > int(lwsReplicas) {
3✔
UNCOV
307
                maxSurge = int(lwsReplicas)
×
308
        }
×
309
        burstReplicas := lwsReplicas + int32(maxSurge)
3✔
310

3✔
311
        // wantReplicas calculates the final replicas if needed.
3✔
312
        wantReplicas := func(unreadyReplicas int32) int32 {
4✔
313
                finalReplicas := calculateRollingUpdateReplicas(lwsReplicas, int32(maxSurge), int32(maxUnavailable), unreadyReplicas)
1✔
314
                if finalReplicas == stsReplicas-1 {
1✔
UNCOV
315
                        r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
×
316
                } else if finalReplicas < stsReplicas {
1✔
UNCOV
317
                        r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replicas from %s-%d to %s-%d", lws.Name, finalReplicas, lws.Name, stsReplicas-1))
×
318
                }
×
319
                return finalReplicas
1✔
320
        }
321

322
        // Case 2:
323
        // Indicates a new rolling update here.
324
        if leaderWorkerSetUpdated {
5✔
325
                // Processing scaling up/down first prior to rolling update.
2✔
326
                partition := min(lwsReplicas, stsReplicas)
2✔
327
                if stsReplicas < lwsReplicas {
3✔
328
                        return partition, lwsReplicas, nil
1✔
329
                }
1✔
330
                return partition, wantReplicas(lwsReplicas), nil
1✔
331
        }
332

333
        partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
1✔
334
        rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
1✔
335
        // Case 3:
1✔
336
        // In normal cases, return the values directly.
1✔
337
        if rollingUpdateCompleted {
1✔
UNCOV
338
                return 0, lwsReplicas, nil
×
339
        }
×
340
        if stsReplicas < lwsReplicas {
2✔
341
                return partition, lwsReplicas, nil
1✔
342
        }
1✔
343

UNCOV
344
        states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
×
345
        if err != nil {
×
346
                return 0, 0, err
×
347
        }
×
348
        lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)
×
349

×
350
        originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
×
351
        if err != nil {
×
352
                return 0, 0, err
×
353
        }
×
354
        replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
×
355
        // Case 4:
×
356
        // Replicas changed during rolling update.
×
357
        if replicasUpdated {
×
358
                partition := min(partition, burstReplicas)
×
359
                return partition, wantReplicas(lwsUnreadyReplicas), nil
×
360
        }
×
361

362
        // Case 5:
363
        // Calculating the Partition during rolling update, no leaderWorkerSet updates happens.
364

UNCOV
365
        rollingStep := maxUnavailable
×
366
        // Make sure that we always respect the maxUnavailable, or
×
367
        // we'll violate it when reclaiming bursted replicas.
×
368
        rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))
×
369

×
370
        partition = rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition)
×
371
        return partition, wantReplicas(lwsUnreadyReplicas), nil
×
372
}
373

UNCOV
374
func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
×
375
        log := ctrl.LoggerFrom(ctx)
×
376

×
377
        // construct the statefulset apply configuration
×
378
        leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
×
379
        if err != nil {
×
380
                log.Error(err, "Constructing StatefulSet apply configuration.")
×
381
                return err
×
382
        }
×
383
        if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
×
384
                log.Error(err, "Setting controller reference.")
×
385
                return err
×
386
        }
×
387
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
×
388
        if err != nil {
×
389
                log.Error(err, "Converting StatefulSet configuration to json.")
×
390
                return err
×
391
        }
×
392
        patch := &unstructured.Unstructured{
×
393
                Object: obj,
×
394
        }
×
395
        // Use server side apply and add fieldmanager to the lws owned fields
×
396
        // If there are conflicts in the fields owned by the lws controller, lws will obtain the ownership and force override
×
397
        // these fields to the ones desired by the lws controller
×
398
        // TODO b/316776287 add E2E test for SSA
×
399
        // TODO: Deprecated: Use client.Client.Apply() and client.Client.SubResource("subrsource").Apply() instead.
×
400
        err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{ //nolint
×
401
                FieldManager: fieldManager,
×
402
                Force:        ptr.To[bool](true),
×
403
        })
×
404
        if err != nil {
×
405
                log.Error(err, "Using server side apply to update leader statefulset")
×
406
                return err
×
407
        }
×
408

UNCOV
409
        return nil
×
410
}
411

412
// updates the condition of the leaderworkerset to either Progressing or Available.
UNCOV
413
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
×
414
        log := ctrl.LoggerFrom(ctx)
×
415
        podSelector := client.MatchingLabels(map[string]string{
×
416
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
417
                leaderworkerset.WorkerIndexLabelKey: "0",
×
418
        })
×
419
        leaderPodList := &corev1.PodList{}
×
420
        if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
421
                log.Error(err, "Fetching leaderPods")
×
422
                return false, false, err
×
423
        }
×
424

UNCOV
425
        updateStatus := false
×
426
        readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
×
427
        partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
×
428
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
429
        lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition
×
430

×
431
        // Iterate through all leaderPods.
×
432
        for _, pod := range leaderPodList.Items {
×
433
                index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
434
                if err != nil {
×
435
                        return false, false, err
×
436
                }
×
437

UNCOV
438
                var sts appsv1.StatefulSet
×
439
                if !noWorkerSts {
×
440
                        if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
×
441
                                if client.IgnoreNotFound(err) != nil {
×
442
                                        log.Error(err, "Fetching worker statefulSet")
×
443
                                        return false, false, err
×
444
                                }
×
445
                                continue
×
446
                        }
447
                }
448

UNCOV
449
                if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
450
                        partitionedCurrentNonBurstCount++
×
451
                }
×
452

UNCOV
453
                var ready, updated bool
×
454
                if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
×
455
                        ready = true
×
456
                        readyCount++
×
457
                }
×
458
                if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
×
459
                        updated = true
×
460
                        updatedCount++
×
461
                        if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
×
462
                                // Bursted replicas do not count when determining if rollingUpdate has been completed.
×
463
                                partitionedUpdatedNonBurstCount++
×
464
                        }
×
465
                }
466

UNCOV
467
                if index < int(*lws.Spec.Replicas) {
×
468
                        if ready {
×
469
                                readyNonBurstWorkerCount++
×
470
                        }
×
471
                        if index >= int(lwsPartition) && ready && updated {
×
472
                                partitionedUpdatedAndReadyCount++
×
473
                        }
×
474
                }
475
        }
476

UNCOV
477
        if lws.Status.ReadyReplicas != int32(readyCount) {
×
478
                lws.Status.ReadyReplicas = int32(readyCount)
×
479
                updateStatus = true
×
480
        }
×
481

UNCOV
482
        if lws.Status.UpdatedReplicas != int32(updatedCount) {
×
483
                lws.Status.UpdatedReplicas = int32(updatedCount)
×
484
                updateStatus = true
×
485
        }
×
486

UNCOV
487
        var conditions []metav1.Condition
×
488
        if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
×
489
                // upgradeInProgress is true when the upgrade replicas is smaller than the expected
×
490
                // number of total replicas not including the burst replicas
×
491
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
×
492
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
×
493
        } else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
×
494
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
×
495
        } else {
×
496
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
×
497
        }
×
498

499
        // updateDone is true when all replicas are updated and ready
UNCOV
500
        updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)
×
501

×
502
        updateCondition := setConditions(lws, conditions)
×
503
        // if condition changed, record events
×
504
        if updateCondition {
×
505
                r.Record.Eventf(lws, nil, corev1.EventTypeNormal, conditions[0].Reason, Update, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
×
506
        }
×
507
        return updateStatus || updateCondition, updateDone, nil
×
508
}
509

510
// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
UNCOV
511
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
×
512
        updateStatus := false
×
513
        log := ctrl.LoggerFrom(ctx)
×
514

×
515
        // Retrieve the leader StatefulSet.
×
516
        sts := &appsv1.StatefulSet{}
×
517
        if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
×
518
                log.Error(err, "Error retrieving leader StatefulSet")
×
519
                return false, err
×
520
        }
×
521

522
        // retrieve the current number of replicas -- the number of leaders
UNCOV
523
        replicas := int(sts.Status.Replicas)
×
524
        if lws.Status.Replicas != int32(replicas) {
×
525
                lws.Status.Replicas = int32(replicas)
×
526
                updateStatus = true
×
527
        }
×
528

UNCOV
529
        if lws.Status.HPAPodSelector == "" {
×
530
                labelSelector := &metav1.LabelSelector{
×
531
                        MatchLabels: map[string]string{
×
532
                                leaderworkerset.SetNameLabelKey:     lws.Name,
×
533
                                leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
UNCOV
534
                        },
×
535
                }
×
536
                selector, err := metav1.LabelSelectorAsSelector(labelSelector)
×
537
                if err != nil {
×
538
                        log.Error(err, "Converting label selector to selector")
×
539
                        return false, err
×
540
                }
×
541

×
542
                lws.Status.HPAPodSelector = selector.String()
×
543
                updateStatus = true
×
544
        }
×
545

×
546
        // check if an update is needed
UNCOV
547
        updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
×
548
        if err != nil {
×
549
                return false, err
550
        }
551

UNCOV
552
        if updateStatus || updateConditions {
×
553
                if err := r.Status().Update(ctx, lws); err != nil {
×
554
                        if !apierrors.IsConflict(err) {
×
555
                                log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
×
556
                        }
UNCOV
557
                        return false, err
×
558
                }
×
559
        }
×
560
        return updateDone, nil
×
561
}
×
562

×
563
// replicaState captures the per-replica (group) state used when computing
// rolling update parameters.
type replicaState struct {
	// ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
	ready bool
	// updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
	updated bool
}
569

570
func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
571
        states := make([]replicaState, stsReplicas)
572

573
        podSelector := client.MatchingLabels(map[string]string{
574
                leaderworkerset.SetNameLabelKey:     lws.Name,
UNCOV
575
                leaderworkerset.WorkerIndexLabelKey: "0",
×
576
        })
×
577
        var leaderPodList corev1.PodList
×
578
        if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
×
579
                return nil, err
×
580
        }
×
581

×
582
        // Get a sorted leader pod list matches with the following sorted statefulsets one by one, which means
×
583
        // the leader pod and the corresponding worker statefulset has the same index.
×
584
        sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
×
585
                return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
×
586
        }, leaderPodList.Items, int(stsReplicas))
587

588
        stsSelector := client.MatchingLabels(map[string]string{
UNCOV
589
                leaderworkerset.SetNameLabelKey: lws.Name,
×
590
        })
×
591
        var stsList appsv1.StatefulSetList
×
592
        if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
UNCOV
593
                return nil, err
×
594
        }
×
595
        sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
×
596
                return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
×
597
        }, stsList.Items, int(stsReplicas))
×
598

×
599
        // Once size==1, no worker statefulSets will be created.
×
600
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
×
601

×
602
        for idx := int32(0); idx < stsReplicas; idx++ {
×
603
                nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
604
                // It can happen that the leader pod or the worker statefulset hasn't created yet
UNCOV
605
                // or under rebuilding, which also indicates not ready.
×
606
                if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
×
607
                        states[idx] = replicaState{
×
608
                                ready:   false,
×
609
                                updated: false,
×
610
                        }
×
611
                        continue
×
612
                }
×
613

×
614
                leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
×
615
                leaderReady := podutils.PodRunningAndReady(sortedPods[idx])
×
616

×
617
                if noWorkerSts {
618
                        states[idx] = replicaState{
UNCOV
619
                                ready:   leaderReady,
×
620
                                updated: leaderUpdated,
×
621
                        }
×
622
                        continue
×
623
                }
×
624

×
625
                workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
×
626
                workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])
×
627

×
628
                states[idx] = replicaState{
629
                        ready:   leaderReady && workersReady,
UNCOV
630
                        updated: leaderUpdated && workersUpdated,
×
631
                }
×
632
        }
×
633

×
634
        return states, nil
×
635
}
×
636

×
637
func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
638
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)
UNCOV
639

×
640
        // Update up to rollingStep replicas at once.
641
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)
UNCOV
642

×
643
        // rollingStepPartition calculation above disregards the state of replicas with idx<rollingStepPartition.
×
644
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
×
645
        var unavailable int32
×
646
        for idx := 0; idx < int(rollingStepPartition); idx++ {
×
647
                if !states[idx].ready {
×
648
                        unavailable++
×
649
                }
×
650
        }
×
651
        var partition = rollingStepPartition + unavailable
×
652

×
653
        // Reduce the partition if replicas are continuously not ready. It is safe since updating these replicas does not impact
×
654
        // the availability of the LWS. This is important to prevent update from getting stuck in case maxUnavailable is already violated
×
655
        // (for example, all replicas are not ready when rolling update is started).
UNCOV
656
        // Note that we never drop the partition below rolliingStepPartition.
×
657
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
×
658
                if !states[idx].ready || states[idx].updated {
×
659
                        partition = idx
×
660
                } else {
×
661
                        break
×
662
                }
×
663
        }
×
664

×
665
        // That means Partition moves in one direction to make it simple.
×
666
        return min(partition, currentPartition)
×
667
}
668

669
func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
670
        var unreadyCount int32
UNCOV
671
        for idx := int32(0); idx < lwsReplicas; idx++ {
×
672
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
673
                        unreadyCount++
UNCOV
674
                }
×
675
        }
×
676
        return unreadyCount
×
677
}
×
678

×
679
func calculateRollingUpdateReplicas(lwsReplicas int32, maxSurge int32, maxUnavailable int32, unreadyReplicas int32) int32 {
×
680
        burstReplicas := lwsReplicas + maxSurge
UNCOV
681
        if unreadyReplicas <= maxSurge {
×
682
                // Keep enough surge replicas to cover any desired replicas that are still
683
                // unavailable beyond the configured maxUnavailable budget. Once the
684
                // remaining unready desired replicas fit inside the budget, we can reclaim
7✔
685
                // surge capacity gradually.
7✔
686
                requiredSurgeReplicas := utils.NonZeroValue(unreadyReplicas - maxUnavailable)
13✔
687
                return lwsReplicas + requiredSurgeReplicas
6✔
688
        }
6✔
689
        return burstReplicas
6✔
690
}
6✔
691

6✔
692
func calculateContinuousReadyReplicas(states []replicaState) int32 {
6✔
693
        // Count ready replicas at tail (from last index down)
6✔
694
        var continuousReadyCount int32
1✔
695
        for idx := len(states) - 1; idx >= 0; idx-- {
696
                if !states[idx].ready || !states[idx].updated {
UNCOV
697
                        break
×
698
                }
×
699
                continuousReadyCount++
×
700
        }
×
701
        return continuousReadyCount
×
702
}
×
703

UNCOV
704
func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
×
705
        sts := &appsv1.StatefulSet{}
UNCOV
706
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
×
707
        if err != nil {
708
                if apierrors.IsNotFound(err) {
UNCOV
709
                        return nil, nil
×
710
                }
×
711
                return nil, err
×
712
        }
×
713
        return sts, nil
×
714
}
×
715

×
716
func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder events.EventRecorder) (*appsv1.ControllerRevision, error) {
×
717
        revisionKey := ""
UNCOV
718
        if sts != nil {
×
719
                // Uses the hash in the leader sts to avoid detecting update in the case where LWS controller is upgraded from a version where
720
                // the revisionKey was used to detect update instead of controller revision.
UNCOV
721
                revisionKey = revisionutils.GetRevisionKey(sts)
×
722
        }
×
723
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
×
724
                return stsRevision, err
×
725
        }
×
726
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
×
727
        if err != nil {
×
728
                return nil, err
×
729
        }
×
730
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
×
731
        if err == nil {
×
732
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
733
                if revisionKey != "" {
×
734
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
×
735
                }
×
736
                recorder.Eventf(lws, newRevision, corev1.EventTypeNormal, CreatingRevision, Create, message)
×
737
        }
×
738
        return newRevision, err
×
739
}
×
740

×
741
func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
×
742
        if sts == nil {
UNCOV
743
                return nil, nil
×
744
        }
745

746
        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
4✔
747
        if err != nil {
5✔
748
                return nil, err
1✔
749
        }
1✔
750

751
        if !revisionutils.EqualRevision(currentRevision, revision) {
3✔
752
                // If raw bytes differ but the revision is semantically equivalent, avoid triggering a spurious rolling update.
3✔
UNCOV
753
                if revisionutils.SetMatchesRevision(lws, currentRevision, revision, r.revisionEqualityCache) {
×
754
                        return nil, nil
×
755
                }
756
                return currentRevision, nil
5✔
757
        }
2✔
758

3✔
759
        return nil, nil
1✔
760
}
1✔
761

1✔
762
// constructLeaderStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
763
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
764
        var podTemplateSpec corev1.PodTemplateSpec
1✔
765
        if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
766
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
767
        } else {
768
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
9✔
769
        }
9✔
770
        // construct pod template spec configuration
11✔
771
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
2✔
772
        if err != nil {
9✔
773
                return nil, err
7✔
774
        }
7✔
775
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
776
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
9✔
777
        if err != nil {
9✔
UNCOV
778
                return nil, err
×
779
        }
×
780

9✔
781
        podTemplateApplyConfiguration.WithLabels(map[string]string{
9✔
782
                leaderworkerset.WorkerIndexLabelKey: "0",
9✔
UNCOV
783
                leaderworkerset.SetNameLabelKey:     lws.Name,
×
784
                leaderworkerset.RevisionKey:         revisionKey,
×
785
        })
786
        podAnnotations := make(map[string]string)
9✔
787
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
9✔
788
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
9✔
789
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
9✔
790
        }
9✔
791
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
9✔
792
                podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = (string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type))
9✔
793
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
11✔
794
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
2✔
795
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
2✔
796
                }
10✔
797
        }
1✔
798

1✔
799
        if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
2✔
800
                podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
1✔
801
        }
1✔
802

803
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
804

9✔
UNCOV
805
        lwsReplicas := int(*lws.Spec.Replicas)
×
806
        lwsMaxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, lwsReplicas, false)
×
807
        if err != nil {
808
                return nil, err
9✔
809
        }
9✔
810
        lwsMaxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, lwsReplicas, true)
9✔
811
        if err != nil {
9✔
812
                return nil, err
9✔
UNCOV
813
        }
×
814
        if lwsMaxSurge > lwsReplicas {
×
815
                lwsMaxSurge = lwsReplicas
9✔
816
        }
9✔
UNCOV
817
        stsMaxUnavailableInt := int32(lwsMaxUnavailable + lwsMaxSurge)
×
818
        // lwsMaxUnavailable=0 and lwsMaxSurge=0 together should be blocked by webhook,
×
819
        // but just in case, we'll make sure that stsMaxUnavailable is at least 1.
10✔
820
        // This also handles the case when lws.Spec.Replicas is 0.
1✔
821
        if stsMaxUnavailableInt < 1 {
1✔
822
                stsMaxUnavailableInt = 1
9✔
823
        }
9✔
824
        stsMaxUnavailable := intstr.FromInt32(stsMaxUnavailableInt)
9✔
825

9✔
826
        // construct statefulset apply configuration
10✔
827
        statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
1✔
828
                WithSpec(appsapplyv1.StatefulSetSpec().
1✔
829
                        WithServiceName(lws.Name).
9✔
830
                        WithReplicas(replicas).
9✔
831
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
9✔
832
                        WithTemplate(&podTemplateApplyConfiguration).
9✔
833
                        WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
9✔
834
                                appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(stsMaxUnavailable).WithPartition(partition),
9✔
835
                        )).
9✔
836
                        WithSelector(metaapplyv1.LabelSelector().
9✔
837
                                WithMatchLabels(map[string]string{
9✔
838
                                        leaderworkerset.SetNameLabelKey:     lws.Name,
9✔
839
                                        leaderworkerset.WorkerIndexLabelKey: "0",
9✔
840
                                }))).
9✔
841
                WithLabels(map[string]string{
9✔
842
                        leaderworkerset.SetNameLabelKey: lws.Name,
9✔
843
                        leaderworkerset.RevisionKey:     revisionKey,
9✔
844
                }).
9✔
845
                WithAnnotations(map[string]string{
9✔
846
                        leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
9✔
847
                })
9✔
848

9✔
849
        pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(lws)
9✔
850
        if len(pvcApplyConfiguration) > 0 {
9✔
851
                statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
9✔
852
        }
9✔
853

9✔
854
        if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
9✔
855
                pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
10✔
856
                        WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
1✔
857
                        WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
1✔
858
                }
859
                statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
10✔
860
        }
1✔
861
        return statefulSetConfig, nil
1✔
862
}
1✔
863

1✔
864
func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) metav1.Condition {
1✔
865
        var condtype, reason, message string
1✔
866
        switch conditionType {
9✔
867
        case leaderworkerset.LeaderWorkerSetAvailable:
868
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
UNCOV
869
                reason = "AllGroupsReady"
×
870
                message = "All replicas are ready"
×
871
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
×
872
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
×
873
                reason = GroupsUpdating
×
874
                message = "Rolling Upgrade is in progress"
×
875
        default:
×
876
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
×
877
                reason = GroupsProgressing
×
878
                message = "Replicas are progressing"
×
879
        }
×
880

×
881
        condition := metav1.Condition{
×
882
                Type:               condtype,
×
883
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
×
884
                LastTransitionTime: metav1.Now(),
885
                Reason:             reason,
UNCOV
886
                Message:            message,
×
887
        }
×
888
        return condition
×
889
}
×
890

×
891
func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
×
892
        shouldUpdate := false
×
893
        for _, condition := range conditions {
×
894
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
×
895
        }
896

UNCOV
897
        return shouldUpdate
×
898
}
×
899

×
900
func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
×
901
        newCondition.LastTransitionTime = metav1.Now()
×
902
        found := false
UNCOV
903
        shouldUpdate := false
×
904

×
905
        // Precondition: newCondition has status true.
×
906
        for i, curCondition := range lws.Status.Conditions {
×
907
                if newCondition.Type == curCondition.Type {
×
908
                        if newCondition.Status != curCondition.Status {
909
                                // the conditions match but one is true and one is false. Update the stored condition
UNCOV
910
                                // with the new condition.
×
911
                                lws.Status.Conditions[i] = newCondition
912
                                shouldUpdate = true
913
                        }
5✔
914
                        // if both are true or both are false, do nothing.
5✔
915
                        found = true
5✔
916
                } else {
5✔
917
                        // if the conditions are not of the same type, do nothing unless one is Progressing and one is
5✔
918
                        // Available and both are true. Must be mutually exclusive.
5✔
919
                        if exclusiveConditionTypes(curCondition, newCondition) &&
9✔
920
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
7✔
921
                                // Progressing is true and Available is true. Prevent this.
3✔
922
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
5✔
923
                                shouldUpdate = true
2✔
924
                        }
2✔
925
                }
2✔
926
        }
2✔
927
        // condition doesn't exist, update only if the status is true
2✔
928
        if newCondition.Status == metav1.ConditionTrue && !found {
929
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
3✔
930
                shouldUpdate = true
1✔
931
        }
1✔
932
        return shouldUpdate
1✔
933
}
1✔
934

1✔
UNCOV
935
func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
×
936
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
×
937
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
×
938
                return true
×
939
        }
×
940

941
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
942
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
943
                return true
7✔
944
        }
2✔
945

2✔
946
        return false
2✔
947
}
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc