kubernetes-sigs / lws, build 17902435856
22 Sep 2025 02:06AM UTC. Coverage: 35.096% (+1.3%), up from 33.767%.

Pull Request #649: doc: add volumeClaimTemplates example
Author: andyzhangx. Latest commit: "fix comments". CI: github.

803 of 2288 relevant lines covered (35.1%), 2.08 hits per line.
Source file: /pkg/controllers/pod_controller.go (22.83% of lines covered)

In this file, only constructWorkerStatefulSetApplyConfiguration and SetupWithManager are exercised by tests; the remaining functions (NewPodReconciler, Reconcile, handleRestartPolicy, setNodeSelectorForWorkerPods, topologyValueFromPod, setControllerReferenceWithStatefulSet) are uncovered.
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
        "context"
        "errors"
        "fmt"
        "strconv"
        "time"

        appsv1 "k8s.io/api/apps/v1"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/types"
        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
        metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
        "k8s.io/client-go/tools/record"
        "k8s.io/klog/v2"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
        "sigs.k8s.io/controller-runtime/pkg/predicate"

        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
        "sigs.k8s.io/lws/pkg/schedulerprovider"
        acceleratorutils "sigs.k8s.io/lws/pkg/utils/accelerators"
        controllerutils "sigs.k8s.io/lws/pkg/utils/controller"
        podutils "sigs.k8s.io/lws/pkg/utils/pod"
        revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

// PodReconciler reconciles Pods that belong to a LeaderWorkerSet.
type PodReconciler struct {
        client.Client
        Scheme            *runtime.Scheme
        Record            record.EventRecorder
        SchedulerProvider schedulerprovider.SchedulerProvider
}

// NewPodReconciler returns a PodReconciler wired with the given client,
// scheme, event recorder, and (optional) scheduler provider.
func NewPodReconciler(client client.Client, schema *runtime.Scheme, record record.EventRecorder, sp schedulerprovider.SchedulerProvider) *PodReconciler {
        return &PodReconciler{Client: client, Scheme: schema, Record: record, SchedulerProvider: sp}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
//+kubebuilder:rbac:groups=core,resources=pods,verbs=create;delete;get;list;patch;update;watch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;update;patch

func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        var pod corev1.Pod
        if err := r.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, &pod); err != nil {
                return ctrl.Result{}, client.IgnoreNotFound(err)
        }
        log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&pod))
        ctx = ctrl.LoggerInto(ctx, log)

        // get the leaderWorkerSet name
        lwsName := pod.Labels[leaderworkerset.SetNameLabelKey]
        if lwsName == "" {
                return ctrl.Result{}, errors.New("leaderworkerset.sigs.k8s.io/name label is unexpectedly missing")
        }
        if _, exist := pod.Labels[leaderworkerset.WorkerIndexLabelKey]; !exist {
                return ctrl.Result{}, errors.New("leaderworkerset.sigs.k8s.io/worker-index label is unexpectedly missing")
        }
        // get the leaderWorkerSet object
        var leaderWorkerSet leaderworkerset.LeaderWorkerSet
        if err := r.Get(ctx, types.NamespacedName{Name: lwsName, Namespace: pod.Namespace}, &leaderWorkerSet); err != nil {
                // If the lws is not found, it was most likely deleted; ignore the error since the pods will eventually be GCed.
                return ctrl.Result{}, client.IgnoreNotFound(err)
        }
        leaderDeleted, err := r.handleRestartPolicy(ctx, pod, leaderWorkerSet)
        if err != nil {
                return ctrl.Result{}, err
        }
        if leaderDeleted {
                return ctrl.Result{}, nil
        }

        // worker pods are reconciled only to handle the restart policy
        if !podutils.LeaderPod(pod) {
                return ctrl.Result{}, nil
        }

        // validate the leader's annotations to prevent infinite StatefulSet creation loops
        // see issue: https://github.com/kubernetes-sigs/lws/issues/391
        if pod.Annotations[leaderworkerset.LeaderPodNameAnnotationKey] != "" {
                errMsg := fmt.Sprintf("leader pod %s/%s carries the unexpected annotation '%s': this requires Kubernetes ≥v1.27, or v1.26 with the StatefulSetStartOrdinal feature enabled",
                        pod.Namespace,
                        pod.Name,
                        leaderworkerset.LeaderPodNameAnnotationKey)
                log.Error(errors.New(errMsg), "validating leader's annotations")
                r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeWarning, FailedCreate, errMsg)
                return ctrl.Result{}, nil
        }

        if leaderWorkerSet.Spec.NetworkConfig != nil && *leaderWorkerSet.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainUniquePerReplica {
                if err := controllerutils.CreateHeadlessServiceIfNotExists(ctx, r.Client, r.Scheme, &leaderWorkerSet, pod.Name, map[string]string{leaderworkerset.SetNameLabelKey: leaderWorkerSet.Name, leaderworkerset.GroupIndexLabelKey: pod.Labels[leaderworkerset.GroupIndexLabelKey]}, &pod); err != nil {
                        return ctrl.Result{}, err
                }
        }

        // if the leader pod is being deleted, we should not create the worker statefulset;
        // this is critical to avoid a race condition in all-or-nothing restart, where the worker sts
        // could otherwise be created while the leader pod is being deleted
        if pod.DeletionTimestamp != nil {
                log.V(2).Info("skip creating the worker sts since the leader pod is being deleted")
                return ctrl.Result{}, nil
        }

        if r.SchedulerProvider != nil {
                err = r.SchedulerProvider.CreatePodGroupIfNotExists(ctx, &leaderWorkerSet, &pod)
                if err != nil {
                        return ctrl.Result{}, err
                }
        }

        // When size = 1, there is no need to create worker statefulSets.
        if *leaderWorkerSet.Spec.LeaderWorkerTemplate.Size == 1 {
                return ctrl.Result{}, nil
        }

        // logic for handling the leader pod
        if leaderWorkerSet.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy && !podutils.IsPodReady(&pod) {
                log.V(2).Info("defer the creation of the worker statefulset because the leader pod is not ready.")
                return ctrl.Result{}, nil
        }
        revision, err := revisionutils.GetRevision(ctx, r.Client, &leaderWorkerSet, revisionutils.GetRevisionKey(&pod))
        if err != nil {
                log.Error(err, "Getting lws revisions")
                return ctrl.Result{}, err
        }
        if revision == nil {
                log.V(2).Info(fmt.Sprintf("Revision has not been created yet, requeuing reconciler for pod %s", pod.Name))
                return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
        }
        statefulSet, err := constructWorkerStatefulSetApplyConfiguration(pod, leaderWorkerSet, revision)
        if err != nil {
                return ctrl.Result{}, err
        }

        // if exclusive placement is enabled but the leader pod is not scheduled yet, don't create the worker sts
        if topologyKey, found := leaderWorkerSet.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]; found {
                // check if the leader pod is scheduled.
                if pod.Spec.NodeName == "" {
                        log.V(2).Info(fmt.Sprintf("Pod %q is not scheduled yet", pod.Name))
                        return ctrl.Result{}, nil
                }
                if err := r.setNodeSelectorForWorkerPods(ctx, &pod, statefulSet, topologyKey); err != nil {
                        log.Error(err, "setting node selector for worker pods")
                        return ctrl.Result{}, err
                }
        }

        if err := setControllerReferenceWithStatefulSet(&pod, statefulSet, r.Scheme); err != nil {
                log.Error(err, "Setting controller reference.")
                return ctrl.Result{}, nil
        }

        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(statefulSet)
        if err != nil {
                return ctrl.Result{}, err
        }
        workerStatefulSet := &unstructured.Unstructured{
                Object: obj,
        }

        var workerSts appsv1.StatefulSet
        if err := r.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: leaderWorkerSet.Namespace}, &workerSts); err != nil {
                if client.IgnoreNotFound(err) != nil {
                        return ctrl.Result{}, err
                }
                if err = r.Create(ctx, workerStatefulSet); err != nil {
                        r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeWarning, FailedCreate, fmt.Sprintf("Failed to create worker statefulset for leader pod %s", pod.Name))
                        return ctrl.Result{}, client.IgnoreAlreadyExists(err)
                }
                r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("Created worker statefulset for leader pod %s", pod.Name))
        }
        log.V(2).Info("Worker Reconcile completed.")
        return ctrl.Result{}, nil
}

func (r *PodReconciler) handleRestartPolicy(ctx context.Context, pod corev1.Pod, leaderWorkerSet leaderworkerset.LeaderWorkerSet) (bool, error) {
        if leaderWorkerSet.Spec.LeaderWorkerTemplate.RestartPolicy != leaderworkerset.RecreateGroupOnPodRestart {
                return false, nil
        }
        // the leader pod will be deleted if a worker pod is deleted or any of its containers restarted
        if !podutils.ContainerRestarted(pod) && !podutils.PodDeleted(pod) {
                return false, nil
        }
        var leader corev1.Pod
        if !podutils.LeaderPod(pod) {
                leaderPodName, ordinal := statefulsetutils.GetParentNameAndOrdinal(pod.Name)
                if ordinal == -1 {
                        return false, fmt.Errorf("parsing pod name for pod %s", pod.Name)
                }
                if err := r.Get(ctx, types.NamespacedName{Name: leaderPodName, Namespace: pod.Namespace}, &leader); err != nil {
                        // A not-found error here most likely means the leader was already deleted but the
                        // worker statefulset deletion hasn't removed all the worker pods yet.
                        return false, client.IgnoreNotFound(err)
                }
                // A different revision key means this pod will be deleted soon and a replacement will be created with the matching key.
                if revisionutils.GetRevisionKey(&leader) != revisionutils.GetRevisionKey(&pod) {
                        return false, nil
                }
        } else {
                leader = pod
        }
        // if the leader pod is already being deleted, there is no need to send another deletion request
        if leader.DeletionTimestamp != nil {
                return true, nil
        }
        deletionOpt := metav1.DeletePropagationForeground
        if err := r.Delete(ctx, &leader, &client.DeleteOptions{
                PropagationPolicy: &deletionOpt,
        }); err != nil {
                return false, err
        }
        r.Record.Eventf(&leaderWorkerSet, corev1.EventTypeNormal, "RecreateGroupOnPodRestart", fmt.Sprintf("Worker pod %s failed, deleted leader pod %s to recreate group %s", pod.Name, leader.Name, leader.Labels[leaderworkerset.GroupIndexLabelKey]))
        return true, nil
}

func (r *PodReconciler) setNodeSelectorForWorkerPods(ctx context.Context, pod *corev1.Pod, sts *appsapplyv1.StatefulSetApplyConfiguration, topologyKey string) error {
        log := ctrl.LoggerFrom(ctx)
        topologyValue, err := r.topologyValueFromPod(ctx, pod, topologyKey)
        if err != nil {
                log.Error(err, "getting topology from leader pod")
                return err
        }

        // Set the node selector for the worker pods. If the worker pods were already scheduled
        // to a different topology value, the apply logic will update the selector to match the
        // leader pod, so there is no need to verify that the values match here.
        sts.Spec.Template.Spec.WithNodeSelector(map[string]string{
                topologyKey: topologyValue,
        })
        return nil
}

func (r *PodReconciler) topologyValueFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
        log := ctrl.LoggerFrom(ctx)

        nodeName := pod.Spec.NodeName
        ns := pod.Namespace

        // Get the node the leader pod is running on.
        var node corev1.Node
        if err := r.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
                // Ignore not-found errors, since there is nothing we can do here.
                // A node may not exist temporarily due to a maintenance event or other scenarios.
                log.Error(err, fmt.Sprintf("getting node %s", nodeName))
                return "", client.IgnoreNotFound(err)
        }

        // Get the topology (e.g. node pool name) from the node labels.
        topology, exists := node.Labels[topologyKey]
        if !exists {
                return "", fmt.Errorf("node %s does not have topology label %s", nodeName, topologyKey)
        }
        return topology, nil
}

// setControllerReferenceWithStatefulSet sets the controller reference on the StatefulSet apply configuration.
func setControllerReferenceWithStatefulSet(owner metav1.Object, sts *appsapplyv1.StatefulSetApplyConfiguration, scheme *runtime.Scheme) error {
        // Validate the owner.
        ro, ok := owner.(runtime.Object)
        if !ok {
                return fmt.Errorf("%T is not a runtime.Object, cannot call SetOwnerReference", owner)
        }
        gvk, err := apiutil.GVKForObject(ro, scheme)
        if err != nil {
                return err
        }
        sts.WithOwnerReferences(metaapplyv1.OwnerReference().
                WithAPIVersion(gvk.GroupVersion().String()).
                WithKind(gvk.Kind).
                WithName(owner.GetName()).
                WithUID(owner.GetUID()).
                WithBlockOwnerDeletion(true).
                WithController(true))
        return nil
}

// constructWorkerStatefulSetApplyConfiguration constructs the apply configuration for the worker StatefulSet.
func constructWorkerStatefulSetApplyConfiguration(leaderPod corev1.Pod, lws leaderworkerset.LeaderWorkerSet, currentRevision *appsv1.ControllerRevision) (*appsapplyv1.StatefulSetApplyConfiguration, error) {
        currentLws, err := revisionutils.ApplyRevision(&lws, currentRevision)
        if err != nil {
                return nil, err
        }
        podTemplateSpec := *currentLws.Spec.LeaderWorkerTemplate.WorkerTemplate.DeepCopy()
        // construct the pod template spec configuration
        obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&podTemplateSpec)
        if err != nil {
                return nil, err
        }
        var podTemplateApplyConfiguration coreapplyv1.PodTemplateSpecApplyConfiguration
        err = runtime.DefaultUnstructuredConverter.FromUnstructured(obj, &podTemplateApplyConfiguration)
        if err != nil {
                return nil, err
        }
        selectorMap := map[string]string{
                leaderworkerset.GroupIndexLabelKey:      leaderPod.Labels[leaderworkerset.GroupIndexLabelKey],
                leaderworkerset.SetNameLabelKey:         lws.Name,
                leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey],
        }
        labelMap := map[string]string{
                leaderworkerset.GroupIndexLabelKey:      leaderPod.Labels[leaderworkerset.GroupIndexLabelKey],
                leaderworkerset.SetNameLabelKey:         lws.Name,
                leaderworkerset.GroupUniqueHashLabelKey: leaderPod.Labels[leaderworkerset.GroupUniqueHashLabelKey],
                leaderworkerset.RevisionKey:             revisionutils.GetRevisionKey(&leaderPod),
        }

        podTemplateApplyConfiguration.WithLabels(labelMap)
        podAnnotations := make(map[string]string)
        podAnnotations[leaderworkerset.SizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.Size))
        podAnnotations[leaderworkerset.LeaderPodNameAnnotationKey] = leaderPod.Name
        if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" {
                podAnnotations[leaderworkerset.ExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]
        }
        if lws.Spec.LeaderWorkerTemplate.SubGroupPolicy != nil {
                podAnnotations[leaderworkerset.SubGroupSizeAnnotationKey] = strconv.Itoa(int(*lws.Spec.LeaderWorkerTemplate.SubGroupPolicy.SubGroupSize))
                if lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] != "" {
                        podAnnotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey] = lws.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]
                }
        }
        acceleratorutils.AddTPUAnnotations(leaderPod, podAnnotations)
        podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
        serviceName := leaderPod.Name
        if lws.Spec.NetworkConfig == nil || *lws.Spec.NetworkConfig.SubdomainPolicy == leaderworkerset.SubdomainShared {
                serviceName = lws.Name
        }
        // construct the statefulset apply configuration
        statefulSetConfig := appsapplyv1.StatefulSet(leaderPod.Name, leaderPod.Namespace).
                WithSpec(appsapplyv1.StatefulSetSpec().
                        WithServiceName(serviceName).
                        WithReplicas(*lws.Spec.LeaderWorkerTemplate.Size - 1).
                        WithPodManagementPolicy(appsv1.ParallelPodManagement).
                        WithTemplate(&podTemplateApplyConfiguration).
                        WithOrdinals(appsapplyv1.StatefulSetOrdinals().WithStart(1)).
                        WithSelector(metaapplyv1.LabelSelector().
                                WithMatchLabels(selectorMap))).
                WithLabels(labelMap)
        return statefulSetConfig, nil
}

// SetupWithManager registers the controller, watching Pods (and owned StatefulSets)
// that carry the leaderworkerset name label.
func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&corev1.Pod{}).
                WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool {
                        if pod, ok := object.(*corev1.Pod); ok {
                                _, exist := pod.Labels[leaderworkerset.SetNameLabelKey]
                                return exist
                        }
                        if statefulSet, ok := object.(*appsv1.StatefulSet); ok {
                                _, exist := statefulSet.Labels[leaderworkerset.SetNameLabelKey]
                                return exist
                        }
                        return false
                })).Owns(&appsv1.StatefulSet{}).Complete(r)
}
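
A quick illustration of the naming convention handleRestartPolicy relies on: each worker StatefulSet is named after its leader pod, so stripping the trailing ordinal from a worker pod's name recovers the leader's name. This is a minimal sketch; the pod names are invented, and only the GetParentNameAndOrdinal behavior used above is assumed.

package main

import (
        "fmt"

        statefulsetutils "sigs.k8s.io/lws/pkg/utils/statefulset"
)

func main() {
        // Worker pod "mylws-0-3" belongs to the worker StatefulSet "mylws-0",
        // which is named after leader pod "mylws-0" (group 0).
        leaderPodName, ordinal := statefulsetutils.GetParentNameAndOrdinal("mylws-0-3")
        fmt.Println(leaderPodName, ordinal) // mylws-0 3

        // A name without a trailing ordinal cannot be parsed; the ordinal comes
        // back as -1, which handleRestartPolicy reports as an error.
        _, bad := statefulsetutils.GetParentNameAndOrdinal("mylws")
        fmt.Println(bad) // -1
}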
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
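The exclusive-placement path can be hard to picture from the apply-configuration plumbing alone. The toy program below mimics the single call setNodeSelectorForWorkerPods makes: once the leader lands on a node, the workers inherit a nodeSelector pinning them to the same topology domain. The topology key and zone value here are invented for the example.

package main

import (
        "fmt"

        appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
        coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
)

func main() {
        // Skeleton of a worker StatefulSet apply configuration, as produced by
        // constructWorkerStatefulSetApplyConfiguration (most fields omitted).
        sts := appsapplyv1.StatefulSet("mylws-0", "default").
                WithSpec(appsapplyv1.StatefulSetSpec().
                        WithTemplate(coreapplyv1.PodTemplateSpec().
                                WithSpec(coreapplyv1.PodSpec())))

        // Suppose the leader was scheduled to a node labelled
        // topology.kubernetes.io/zone=us-east1-b: the controller pins the
        // workers to that same zone.
        sts.Spec.Template.Spec.WithNodeSelector(map[string]string{
                "topology.kubernetes.io/zone": "us-east1-b",
        })
        fmt.Println(sts.Spec.Template.Spec.NodeSelector)
}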
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing
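Finally, a minimal sketch of how this reconciler could be registered with a controller-runtime manager. The main package below is illustrative, not taken from the lws repository; it assumes the standard scheme-registration helpers (clientgoscheme.AddToScheme, leaderworkerset.AddToScheme) and passes a nil SchedulerProvider, which Reconcile treats as "no pod groups".

package main

import (
        "os"

        "k8s.io/apimachinery/pkg/runtime"
        clientgoscheme "k8s.io/client-go/kubernetes/scheme"
        ctrl "sigs.k8s.io/controller-runtime"

        leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
        "sigs.k8s.io/lws/pkg/controllers"
)

func main() {
        scheme := runtime.NewScheme()
        // Register the built-in types (Pods, StatefulSets, ...) and the
        // LeaderWorkerSet API so the client can decode both.
        _ = clientgoscheme.AddToScheme(scheme)
        _ = leaderworkerset.AddToScheme(scheme)

        mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
        if err != nil {
                os.Exit(1)
        }

        // A nil SchedulerProvider disables pod-group creation, mirroring the
        // nil check inside Reconcile.
        reconciler := controllers.NewPodReconciler(
                mgr.GetClient(), mgr.GetScheme(),
                mgr.GetEventRecorderFor("pod-controller"), nil)
        if err := reconciler.SetupWithManager(mgr); err != nil {
                os.Exit(1)
        }

        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
                os.Exit(1)
        }
}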