• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tensorchord / openmodelz / 6582261457

19 Oct 2023 10:09AM UTC coverage: 25.992% (-0.3%) from 26.31%
6582261457

Pull #190

github

cutecutecat
fix

Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>
Pull Request #190: feat: log pod start time

70 of 70 new or added lines in 4 files covered. (100.0%)

956 of 3678 relevant lines covered (25.99%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.66
/ingress-operator/pkg/controller/core.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "strings"
8
        "time"
9

10
        "github.com/google/go-cmp/cmp"
11
        corev1 "k8s.io/api/core/v1"
12
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
        "k8s.io/apimachinery/pkg/runtime/schema"
14
        "k8s.io/apimachinery/pkg/util/runtime"
15
        "k8s.io/apimachinery/pkg/util/wait"
16
        kubeinformers "k8s.io/client-go/informers"
17
        "k8s.io/client-go/kubernetes"
18
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
19
        "k8s.io/client-go/tools/cache"
20
        "k8s.io/client-go/tools/record"
21
        "k8s.io/client-go/util/workqueue"
22
        klog "k8s.io/klog"
23

24
        faasv1 "github.com/tensorchord/openmodelz/ingress-operator/pkg/apis/modelzetes/v1"
25
        "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned/scheme"
26
        faasscheme "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned/scheme"
27
        v1 "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/informers/externalversions/modelzetes/v1"
28
        listers "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/listers/modelzetes/v1"
29
        localconsts "github.com/tensorchord/openmodelz/ingress-operator/pkg/consts"
30
        "github.com/tensorchord/openmodelz/modelzetes/pkg/consts"
31
)
32

33
// Identity of this controller and of the custom resource it manages.
const (
	// AgentName is the component name this controller reports when it
	// records Kubernetes events.
	AgentName = "ingress-operator"
	// FaasIngressKind is the Kind of the custom resource that owns the
	// objects this controller creates.
	FaasIngressKind = "InferenceIngress"
	// OpenfaasWorkloadPort is not referenced in this file.
	// NOTE(review): presumably the port inference workloads listen on — confirm.
	OpenfaasWorkloadPort = 8080
)
36

37
// Reasons and messages for Kubernetes events emitted by the controller.
const (
	// SuccessSynced is the event reason recorded after a successful sync.
	SuccessSynced = "Synced"

	// ErrResourceExists is the event reason recorded when a sync fails
	// because a resource with the same name already exists and is not
	// managed by this controller.
	ErrResourceExists = "ErrResourceExists"

	// MessageResourceExists is the event message paired with
	// ErrResourceExists; the %q verb receives the conflicting resource name.
	MessageResourceExists = "Resource %q already exists and is not managed by controller"

	// MessageResourceSynced is the event message paired with SuccessSynced.
	MessageResourceSynced = "FunctionIngress synced successfully"
)
50

51
// BaseController is the controller contains the common function ingress
52
// implementation that is shared between the various versions of k8s.
53
type BaseController struct {
54
        FunctionsLister listers.InferenceIngressLister
55
        FunctionsSynced cache.InformerSynced
56

57
        // Workqueue is a rate limited work queue. This is used to queue work to be
58
        // processed instead of performing it as soon as a change happens. This
59
        // means we can ensure we only process a fixed amount of resources at a
60
        // time, and makes it easy to ensure we are never processing the same item
61
        // simultaneously in two different workers.
62
        Workqueue workqueue.RateLimitingInterface
63

64
        SyncHandler func(ctx context.Context, key string) error
65
}
66

67
func (c BaseController) Run(threadiness int, stopCh <-chan struct{}) error {
×
68
        ctx, cancel := context.WithCancel(context.Background())
×
69
        defer runtime.HandleCrash()
×
70
        defer c.Workqueue.ShutDown()
×
71
        defer cancel()
×
72

×
73
        // Start the informer factories to begin populating the informer caches
×
74
        // Wait for the caches to be synced before starting workers
×
75
        klog.Info("Waiting for informer caches to sync")
×
76
        if ok := cache.WaitForCacheSync(stopCh, c.FunctionsSynced); !ok {
×
77
                return fmt.Errorf("failed to wait for caches to sync")
×
78
        }
×
79

80
        klog.Info("Starting workers")
×
81
        // Launch two workers to process Function resources
×
82
        for i := 0; i < threadiness; i++ {
×
83
                go wait.Until(c.runWorker(ctx), time.Second, stopCh)
×
84
        }
×
85

86
        klog.Info("Started workers")
×
87
        <-stopCh
×
88
        klog.Info("Shutting down workers")
×
89

×
90
        return nil
×
91
}
92

93
// runWorker is a long-running function that will continually call the
94
// processNextWorkItem function in order to read and process a message on the workqueue.
95
func (c BaseController) runWorker(ctx context.Context) func() {
×
96
        return func() {
×
97
                for c.processNextWorkItem(ctx) {
×
98
                }
×
99
        }
100
}
101

102
// processNextWorkItem will read a single work item off the workqueue and
103
// attempt to process it, by calling the syncHandler.
104
func (c BaseController) processNextWorkItem(ctx context.Context) bool {
×
105
        obj, shutdown := c.Workqueue.Get()
×
106

×
107
        if shutdown {
×
108
                return false
×
109
        }
×
110

111
        err := func(obj interface{}) error {
×
112
                defer c.Workqueue.Done(obj)
×
113
                var key string
×
114
                var ok bool
×
115
                if key, ok = obj.(string); !ok {
×
116
                        c.Workqueue.Forget(obj)
×
117
                        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
×
118
                        return nil
×
119
                }
×
120
                if err := c.SyncHandler(ctx, key); err != nil {
×
121
                        return fmt.Errorf("error syncing '%s': %s", key, err.Error())
×
122
                }
×
123
                c.Workqueue.Forget(obj)
×
124
                return nil
×
125
        }(obj)
126

127
        if err != nil {
×
128
                runtime.HandleError(err)
×
129
                return true
×
130
        }
×
131

132
        return true
×
133
}
134

135
// enqueueFunction takes a fni resource and converts it into a namespace/name
136
// string which is then put onto the work queue. This method should *not* be
137
// passed resources of any type other than fni.
138
func (c *BaseController) EnqueueFunction(obj interface{}) {
×
139
        var key string
×
140
        var err error
×
141
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
×
142
                runtime.HandleError(err)
×
143
                return
×
144
        }
×
145
        c.Workqueue.AddRateLimited(key)
×
146
}
147

148
// handleObject will take any resource implementing metav1.Object and attempt
149
// to find the fni resource that 'owns' it. It does this by looking at the
150
// objects metadata.ownerReferences field for an appropriate OwnerReference.
151
// It then enqueues that fni resource to be processed. If the object does not
152
// have an appropriate OwnerReference, it will simply be skipped.
153
func (c BaseController) HandleObject(obj interface{}) {
×
154
        var object metav1.Object
×
155
        var ok bool
×
156
        if object, ok = obj.(metav1.Object); !ok {
×
157
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
×
158
                if !ok {
×
159
                        runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
×
160
                        return
×
161
                }
×
162
                object, ok = tombstone.Obj.(metav1.Object)
×
163
                if !ok {
×
164
                        runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
×
165
                        return
×
166
                }
×
167
                klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
×
168
        }
169

170
        klog.V(4).Infof("Processing object: %s", object.GetName())
×
171
        if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
×
172
                // If this object is not owned by a fni, we should not do anything more
×
173
                // with it.
×
174
                if ownerRef.Kind != FaasIngressKind {
×
175
                        return
×
176
                }
×
177

178
                fni, err := c.FunctionsLister.InferenceIngresses(object.GetNamespace()).Get(ownerRef.Name)
×
179
                if err != nil {
×
180
                        klog.Infof("FunctionIngress '%s' deleted. Ignoring orphaned object '%s': %v", ownerRef.Name, object.GetSelfLink(), err)
×
181
                        return
×
182
                }
×
183

184
                c.EnqueueFunction(fni)
×
185
                return
×
186
        }
187
}
188

189
func (c BaseController) SetupEventHandlers(
190
        functionIngress v1.InferenceIngressInformer,
191
        kubeInformerFactory kubeinformers.SharedInformerFactory,
192
) {
×
193
        functionIngress.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
194
                AddFunc: c.EnqueueFunction,
×
195
                UpdateFunc: func(old, new interface{}) {
×
196

×
197
                        oldFn, ok := CheckCustomResourceType(old)
×
198
                        if !ok {
×
199
                                return
×
200
                        }
×
201
                        newFn, ok := CheckCustomResourceType(new)
×
202
                        if !ok {
×
203
                                return
×
204
                        }
×
205
                        diffSpec := cmp.Diff(oldFn.Spec, newFn.Spec)
×
206
                        diffAnnotations := cmp.Diff(oldFn.ObjectMeta.Annotations, newFn.ObjectMeta.Annotations)
×
207

×
208
                        if diffSpec != "" || diffAnnotations != "" {
×
209
                                c.EnqueueFunction(new)
×
210
                        }
×
211
                },
212
        })
213

214
        // Set up an event handler for when functions related resources like pods, deployments, replica sets
215
        // can't be materialized. This logs abnormal events like ImagePullBackOff, back-off restarting failed container,
216
        // failed to start container, oci runtime errors, etc
217
        // Enable this with -v=3
218
        kubeInformerFactory.Core().V1().Events().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
219
                AddFunc: func(obj interface{}) {
×
220
                        key, err := cache.MetaNamespaceKeyFunc(obj)
×
221
                        if err == nil {
×
222
                                event := obj.(*corev1.Event)
×
223
                                since := time.Since(event.LastTimestamp.Time)
×
224
                                // log abnormal events occurred in the last minute
×
225
                                if since.Seconds() < 61 && strings.Contains(event.Type, "Warning") {
×
226
                                        klog.V(3).Infof("Abnormal event detected on %s %s: %s", event.LastTimestamp, key, event.Message)
×
227
                                }
×
228
                        }
229
                },
230
        })
231
}
232

233
// GetClass resolves the ingress class for an InferenceIngress ingressType.
// An empty ingressType falls back to "nginx". Any other value, including
// "nginx" itself, is returned unchanged.
func GetClass(ingressType string) string {
	if ingressType == "" {
		return "nginx"
	}
	return ingressType
}
244

245
// GetIssuerKind maps a cert-manager issuer kind to the ingress annotation key
// that selects it. "ClusterIssuer" maps to the cluster-issuer annotation; any
// other value, including "", maps to the namespaced issuer annotation.
func GetIssuerKind(issuerType string) string {
	if issuerType != "ClusterIssuer" {
		return "cert-manager.io/issuer"
	}
	return "cert-manager.io/cluster-issuer"
}
253

