• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / lws / 26164378256

20 May 2026 01:03PM UTC coverage: 44.386% (-0.03%) from 44.419%
26164378256

Pull #851

github

Mostafahassen1
feat(lws): improve feedback for malformed LWS resources
Pull Request #851: fix: surface error message in FailedCreate events for leader and worker statefulsets

0 of 4 new or added lines in 2 files covered. (0.0%)

80 existing lines in 1 file now uncovered.

1182 of 2663 relevant lines covered (44.39%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.0
/pkg/controllers/pod_controller.go
1
/*
2
Copyright 2023.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controllers
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "strconv"
24
        "time"
25

26
        appsv1 "k8s.io/api/apps/v1"
27
        corev1 "k8s.io/api/core/v1"
28
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
        "k8s.io/apimachinery/pkg/runtime"
31
        "k8s.io/apimachinery/pkg/types"
32
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
33
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
34
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
35
        "k8s.io/client-go/tools/events"
36
        "k8s.io/klog/v2"
37
        ctrl "sigs.k8s.io/controller-runtime"
38
        "sigs.k8s.io/controller-runtime/pkg/client"
39
        "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
40
        "sigs.k8s.io/controller-runtime/pkg/predicate"
41

42
        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
43
        "sigs.k8s.io/lws/pkg/schedulerprovider"
44
        acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
45
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
46
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
47
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
48
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
49
)
50

51
// PodReconciler reconciles Pods that carry the LeaderWorkerSet name label.
// For every such pod it applies the group restart policy, and for leader pods
// it additionally creates the worker StatefulSet (see Reconcile).
52
type PodReconciler struct {
53
        // Client is the embedded client used to get, list, create and delete objects.
        client.Client
54
        // Scheme is passed to helpers that need to resolve an object's GroupVersionKind.
        Scheme            *runtime.Scheme
55
        // Record emits events about the LeaderWorkerSet and its pods.
        Record            events.EventRecorder
56
        // SchedulerProvider is optional; Reconcile only calls it when it is non-nil.
        SchedulerProvider schedulerprovider.SchedulerProvider
57
}
58

59
func NewPodReconciler(client client.Client, schema *runtime.Scheme, record events.EventRecorder, sp schedulerprovider.SchedulerProvider) *PodReconciler {
×
60
        return &PodReconciler{Client: client, Scheme: schema, Record: record, SchedulerProvider: sp}
×
61
}
×
62

63
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
64
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch
65
//+kubebuilder:rbac:groups=core,resources=pods,verbs=create;delete;get;list;patch;update;watch
66
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
67
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch
68

69
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
×
70
        var pod corev1.Pod
×
71
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, &pod); err != nil {
×
72
                return ctrl.Result{}, client.IgnoreNotFound(err)
×
73
        }
×
74
        log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&pod))
×
75
        ctx = ctrl.LoggerInto(ctx, log)
×
76

×
77
        // get the leaderWorkerSet name
×
78
        lwsName := pod.Labels[leaderworkerset.SetNameLabelKey]
×
79
        if lwsName == "" {
×
80
                return ctrl.Result{}, errors.New("leaderworkerset.sigs.k8s.io/name label is unexpected missing")
×
81
        }
×
82
        if _, exist := pod.Labels[leaderworkerset.WorkerIndexLabelKey]; !exist {
×
83
                return ctrl.Result{}, errors.New("leaderworkerset.sigs.k8s.io/worker-index label is unexpected missing")
×
84
        }
×
85
        // get the leaderWorkerSet object
86
        var leaderWorkerSet leaderworkerset.LeaderWorkerSet
×
87
        if err := r.Get(ctx, types.NamespacedName{Name: lwsName, Namespace: pod.Namespace}, &leaderWorkerSet); err != nil {
×
88
                // If lws not found, it's mostly because deleted, ignore the error as Pods will be GCed finally.
×
89
                return ctrl.Result{}, client.IgnoreNotFound(err)
×
90
        }
×
91
        leaderDeleted, err := r.handleRestartPolicy(ctx, pod, leaderWorkerSet)
×
92
        if err != nil {
×
93
                return ctrl.Result{}, err
×
94
        }
×
95
        if leaderDeleted {
×
96
                return ctrl.Result{}, nil
×
97
        }
×
98

99
        // worker pods' reconciliation is only done to handle restart policy
100
        if !podutils.LeaderPod(pod) {
×
101
                return ctrl.Result{}, nil
×
102
        }
×
103

104
        // validate leader's annotations to prevent infinite StatefulSet creation loops
105
        // see issue: https://github.com/kubernetes-sigs/lws/issues/391
106
        if pod.Annotations[leaderworkerset.LeaderPodNameAnnotationKey] != "" {
×
107
                errMsg := fmt.Sprintf("leader pod %s/%s contains mistake annotation '%s': requires Kubernetes ≥v1.27 or v1.26 with StatefulSetStartOrdinal feature",
×
108
                        pod.Namespace,
×
109
                        pod.Name,
×
110
                        leaderworkerset.LeaderPodNameAnnotationKey)
×
111
                log.Error(errors.New(errMsg), "validate leader's annotations")
×
112
                r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeWarning, FailedCreate, Create, errMsg)
×
113
                return ctrl.Result{}, nil
×
114
        }
×
115

116
        if leaderWorkerSet.Spec.NetworkConfig != nil && *leaderWorkerSet.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
×
117
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, &leaderWorkerSet, pod.Name, map[string]string{leaderworkerset.SetNameLabelKey: leaderWorkerSet.Name, leaderworkerset.GroupIndexLabelKey: pod.Labels[leaderworkerset.GroupIndexLabelKey]}, &pod); err != nil {
×
118
                        return ctrl.Result{}, err
×
119
                }
×
120
        }
121

122
        // if it's not leader pod or leader pod is being deleted, we should not create the worker statefulset
123
        // this is critical to avoid race condition in all-or-nothing restart where the worker sts may be created
124
        // when the leader pod is being deleted
125
        if pod.DeletionTimestamp != nil {
×
126
                log.V(2).Info("skip creating the worker sts since the leader pod is being deleted")
×
127
                return ctrl.Result{}, nil
×
128
        }
×
129

130
        if r.SchedulerProvider != nil {
×
131
                err = r.SchedulerProvider.CreatePodGroupIfNotExists(ctx, &leaderWorkerSet, &pod)
×
132
                if err != nil {
×
133
                        return ctrl.Result{}, err
×
134
                }
×
135
        }
136

137
        // Once size = 1, no need to create worker statefulSets.
138
        if *leaderWorkerSet.Spec.LeaderWorkerTemplate.Size == 1 {
×
139
                return ctrl.Result{}, nil
×
140
        }
×
141

142
        // logic for handling leader pod
143
        if leaderWorkerSet.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy && !podutils.IsPodReady(&pod) {
×
144
                log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.")
×
145
                return ctrl.Result{}, nil
×
146
        }
×
147
        revision, err := revisionutils.GetRevision(ctx, r.Client, &leaderWorkerSet, revisionutils.GetRevisionKey(&pod))
×
148
        if err != nil {
×
149
                log.Error(err, "Getting lws revisions")
×
150
                return ctrl.Result{}, err
×
151
        }
×
152
        if revision == nil {
×
153
                log.V(2).Info(fmt.Sprintf("Revision has not been created yet, requeing reconciler for pod %s", pod.Name))
×
154
                return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
×
155
        }
×
156
        statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet, revision)
×
157
        if err != nil {
×
158
                return ctrl.Result{}, err
×
159
        }
×
160

161
        // if exclusive placement is enabled but leader pod is not scheduled, don't create the worker sts
162
        if topologyKey, found := leaderWorkerSet.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]; found {
×
163
                // check if the leader pod is scheduled.
×
164
                if pod.Spec.NodeName == "" {
×
165
                        log.V(2).Info(fmt.Sprintf("Pod %q is not scheduled yet", pod.Name))
×
166
                        return ctrl.Result{}, nil
×
167
                }
×
168
                if err := r.setNodeSelectorForWorkerPods(ctx, &pod, statefulSet, topologyKey); err != nil {
×
169
                        log.Error(err, "setting node selector for worker pods")
×
170
                        return ctrl.Result{}, err
×
171
                }
×
172
        }
173

174
        if err := setControllerReferenceWithStatefulSet(&pod, statefulSet, r.Scheme); err != nil {
×
175
                log.Error(err, "Setting controller reference.")
×
176
                return ctrl.Result{}, nil
×
177
        }
×
178

179
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(statefulSet)
×
180
        if err != nil {
×
181
                return ctrl.Result{}, err
×
182
        }
×
183
        workerStatefulSet := &unstructured.Unstructured{
×
184
                Object: obj,
×
185
        }
×
186

×
187
        var workerSts appsv1.StatefulSet
×
188
        if err := r.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: leaderWorkerSet.Namespace}, &workerSts); err != nil {
×
189
                if client.IgnoreNotFound(err) != nil {
×
190
                        return ctrl.Result{}, err
×
191
                }
×
192
                if err = r.Create(ctx, workerStatefulSet); err != nil {
×
193
                        if client.IgnoreAlreadyExists(err) != nil {
×
NEW
194
                                r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create worker statefulset for leader pod %s: %v", pod.Name, err))
×
195
                        }
×
196
                        return ctrl.Result{}, client.IgnoreAlreadyExists(err)
×
197
                }
198
                r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created worker statefulset for leader pod %s", pod.Name))
×
199
        }
200
        log.V(2).Info("Worker Reconcile completed.")
×
201
        return ctrl.Result{}, nil
×
202
}
203

204
func (r *PodReconciler) handleRestartPolicy(ctx context.Context, pod corev1.Pod, leaderWorkerSet leaderworkerset.LeaderWorkerSet) (bool, error) {
2✔
205
        log := ctrl.LoggerFrom(ctx)
2✔
206
        policy := leaderWorkerSet.Spec.LeaderWorkerTemplate.RestartPolicy
2✔
207
        if policy != leaderworkerset.RecreateGroupOnPodRestart && policy != leaderworkerset.RecreateGroupAfterStart {
2✔
208
                return false, nil
×
209
        }
×
210
        // the leader pod will be deleted if the worker pod is deleted or any container was restarted
211
        if !podutils.ContainerRestarted(pod) && !podutils.PodDeleted(pod) {
2✔
212
                return false, nil
×
213
        }
×
214

215
        pendingPods, err := r.pendingPodsInGroup(ctx, pod, int(*leaderWorkerSet.Spec.LeaderWorkerTemplate.Size))
2✔
216
        if err != nil {
2✔
217
                return false, err
×
218
        }
×
219

220
        _, hasRecreateGroupAfterStartAnnotation := leaderWorkerSet.Annotations[leaderworkerset.RecreateGroupAfterStartAnnotationKey]
2✔
221

2✔
222
        if pendingPods && (policy == leaderworkerset.RecreateGroupAfterStart || hasRecreateGroupAfterStartAnnotation) {
2✔
223
                log.V(2).Info(fmt.Sprintf("Skipping group recreation because there is a pod pending: %s", pod.Name))
×
224
                return false, nil
×
225
        }
×
226

227
        var leader corev1.Pod
2✔
228
        if !podutils.LeaderPod(pod) {
4✔
229
                leaderPodName, ordinal := statefulsetutils.GetParentNameAndOrdinal(pod.Name)
2✔
230
                if ordinal == -1 {
2✔
231
                        return false, fmt.Errorf("parsing pod name for pod %s", pod.Name)
×
232
                }
×
233
                if err := r.Get(ctx, types.NamespacedName{Name: leaderPodName, Namespace: pod.Namespace}, &leader); err != nil {
2✔
234
                        // If the error is not found, it is likely caused by the fact that the leader was deleted but the worker statefulset
×
235
                        // deletion hasn't deleted all the worker pods
×
236
                        return false, client.IgnoreNotFound(err)
×
237
                }
×
238
                // Different revision key means that this pod will be deleted soon and alternative will be created with the matching key
239
                if revisionutils.GetRevisionKey(&leader) != revisionutils.GetRevisionKey(&pod) {
2✔
240
                        return false, nil
×
241
                }
×
242
                // Ignore worker pods from a stale worker StatefulSet (or test-owned direct pod) so
243
                // background deletion of the previous group does not recreate the replacement leader again.
244
                currentGroupWorkerPod, err := r.workerPodBelongsToLeader(ctx, pod, leader)
2✔
245
                if err != nil {
2✔
246
                        return false, err
×
247
                }
×
248
                if !currentGroupWorkerPod {
3✔
249
                        return false, nil
1✔
250
                }
1✔
251
        } else {
×
252
                leader = pod
×
253
        }
×
254
        // if the leader pod is being deleted, we don't need to send deletion requests
255
        if leader.DeletionTimestamp != nil {
1✔
256
                return true, nil
×
257
        }
×
258
        deletionOpt := metav1.DeletePropagationForeground
1✔
259
        if err := r.Delete(ctx, &leader, &client.DeleteOptions{
1✔
260
                PropagationPolicy: &deletionOpt,
1✔
261
        }); err != nil {
1✔
262
                return false, err
×
263
        }
×
264
        r.Record.Eventf(&leaderWorkerSet, &leader, corev1.EventTypeNormal, "RecreateGroup", Delete, fmt.Sprintf("Worker pod %s failed, deleted leader pod %s to recreate group %s", pod.Name, leader.Name, leader.Labels[leaderworkerset.GroupIndexLabelKey]))
1✔
265
        return true, nil
1✔
266
}
267

268
func (r *PodReconciler) workerPodBelongsToLeader(ctx context.Context, pod corev1.Pod, leader corev1.Pod) (bool, error) {
2✔
269
        owner := metav1.GetControllerOf(&pod)
2✔
270
        if owner == nil {
2✔
271
                return false, nil
×
272
        }
×
273

274
        if owner.Kind == "Pod" {
2✔
275
                return owner.Name == leader.Name && owner.UID == leader.UID, nil
×
276
        }
×
277

278
        if owner.Kind != "StatefulSet" {
2✔
279
                return false, nil
×
280
        }
×
281

282
        var workerSts appsv1.StatefulSet
2✔
283
        if err := r.Get(ctx, types.NamespacedName{Name: owner.Name, Namespace: pod.Namespace}, &workerSts); err != nil {
2✔
284
                return false, client.IgnoreNotFound(err)
×
285
        }
×
286
        if workerSts.UID != owner.UID {
3✔
287
                return false, nil
1✔
288
        }
1✔
289

290
        stsOwner := metav1.GetControllerOf(&workerSts)
1✔
291
        if stsOwner == nil {
1✔
292
                return false, nil
×
293
        }
×
294
        return stsOwner.Kind == "Pod" && stsOwner.Name == leader.Name && stsOwner.UID == leader.UID, nil
1✔
295
}
296

297
func (r *PodReconciler) setNodeSelectorForWorkerPods(ctx context.Context, pod *corev1.Pod, sts *appsapplyv1.StatefulSetApplyConfiguration, topologyKey string) error {
×
298

×
299
        log := ctrl.LoggerFrom(ctx)
×
300
        topologyValue, err := r.topologyValueFromPod(ctx, pod, topologyKey)
×
301
        if err != nil {
×
302
                log.Error(err, "getting topology from leader pod")
×
303
                return err
×
304
        }
×
305

306
        // set node selector for worker pods, if worker pods already scheduled to different topology value
307
        // the following applying logic will automatically update it to match the leader pods, so we don't
308
        // need to verify if they have the same topology value
309
        sts.Spec.Template.Spec.WithNodeSelector(map[string]string{
×
310
                topologyKey: topologyValue,
×
311
        })
×
312
        return nil
×
313
}
314

315
func (r *PodReconciler) topologyValueFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
×
316
        log := ctrl.LoggerFrom(ctx)
×
317

×
318
        nodeName := pod.Spec.NodeName
×
319
        ns := pod.Namespace
×
320

×
321
        // Get node the leader pod is running on.
×
322
        var node corev1.Node
×
323
        if err := r.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
×
324
                // We'll ignore not-found errors, since there is nothing we can do here.
×
325
                // A node may not exist temporarily due to a maintenance event or other scenarios.
×
326
                log.Error(err, fmt.Sprintf("getting node %s", nodeName))
×
327
                return "", client.IgnoreNotFound(err)
×
328
        }
×
329

330
        // Get topology (e.g. node pool name) from node labels.
331
        topology, exists := node.Labels[topologyKey]
×
332
        if !exists {
×
333
                return "", fmt.Errorf("node does not have topology label: %s", topology)
×
334
        }
×
335
        return topology, nil
×
336
}
337

338
func (r *PodReconciler) pendingPodsInGroup(ctx context.Context, pod corev1.Pod, groupSize int) (bool, error) {
2✔
339
        groupIndex := pod.Labels[leaderworkerset.GroupIndexLabelKey]
2✔
340
        lwsName := pod.Labels[leaderworkerset.SetNameLabelKey]
2✔
341

2✔
342
        podSelector := client.MatchingLabels(map[string]string{
2✔
343
                leaderworkerset.SetNameLabelKey:    lwsName,
2✔
344
                leaderworkerset.GroupIndexLabelKey: groupIndex,
2✔
345
        })
2✔
346

2✔
347
        var podList corev1.PodList
2✔
348
        if err := r.List(ctx, &podList, podSelector, client.InNamespace(pod.Namespace)); err != nil {
2✔
349
                return false, err
×
350
        }
×
351

352
        if groupSize != len(podList.Items) {
2✔
353
                return true, nil
×
354
        }
×
355

356
        for _, groupPod := range podList.Items {
6✔
357
                if groupPod.Status.Phase == corev1.PodPending {
4✔
358
                        return true, nil
×
359
                }
×
360
        }
361
        return false, nil
2✔
362
}
363

364
// setControllerReferenceWithStatefulSet set controller reference for the StatefulSet
365
func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1.StatefulSetApplyConfiguration, scheme *runtime.Scheme) error {
×
366
        // Validate the owner.
×
367
        ro, ok := owner.(runtime.Object)
×
368
        if !ok {
×
369
                return fmt.Errorf("%T is not a runtime.Object, cannot call SetOwnerReference", owner)
×
370
        }
×
371
        gvk, err := apiutil.GVKForObject(ro, scheme)
×
372
        if err != nil {
×
373
                return err
×
374
        }
×
375
        sts.WithOwnerReferences(metaapplyv1.OwnerReference().
×
376
                WithAPIVersion(gvk.GroupVersion().String()).
×
377
                WithKind(gvk.Kind).
×
378
                WithName(owner.GetName()).
×
379
                WithUID(owner.GetUID()).
×
380
                WithBlockOwnerDeletion(true).
×
381
                WithController(true))
×
382
        return nil
×
383
}
384

385
// constructWorkerStatefulSetApplyConfiguration constructs the applied configuration for the leader StatefulSet
386
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
4✔
387
        currentLws, err := revisionutils.ApplyRevision(&lws, currentRevision)
4✔
388
        if err != nil {
4✔
389
                return nil, err
×
390
        }
×
391
        podTemplateSpec := *currentLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
4✔
392
        // construct pod template spec configuration
4✔
393
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
4✔
394
        if err != nil {
4✔
395
                return nil, err
×
396
        }
×
397
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
4✔
398
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
4✔
399
        if err != nil {
4✔
400
                return nil, err
×
401
        }
×
402
        selectorMap := map[string]string{
4✔
403
                leaderworkerset.GroupIndexLabelKey:      leaderPod.Labels[leaderworkerset.GroupIndexLabelKey],
4✔
404
                leaderworkerset.SetNameLabelKey:         lws.Name,
4✔
405
                leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey],
4✔
406
        }
4✔
407
        labelMap := map[string]string{
4✔
408
                leaderworkerset.GroupIndexLabelKey:      leaderPod.Labels[leaderworkerset.GroupIndexLabelKey],
4✔
409
                leaderworkerset.SetNameLabelKey:         lws.Name,
4✔
410
                leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey],
4✔
411
                leaderworkerset.RevisionKey:             revisionutils.GetRevisionKey(&leaderPod),
4✔
412
        }
4✔
413

4✔
414
        podTemplateApplyConfiguration.WithLabels(labelMap)
4✔
415
        podAnnotations := make(map[string]string)
4✔
416
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
4✔
417
        podAnnotations[leaderworkerset.LeaderPodNameAnnotationKey] = leaderPod.Name
4✔
418
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
5✔
419
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
1✔
420
        }
1✔
421
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
5✔
422
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
1✔
423
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
2✔
424
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
1✔
425
                }
1✔
426
        }
427
        acceleratorutils.AddTPUAnnotations(leaderPod, podAnnotations)
4✔
428
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
4✔
429
        serviceName := leaderPod.Name
4✔
430
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
8✔
431
                serviceName = lws.Name
4✔
432
        }
4✔
433
        // construct statefulset apply configuration
434
        statefulSetConfig := appsapplyv1.StatefulSet(leaderPod.Name, leaderPod.Namespace).
4✔
435
                WithSpec(appsapplyv1.StatefulSetSpec().
4✔
436
                        WithServiceName(serviceName).
4✔
437
                        WithReplicas(*lws.Spec.LeaderWorkerTemplate.Size - 1).
4✔
438
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
4✔
439
                        WithTemplate(&podTemplateApplyConfiguration).
4✔
440
                        WithOrdinals(appsapplyv1.StatefulSetOrdinals().WithStart(1)).
4✔
441
                        WithSelector(metaapplyv1.LabelSelector().
4✔
442
                                WithMatchLabels(selectorMap))).
4✔
443
                WithLabels(labelMap)
4✔
444

4✔
445
        pvcApplyConfiguration := controllerutils.GetPVCApplyConfiguration(&lws)
4✔
446
        if len(pvcApplyConfiguration) > 0 {
5✔
447
                statefulSetConfig.Spec.WithVolumeClaimTemplates(pvcApplyConfiguration...)
1✔
448
        }
1✔
449

450
        if lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy != nil {
5✔
451
                pvcRetentionPolicy := &appsapplyv1.StatefulSetPersistentVolumeClaimRetentionPolicyApplyConfiguration{
1✔
452
                        WhenDeleted: &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenDeleted,
1✔
453
                        WhenScaled:  &lws.Spec.LeaderWorkerTemplate.PersistentVolumeClaimRetentionPolicy.WhenScaled,
1✔
454
                }
1✔
455
                statefulSetConfig.Spec.WithPersistentVolumeClaimRetentionPolicy(pvcRetentionPolicy)
1✔
456
        }
1✔
457
        return statefulSetConfig, nil
4✔
458
}
459

460
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
×
461
        return ctrl.NewControllerManagedBy(mgr).
×
462
                For(&corev1.Pod{}).
×
463
                WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
×
464
                        if pod, ok := object.(*corev1.Pod); ok {
×
465
                                _, exist := pod.Labels[leaderworkerset.SetNameLabelKey]
×
466
                                return exist
×
467
                        }
×
468
                        if statefulSet, ok := object.(*appsv1.StatefulSet); ok {
×
469
                                _, exist := statefulSet.Labels[leaderworkerset.SetNameLabelKey]
×
470
                                return exist
×
471
                        }
×
472
                        return false
×
473
                })).Owns(&appsv1.StatefulSet{}).Complete(r)
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc