kubernetes-sigs / lws / 22606993655

03 Mar 2026 03:30AM UTC · coverage: 38.223% (+0.03%) from 38.191%

Event: push · CI: github · Committer: web-flow

Fix odd number of arguments in log call for pending pod skip (#763)

Wrap the log message with fmt.Sprintf to properly format the pod name into the message string, instead of passing it as a separate argument to the structured logger. This fixes the DPANIC crash caused by an odd number of key-value pair arguments when RecreateGroupAfterStart/RecreateGroupAfterRestart is skipped due to a pending pod.
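
For context, the change amounts to the pattern sketched below (illustrative only: the actual call site is in the pod-group recreation path, not in the file shown on this page, and the names used here are hypothetical). logr-style loggers such as klog's treat every argument after the message as alternating key/value pairs, so a lone trailing value leaves an odd argument count, which a zap-backed logger turns into a DPANIC in development mode:

        package main

        import (
                "fmt"

                "k8s.io/klog/v2"
        )

        func main() {
                log := klog.Background()
                podName := "lws-sample-0" // hypothetical pod name

                // Buggy pattern: podName dangles after the message with no key, so the
                // structured logger receives an odd number of key/value arguments.
                // log.Info("Skipping group recreation, pod is pending", podName)

                // Fixed pattern, per the commit message: format the name into the
                // message string itself.
                log.Info(fmt.Sprintf("Skipping group recreation, pod %s is pending", podName))

                // An equally valid alternative: pass a complete key/value pair.
                log.Info("Skipping group recreation, pod is pending", "pod", podName)
        }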

0 of 1 new or added line in 1 file covered. (0.0%)

175 existing lines in 3 files now uncovered.

912 of 2386 relevant lines covered (38.22%)

2.81 hits per line

Source File

/pkg/controllers/leaderworkerset_controller.go · 17.21% covered

/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
        "context"
        "fmt"
        "strconv"
        "time"

        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/types"
        "k8s.io/apimachinery/pkg/util/intstr"
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
        "k8s.io/client-go/tools/events"
        "k8s.io/klog/v2"
        "k8s.io/utils/ptr"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/handler"
        "sigs.k8s.io/controller-runtime/pkg/reconcile"

        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
        "sigs.k8s.io/lws/pkg/utils"
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

// LeaderWorkerSetReconciler reconciles a LeaderWorkerSet object.
type LeaderWorkerSetReconciler struct {
        client.Client
        Scheme *runtime.Scheme
        Record events.EventRecorder
}

var (
        apiGVStr = leaderworkerset.GroupVersion.String()
)

const (
        lwsOwnerKey  = ".metadata.controller"
        fieldManager = "lws"
)

const (
        // FailedCreate is the event reason used when a resource creation fails.
        // The event uses the error(s) as the reason.
        FailedCreate      = "FailedCreate"
        GroupsProgressing = "GroupsProgressing"
        GroupsUpdating    = "GroupsUpdating"
        CreatingRevision  = "CreatingRevision"

        // Event actions
        Create = "Create"
        Update = "Update"
        Delete = "Delete"
)

func NewLeaderWorkerSetReconciler(client client.Client, scheme *runtime.Scheme, record events.EventRecorder) *LeaderWorkerSetReconciler {
        return &LeaderWorkerSetReconciler{
                Client: client,
                Scheme: scheme,
                Record: record,
        }
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch;get;list
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch;get;list
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=leaderworkerset.x-k8s.io,resources=leaderworkersets/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=controllerrevisions/finalizers,verbs=update

func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        // Get the leaderworkerset object.
        lws := &leaderworkerset.LeaderWorkerSet{}
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, lws); err != nil {
                return ctrl.Result{}, client.IgnoreNotFound(err)
        }

        if lws.DeletionTimestamp != nil {
                return ctrl.Result{}, nil
        }

        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
        ctx = ctrl.LoggerInto(ctx, log)

        leaderSts, err := r.getLeaderStatefulSet(ctx, lws)
        if err != nil {
                log.Error(err, "Fetching leader statefulset")
                return ctrl.Result{}, err
        }

        if leaderSts != nil && leaderSts.DeletionTimestamp != nil {
                return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
        }

        // Handles two cases:
        // Case 1: Upgrading the LWS controller from a version that doesn't support controller revision.
        // Case 2: Creating the controller revision for a newly created LWS object.
        revision, err := r.getOrCreateRevisionIfNonExist(ctx, leaderSts, lws, r.Record)
        if err != nil {
                log.Error(err, "Creating controller revision")
                return ctrl.Result{}, err
        }

        updatedRevision, err := r.getUpdatedRevision(ctx, leaderSts, lws, revision)
        if err != nil {
                log.Error(err, "Validating if LWS has been updated")
                return ctrl.Result{}, err
        }
        lwsUpdated := updatedRevision != nil
        if lwsUpdated {
                revision, err = revisionutils.CreateRevision(ctx, r.Client, updatedRevision)
                if err != nil {
                        log.Error(err, "Creating revision for updated LWS")
                        return ctrl.Result{}, err
                }
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, CreatingRevision, Create, fmt.Sprintf("Creating revision with key %s for updated LWS", revisionutils.GetRevisionKey(revision)))
        }

        partition, replicas, err := r.rollingUpdateParameters(ctx, lws, leaderSts, revisionutils.GetRevisionKey(revision), lwsUpdated)
        if err != nil {
                log.Error(err, "Rolling partition error")
                return ctrl.Result{}, err
        }

        if err := r.SSAWithStatefulset(ctx, lws, partition, replicas, revisionutils.GetRevisionKey(revision)); err != nil {
                if leaderSts == nil {
                        r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create leader statefulset %s", lws.Name))
                }
                return ctrl.Result{}, err
        }

        if leaderSts == nil {
                // An event is logged to track sts creation.
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created leader statefulset %s", lws.Name))
        } else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
                // An event is logged to track update progress.
                r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsUpdating, Update, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
        }

        // Create the headless service if it does not exist.
        if err := r.reconcileHeadlessServices(ctx, lws); err != nil {
                log.Error(err, "Creating headless service.")
                r.Record.Eventf(lws, nil, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create headless service for error: %v", err))
                return ctrl.Result{}, err
        }

        updateDone, err := r.updateStatus(ctx, lws, revisionutils.GetRevisionKey(revision))
        if err != nil {
                if apierrors.IsConflict(err) {
                        return ctrl.Result{Requeue: true}, nil
                }
                return ctrl.Result{}, err
        }

        if updateDone {
                if err := revisionutils.TruncateRevisions(ctx, r.Client, lws, revisionutils.GetRevisionKey(revision)); err != nil {
                        return ctrl.Result{}, err
                }
        }
        log.V(2).Info("Leader Reconcile completed.")
        return ctrl.Result{}, nil
}

func (r *LeaderWorkerSetReconciler) reconcileHeadlessServices(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, lws, lws.Name, map[string]string{leaderworkerset.SetNameLabelKey: lws.Name}, lws); err != nil {
                        return err
                }
                return nil
        }
        return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *LeaderWorkerSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&leaderworkerset.LeaderWorkerSet{}).
                Owns(&appsv1.StatefulSet{}).
                Owns(&corev1.Service{}).
                Watches(&appsv1.StatefulSet{},
                        handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, a client.Object) []reconcile.Request {
                                return []reconcile.Request{
                                        {NamespacedName: types.NamespacedName{
                                                Name:      a.GetLabels()[leaderworkerset.SetNameLabelKey],
                                                Namespace: a.GetNamespace(),
                                        }},
                                }
                        })).
                Complete(r)
}

func SetupIndexes(indexer client.FieldIndexer) error {
        return indexer.IndexField(context.Background(), &appsv1.StatefulSet{}, lwsOwnerKey, func(rawObj client.Object) []string {
                // grab the statefulSet object, extract the owner...
                statefulSet := rawObj.(*appsv1.StatefulSet)
                owner := metav1.GetControllerOf(statefulSet)
                if owner == nil {
                        return nil
                }
                // ...make sure it's a LeaderWorkerSet...
                if owner.APIVersion != apiGVStr || owner.Kind != "LeaderWorkerSet" {
                        return nil
                }
                // ...and if so, return it
                return []string{owner.Name}
        })
}

// Rolling update will always wait for the former replica to be ready before processing the next one.
// We don't consider the rollout strategy type here since we only support rollingUpdate for now;
// once we have more policies, we should update the logic here.
// Possible scenarios for Partition:
//   - When the sts is under creation, partition is always 0 because pods are created in parallel; rolling update is not relevant here.
//   - When the sts is in rolling update, the partition moves from the last index toward index 0 in maxUnavailable-sized steps.
//   - When the sts is in rolling update and Replicas increases, we delay the rolling update until the scale-up is done;
//     Partition will not change, and new replicas are created with the new template from the get-go.
//   - When the sts is in rolling update and Replicas decreases, the partition will not change until the new Replicas < Partition,
//     in which case Partition will be reset to the new Replicas value.
//   - When the sts is ready for a rolling update and Replicas increases at the same time, we delay the rolling update until
//     the scale-up is done.
//   - When the sts is ready for a rolling update and Replicas decreases at the same time, we start the rolling update
//     together with the scale-down.
//
// At rest, Partition should always be zero.
//
// For Replicas:
//   - During a rolling update, Replicas is equal to (spec.Replicas + maxSurge).
//   - Otherwise, Replicas is equal to spec.Replicas.
//   - One exception: when the number of unready leaderWorkerSet replicas is no more than MaxSurge,
//     we reclaim the extra replicas gradually to accommodate the new ones.
func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, sts *appsv1.StatefulSet, revisionKey string, leaderWorkerSetUpdated bool) (stsPartition int32, replicas int32, err error) {
        log := ctrl.LoggerFrom(ctx).WithValues("leaderworkerset", klog.KObj(lws))
        ctx = ctrl.LoggerInto(ctx, log)
        lwsReplicas := *lws.Spec.Replicas

        defer func() {
                // Ensure that replicas with an index lower than lwsPartition are not updated.
                stsPartition = max(stsPartition, *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition)
        }()

        // Case 1:
        // If the sts is not created yet, all partitions should be updated,
        // and replicas should not change.
        if sts == nil {
                return 0, lwsReplicas, nil
        }

        stsReplicas := *sts.Spec.Replicas
        maxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, int(lwsReplicas), true)
        if err != nil {
                return 0, 0, err
        }
        // No need to burst more than the replicas.
        if maxSurge > int(lwsReplicas) {
                maxSurge = int(lwsReplicas)
        }
        burstReplicas := lwsReplicas + int32(maxSurge)

        // wantReplicas calculates the final replicas if needed.
        wantReplicas := func(unreadyReplicas int32) int32 {
                if unreadyReplicas <= int32(maxSurge) {
                        // When we have n unready replicas and n bursted replicas, we should
                        // start to release the burst replicas gradually to accommodate
                        // the unready ones.
                        finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
                        r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
                        return finalReplicas
                }
                return burstReplicas
        }

        // Case 2:
        // Indicates a new rolling update here.
        if leaderWorkerSetUpdated {
                // Process scaling up/down first, prior to the rolling update.
                return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
        }

        partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
        rollingUpdateCompleted := partition == 0 && stsReplicas == lwsReplicas
        // Case 3:
        // In normal cases, return the values directly.
        if rollingUpdateCompleted {
                return 0, lwsReplicas, nil
        }

        states, err := r.getReplicaStates(ctx, lws, stsReplicas, revisionKey)
        if err != nil {
                return 0, 0, err
        }
        lwsUnreadyReplicas := calculateLWSUnreadyReplicas(states, lwsReplicas)

        originalLwsReplicas, err := strconv.Atoi(sts.Annotations[leaderworkerset.ReplicasAnnotationKey])
        if err != nil {
                return 0, 0, err
        }
        replicasUpdated := originalLwsReplicas != int(*lws.Spec.Replicas)
        // Case 4:
        // Replicas changed during the rolling update.
        if replicasUpdated {
                return min(partition, burstReplicas), wantReplicas(lwsUnreadyReplicas), nil
        }

        // Case 5:
        // Calculate the Partition during the rolling update; no leaderWorkerSet updates happen here.

        rollingStep, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, int(lwsReplicas), false)
        if err != nil {
                return 0, 0, err
        }
        // Make sure that we always respect the maxUnavailable, or
        // we'll violate it when reclaiming bursted replicas.
        rollingStep += maxSurge - (int(burstReplicas) - int(stsReplicas))

        return rollingUpdatePartition(states, stsReplicas, int32(rollingStep), partition), wantReplicas(lwsUnreadyReplicas), nil
}
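
// Worked example (added for illustration, not part of the upstream source):
// with Replicas=4 and maxSurge=1, burstReplicas=5. When a new revision is
// first detected (Case 2) while the sts still has 4 replicas, the function
// returns partition=min(4,4)=4 and replicas=wantReplicas(4)=5: one surge
// replica (index 4) is created from the new template while indices 0-3 keep
// the old one. As updated replicas become ready at the tail, the partition
// steps down in maxUnavailable-sized increments, and once the number of
// unready replicas drops to maxSurge or below, wantReplicas starts reclaiming
// the surge replica.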

func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) error {
        log := ctrl.LoggerFrom(ctx)

        // construct the statefulset apply configuration
        leaderStatefulSetApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(lws, partition, replicas, revisionKey)
        if err != nil {
                log.Error(err, "Constructing StatefulSet apply configuration.")
                return err
        }
        if err := setControllerReferenceWithStatefulSet(lws, leaderStatefulSetApplyConfig, r.Scheme); err != nil {
                log.Error(err, "Setting controller reference.")
                return err
        }
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(leaderStatefulSetApplyConfig)
        if err != nil {
                log.Error(err, "Converting StatefulSet configuration to json.")
                return err
        }
        patch := &unstructured.Unstructured{
                Object: obj,
        }
        // Use server-side apply and add the lws field manager to the lws-owned fields.
        // If there are conflicts in the fields owned by the lws controller, lws will take ownership and force-override
        // these fields to the values desired by the lws controller.
        // TODO b/316776287 add E2E test for SSA
        // TODO: Deprecated: Use client.Client.Apply() and client.Client.SubResource("subresource").Apply() instead.
        err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{ //nolint
                FieldManager: fieldManager,
                Force:        ptr.To[bool](true),
        })
        if err != nil {
                log.Error(err, "Using server side apply to update leader statefulset")
                return err
        }

        return nil
}

// updateConditions updates the condition of the leaderworkerset to either Progressing or Available.
func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, bool, error) {
        log := ctrl.LoggerFrom(ctx)
        podSelector := client.MatchingLabels(map[string]string{
                leaderworkerset.SetNameLabelKey:     lws.Name,
                leaderworkerset.WorkerIndexLabelKey: "0",
        })
        leaderPodList := &corev1.PodList{}
        if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
                log.Error(err, "Fetching leaderPods")
                return false, false, err
        }

        updateStatus := false
        readyCount, updatedCount, readyNonBurstWorkerCount := 0, 0, 0
        partitionedUpdatedNonBurstCount, partitionedCurrentNonBurstCount, partitionedUpdatedAndReadyCount := 0, 0, 0
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
        lwsPartition := *lws.Spec.RolloutStrategy.RollingUpdateConfiguration.Partition

        // Iterate through all leaderPods.
        for _, pod := range leaderPodList.Items {
                index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
                if err != nil {
                        return false, false, err
                }

                var sts appsv1.StatefulSet
                if !noWorkerSts {
                        if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
                                if client.IgnoreNotFound(err) != nil {
                                        log.Error(err, "Fetching worker statefulSet")
                                        return false, false, err
                                }
                                continue
                        }
                }

                if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
                        partitionedCurrentNonBurstCount++
                }

                var ready, updated bool
                if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
                        ready = true
                        readyCount++
                }
                if (noWorkerSts || revisionutils.GetRevisionKey(&sts) == revisionKey) && revisionutils.GetRevisionKey(&pod) == revisionKey {
                        updated = true
                        updatedCount++
                        if index < int(*lws.Spec.Replicas) && index >= int(lwsPartition) {
                                // Bursted replicas do not count when determining if rollingUpdate has been completed.
                                partitionedUpdatedNonBurstCount++
                        }
                }

                if index < int(*lws.Spec.Replicas) {
                        if ready {
                                readyNonBurstWorkerCount++
                        }
                        if index >= int(lwsPartition) && ready && updated {
                                partitionedUpdatedAndReadyCount++
                        }
                }
        }

        if lws.Status.ReadyReplicas != int32(readyCount) {
                lws.Status.ReadyReplicas = int32(readyCount)
                updateStatus = true
        }

        if lws.Status.UpdatedReplicas != int32(updatedCount) {
                lws.Status.UpdatedReplicas = int32(updatedCount)
                updateStatus = true
        }

        var conditions []metav1.Condition
        if partitionedUpdatedNonBurstCount < partitionedCurrentNonBurstCount {
                // The update is in progress when the number of updated replicas is smaller than the
                // expected number of total replicas, not counting the burst replicas.
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetUpdateInProgress))
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
        } else if readyNonBurstWorkerCount == int(*lws.Spec.Replicas) && partitionedUpdatedAndReadyCount == partitionedCurrentNonBurstCount {
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetAvailable))
        } else {
                conditions = append(conditions, makeCondition(leaderworkerset.LeaderWorkerSetProgressing))
        }

        // updateDone is true when all replicas are updated and ready.
        updateDone := (lwsPartition == 0) && partitionedUpdatedAndReadyCount == int(*lws.Spec.Replicas)

        updateCondition := setConditions(lws, conditions)
        // if the condition changed, record events
        if updateCondition {
                r.Record.Eventf(lws, nil, corev1.EventTypeNormal, conditions[0].Reason, Update, conditions[0].Message+fmt.Sprintf(", with %d groups ready of total %d groups", readyCount, int(*lws.Spec.Replicas)))
        }
        return updateStatus || updateCondition, updateDone, nil
}

// updateStatus updates the status and conditions of the LeaderWorkerSet and returns whether an update actually occurred.
func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, revisionKey string) (bool, error) {
        updateStatus := false
        log := ctrl.LoggerFrom(ctx)

        // Retrieve the leader StatefulSet.
        sts := &appsv1.StatefulSet{}
        if err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts); err != nil {
                log.Error(err, "Error retrieving leader StatefulSet")
                return false, err
        }

        // retrieve the current number of replicas -- the number of leaders
        replicas := int(sts.Status.Replicas)
        if lws.Status.Replicas != int32(replicas) {
                lws.Status.Replicas = int32(replicas)
                updateStatus = true
        }

        if lws.Status.HPAPodSelector == "" {
                labelSelector := &metav1.LabelSelector{
                        MatchLabels: map[string]string{
                                leaderworkerset.SetNameLabelKey:     lws.Name,
                                leaderworkerset.WorkerIndexLabelKey: "0", // select leaders
                        },
                }
                selector, err := metav1.LabelSelectorAsSelector(labelSelector)
                if err != nil {
                        log.Error(err, "Converting label selector to selector")
                        return false, err
                }

                lws.Status.HPAPodSelector = selector.String()
                updateStatus = true
        }

        // check if an update is needed
        updateConditions, updateDone, err := r.updateConditions(ctx, lws, revisionKey)
        if err != nil {
                return false, err
        }

        if updateStatus || updateConditions {
                if err := r.Status().Update(ctx, lws); err != nil {
                        if !apierrors.IsConflict(err) {
                                log.Error(err, "Updating LeaderWorkerSet status and/or condition.")
                        }
                        return false, err
                }
        }
        return updateDone, nil
}

type replicaState struct {
        // ready indicates whether both the leader pod and its worker statefulset (if any) are ready.
        ready bool
        // updated indicates whether both the leader pod and its worker statefulset (if any) are updated to the latest revision.
        updated bool
}

func (r *LeaderWorkerSetReconciler) getReplicaStates(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet, stsReplicas int32, revisionKey string) ([]replicaState, error) {
        states := make([]replicaState, stsReplicas)

        podSelector := client.MatchingLabels(map[string]string{
                leaderworkerset.SetNameLabelKey:     lws.Name,
                leaderworkerset.WorkerIndexLabelKey: "0",
        })
        var leaderPodList corev1.PodList
        if err := r.List(ctx, &leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
                return nil, err
        }

        // Build a sorted leader pod list that matches the sorted statefulset list below one to one,
        // so that a leader pod and its corresponding worker statefulset share the same index.
        sortedPods := utils.SortByIndex(func(pod corev1.Pod) (int, error) {
                return strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
        }, leaderPodList.Items, int(stsReplicas))

        stsSelector := client.MatchingLabels(map[string]string{
                leaderworkerset.SetNameLabelKey: lws.Name,
        })
        var stsList appsv1.StatefulSetList
        if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
                return nil, err
        }
        sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
                return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
        }, stsList.Items, int(stsReplicas))

        // Once size==1, no worker statefulSets will be created.
        noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1

        for idx := int32(0); idx < stsReplicas; idx++ {
                nominatedName := fmt.Sprintf("%s-%d", lws.Name, idx)
                // It can happen that the leader pod or the worker statefulset hasn't been created yet
                // or is being rebuilt, which also indicates not ready.
                if nominatedName != sortedPods[idx].Name || (!noWorkerSts && nominatedName != sortedSts[idx].Name) {
                        states[idx] = replicaState{
                                ready:   false,
                                updated: false,
                        }
                        continue
                }

                leaderUpdated := revisionutils.GetRevisionKey(&sortedPods[idx]) == revisionKey
                leaderReady := podutils.PodRunningAndReady(sortedPods[idx])

                if noWorkerSts {
                        states[idx] = replicaState{
                                ready:   leaderReady,
                                updated: leaderUpdated,
                        }
                        continue
                }

                workersUpdated := revisionutils.GetRevisionKey(&sortedSts[idx]) == revisionKey
                workersReady := statefulsetutils.StatefulsetReady(sortedSts[idx])

                states[idx] = replicaState{
                        ready:   leaderReady && workersReady,
                        updated: leaderUpdated && workersUpdated,
                }
        }

        return states, nil
}

func rollingUpdatePartition(states []replicaState, stsReplicas int32, rollingStep int32, currentPartition int32) int32 {
        continuousReadyReplicas := calculateContinuousReadyReplicas(states)

        // Update up to rollingStep replicas at once.
        var rollingStepPartition = utils.NonZeroValue(stsReplicas - continuousReadyReplicas - rollingStep)

        // The rollingStepPartition calculation above disregards the state of replicas with idx < rollingStepPartition.
        // To prevent violating the maxUnavailable, we have to account for these replicas and increase the partition if some are not ready.
        var unavailable int32
        for idx := 0; idx < int(rollingStepPartition); idx++ {
                if !states[idx].ready {
                        unavailable++
                }
        }
        var partition = rollingStepPartition + unavailable

        // Reduce the partition if replicas are continuously not ready. This is safe since updating these replicas does not impact
        // the availability of the LWS. It is important to prevent the update from getting stuck in case maxUnavailable is already violated
        // (for example, all replicas are not ready when the rolling update is started).
        // Note that we never drop the partition below rollingStepPartition.
        for idx := min(partition, stsReplicas-1); idx >= rollingStepPartition; idx-- {
                if !states[idx].ready || states[idx].updated {
                        partition = idx
                } else {
                        break
                }
        }

        // Partition only ever moves in one direction, to keep things simple.
        return min(partition, currentPartition)
}
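
// Worked example (added for illustration, not part of the upstream source):
// with stsReplicas=5, rollingStep=1, and only the tail replica (index 4)
// updated and ready, continuousReadyReplicas=1 and
// rollingStepPartition=max(5-1-1, 0)=3. If every replica below index 3 is
// ready, unavailable=0 and the partition stays at 3, so index 3 is updated
// next; each unready replica below the partition pushes it up by one so that
// maxUnavailable is still respected.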

func calculateLWSUnreadyReplicas(states []replicaState, lwsReplicas int32) int32 {
        var unreadyCount int32
        for idx := int32(0); idx < lwsReplicas; idx++ {
                if idx >= int32(len(states)) || !states[idx].ready || !states[idx].updated {
                        unreadyCount++
                }
        }
        return unreadyCount
}

func calculateContinuousReadyReplicas(states []replicaState) int32 {
        // Count ready replicas at the tail (from the last index down).
        var continuousReadyCount int32
        for idx := len(states) - 1; idx >= 0; idx-- {
                if !states[idx].ready || !states[idx].updated {
                        break
                }
                continuousReadyCount++
        }
        return continuousReadyCount
}

func (r *LeaderWorkerSetReconciler) getLeaderStatefulSet(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (*appsv1.StatefulSet, error) {
        sts := &appsv1.StatefulSet{}
        err := r.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, sts)
        if err != nil {
                if apierrors.IsNotFound(err) {
                        return nil, nil
                }
                return nil, err
        }
        return sts, nil
}

func (r *LeaderWorkerSetReconciler) getOrCreateRevisionIfNonExist(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, recorder events.EventRecorder) (*appsv1.ControllerRevision, error) {
        revisionKey := ""
        if sts != nil {
                // Use the hash in the leader sts to avoid detecting an update in the case where the LWS controller is upgraded
                // from a version where the revisionKey was used to detect updates instead of the controller revision.
                revisionKey = revisionutils.GetRevisionKey(sts)
        }
        if stsRevision, err := revisionutils.GetRevision(ctx, r.Client, lws, revisionKey); stsRevision != nil || err != nil {
                return stsRevision, err
        }
        revision, err := revisionutils.NewRevision(ctx, r.Client, lws, revisionKey)
        if err != nil {
                return nil, err
        }
        newRevision, err := revisionutils.CreateRevision(ctx, r.Client, revision)
        if err == nil {
                message := fmt.Sprintf("Creating revision with key %s for a newly created LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
                if revisionKey != "" {
                        message = fmt.Sprintf("Creating missing revision with key %s for existing LeaderWorkerSet", revision.Labels[leaderworkerset.RevisionKey])
                }
                recorder.Eventf(lws, newRevision, corev1.EventTypeNormal, CreatingRevision, Create, message)
        }
        return newRevision, err
}

func (r *LeaderWorkerSetReconciler) getUpdatedRevision(ctx context.Context, sts *appsv1.StatefulSet, lws *leaderworkerset.LeaderWorkerSet, revision *appsv1.ControllerRevision) (*appsv1.ControllerRevision, error) {
        if sts == nil {
                return nil, nil
        }

        currentRevision, err := revisionutils.NewRevision(ctx, r.Client, lws, "")
        if err != nil {
                return nil, err
        }

        if !revisionutils.EqualRevision(currentRevision, revision) {
                return currentRevision, nil
        }

        return nil, nil
}

// constructLeaderStatefulSetApplyConfiguration constructs the apply configuration for the leader StatefulSet.
func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWorkerSet, partition, replicas int32, revisionKey string) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
        var podTemplateSpec corev1.PodTemplateSpec
        if lws.Spec.LeaderWorkerTemplate.LeaderTemplate != nil {
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.LeaderTemplate.DeepCopy()
        } else {
                podTemplateSpec = *lws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
        }
        // construct pod template spec configuration
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
        if err != nil {
                return nil, err
        }
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
        if err != nil {
                return nil, err
        }

        podTemplateApplyConfiguration.WithLabels(map[string]string{
                leaderworkerset.WorkerIndexLabelKey: "0",
                leaderworkerset.SetNameLabelKey:     lws.Name,
                leaderworkerset.RevisionKey:         revisionKey,
        })
        podAnnotations := make(map[string]string)
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
        }
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
                podAnnotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] = string(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.Type)
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
                }
        }

        if lws.Spec.NetworkConfig != nil && *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
                podAnnotations[leaderworkerset.SubdomainPolicyAnnotationKey] = string(leaderworkerset.SubdomainUniquePerReplica)
        }

        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)

        // construct statefulset apply configuration
        statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
                WithSpec(appsapplyv1.StatefulSetSpec().
                        WithServiceName(lws.Name).
                        WithReplicas(replicas).
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
                        WithTemplate(&podTemplateApplyConfiguration).
                        WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
                                appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable).WithPartition(partition),
                        )).
                        WithSelector(metaapplyv1.LabelSelector().
                                WithMatchLabels(map[string]string{
                                        leaderworkerset.SetNameLabelKey:     lws.Name,
                                        leaderworkerset.WorkerIndexLabelKey: "0",
                                }))).
                WithLabels(map[string]string{
                        leaderworkerset.SetNameLabelKey: lws.Name,
                        leaderworkerset.RevisionKey:     revisionKey,
                }).
                WithAnnotations(map[string]string{
                        leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(int(*lws.Spec.Replicas)),
                })

        pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(lws)
        if len(pvcApplyConfiguration) > 0 {
                statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
        }

        if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
                pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
                        WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
                        WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
                }
                statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
        }
        return statefulSetConfig, nil
}

func makeCondition(conditionType leaderworkerset.LeaderWorkerSetConditionType) metav1.Condition {
        var condtype, reason, message string
        switch conditionType {
        case leaderworkerset.LeaderWorkerSetAvailable:
                condtype = string(leaderworkerset.LeaderWorkerSetAvailable)
                reason = "AllGroupsReady"
                message = "All replicas are ready"
        case leaderworkerset.LeaderWorkerSetUpdateInProgress:
                condtype = string(leaderworkerset.LeaderWorkerSetUpdateInProgress)
                reason = GroupsUpdating
                message = "Rolling Upgrade is in progress"
        default:
                condtype = string(leaderworkerset.LeaderWorkerSetProgressing)
                reason = GroupsProgressing
                message = "Replicas are progressing"
        }

        condition := metav1.Condition{
                Type:               condtype,
                Status:             metav1.ConditionStatus(corev1.ConditionTrue),
                LastTransitionTime: metav1.Now(),
                Reason:             reason,
                Message:            message,
        }
        return condition
}

func setConditions(lws *leaderworkerset.LeaderWorkerSet, conditions []metav1.Condition) bool {
        shouldUpdate := false
        for _, condition := range conditions {
                shouldUpdate = shouldUpdate || setCondition(lws, condition)
        }

        return shouldUpdate
}

func setCondition(lws *leaderworkerset.LeaderWorkerSet, newCondition metav1.Condition) bool {
        newCondition.LastTransitionTime = metav1.Now()
        found := false
        shouldUpdate := false

        // Precondition: newCondition has status true.
        for i, curCondition := range lws.Status.Conditions {
                if newCondition.Type == curCondition.Type {
                        if newCondition.Status != curCondition.Status {
                                // The conditions match but one is true and one is false. Update the stored condition
                                // with the new condition.
                                lws.Status.Conditions[i] = newCondition
                                shouldUpdate = true
                        }
                        // if both are true or both are false, do nothing.
                        found = true
                } else {
                        // If the conditions are not of the same type, do nothing unless one is Progressing and one is
                        // Available and both are true; they must be mutually exclusive.
                        if exclusiveConditionTypes(curCondition, newCondition) &&
                                (newCondition.Status == metav1.ConditionTrue) && (curCondition.Status == metav1.ConditionTrue) {
                                // Progressing is true and Available is true. Prevent this.
                                lws.Status.Conditions[i].Status = metav1.ConditionFalse
                                shouldUpdate = true
                        }
                }
        }
        // The condition doesn't exist yet; append it only if its status is true.
        if newCondition.Status == metav1.ConditionTrue && !found {
                lws.Status.Conditions = append(lws.Status.Conditions, newCondition)
                shouldUpdate = true
        }
        return shouldUpdate
}
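
// Worked example (added for illustration, not part of the upstream source):
// if Status.Conditions currently holds Available=True and updateConditions
// now derives Progressing, setCondition finds no existing Progressing entry,
// sees via exclusiveConditionTypes that the two types are mutually exclusive,
// flips Available to False, appends Progressing=True, and returns true so
// the status is written back.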

func exclusiveConditionTypes(condition1 metav1.Condition, condition2 metav1.Condition) bool {
        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetProgressing)) ||
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetProgressing) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
                return true
        }

        if (condition1.Type == string(leaderworkerset.LeaderWorkerSetAvailable) && condition2.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress)) ||
                (condition1.Type == string(leaderworkerset.LeaderWorkerSetUpdateInProgress) && condition2.Type == string(leaderworkerset.LeaderWorkerSetAvailable)) {
                return true
        }

        return false
}