254
func MakeAnnotations(fni *faasv1.InferenceIngress, host string) map[string]string {
5✔
255
        controlPlane, exist := fni.Annotations[consts.AnnotationControlPlaneKey]
5✔
256
        class := GetClass(fni.Spec.IngressType)
5✔
257
        specJSON, _ := json.Marshal(fni)
5✔
258
        annotations := make(map[string]string)
5✔
259

5✔
260
        annotations["ai.tensorchord.spec"] = string(specJSON)
5✔
261
        inferenceNamespace := fni.Labels[consts.LabelInferenceNamespace]
5✔
262

5✔
263
        if !fni.Spec.BypassGateway {
9✔
264
                switch class {
4✔
265
                case "nginx":
2✔
266
                        switch host {
2✔
267
                        // TODO: make this configurable
268
                        case "apiserver":
2✔
269
                                annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/api/v1/" + fni.Spec.Framework +
2✔
270
                                        "/" + fni.Spec.Function + "/$1"
2✔
271
                                annotations["nginx.ingress.kubernetes.io/use-regex"] = "true"
2✔
272
                        default:
×
273
                                // for inference created by modelz apiserver
×
274
                                if exist && controlPlane == localconsts.ModelzAnnotationValue {
×
275
                                        annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/api/v1/" + fni.Spec.Framework +
×
276
                                                "/" + fni.Spec.Function + "/$1"
×
277
                                        annotations["nginx.ingress.kubernetes.io/use-regex"] = "true"
×
278
                                        annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"
×
279
                                } else {
×
280
                                        annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/inference/" + fni.Name + "." + inferenceNamespace + "/$1"
×
281
                                        annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"
×
282
                                        annotations["nginx.ingress.kubernetes.io/use-regex"] = "true"
×
283
                                }
×
284
                        }
285
                }
286
        }
287

288
        annotations["nginx.ingress.kubernetes.io/proxy-send-timeout"] = "300"
5✔
289
        annotations["nginx.ingress.kubernetes.io/proxy-read-timeout"] = "300"
5✔
290
        annotations["nginx.ingress.kubernetes.io/proxy-body-size"] = "16m"
5✔
291

5✔
292
        // We use the default certificate for now.
5✔
293
        // if fni.Spec.UseTLS() {
5✔
294
        //         issuerType := GetIssuerKind(fni.Spec.TLS.IssuerRef.Kind)
5✔
295
        //         annotations[issuerType] = fni.Spec.TLS.IssuerRef.Name
5✔
296
        // }
5✔
297

5✔
298
        // Set annotations with overrides from FunctionIngress
5✔
299
        // annotations
5✔
300
        for k, v := range fni.ObjectMeta.Annotations {
10✔
301
                annotations[k] = v
5✔
302
        }
5✔
303

304
        return annotations
5✔
305
}
306

307
func MakeOwnerRef(fni *faasv1.InferenceIngress) []metav1.OwnerReference {
×
308
        ref := []metav1.OwnerReference{
×
309
                *metav1.NewControllerRef(fni, schema.GroupVersionKind{
×
310
                        Group:   faasv1.SchemeGroupVersion.Group,
×
311
                        Version: faasv1.SchemeGroupVersion.Version,
×
312
                        Kind:    FaasIngressKind,
×
313
                }),
×
314
        }
×
315
        return ref
×
316
}
×
317

318
func CheckCustomResourceType(obj interface{}) (faasv1.InferenceIngress, bool) {
×
319
        var fn *faasv1.InferenceIngress
×
320
        var ok bool
×
321
        if fn, ok = obj.(*faasv1.InferenceIngress); !ok {
×
322
                klog.Errorf("Event Watch received an invalid object: %#v", obj)
×
323
                return faasv1.InferenceIngress{}, false
×
324
        }
×
325
        return *fn, true
×
326
}
327

328
func IngressNeedsUpdate(old, fni *faasv1.InferenceIngress) bool {
×
329
        return !cmp.Equal(old.Spec, fni.Spec) ||
×
330
                !cmp.Equal(old.ObjectMeta.Annotations, fni.ObjectMeta.Annotations)
×
331
}
×
332

333
func EventRecorder(client kubernetes.Interface) record.EventRecorder {
×
334
        // Create event broadcaster
×
335
        // Add o6s types to the default Kubernetes Scheme so Events can be
×
336
        // logged for faas-controller types.
×
337
        faasscheme.AddToScheme(scheme.Scheme)
×
338
        klog.V(4).Info("Creating event broadcaster")
×
339
        eventBroadcaster := record.NewBroadcaster()
×
340
        eventBroadcaster.StartLogging(klog.V(4).Infof)
×
341
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
×
342
        return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: AgentName})
×
343
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc