• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tensorchord / openmodelz / 6232030347

19 Sep 2023 06:26AM UTC coverage: 27.165% (+0.02%) from 27.144%
6232030347

Pull #176

github

tanshuqiang
fix: 404 inference not found (#175)

Signed-off-by: tanshuqiang <tan.shuqiang@21vianet.com>
Pull Request #176: fix: 404 inference not found (#175)

2 of 2 new or added lines in 1 file covered. (100.0%)

982 of 3615 relevant lines covered (27.16%)

1.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

19.0
/ingress-operator/pkg/controller/core.go
1
package controller
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "strings"
8
        "time"
9

10
        "github.com/google/go-cmp/cmp"
11
        faasv1 "github.com/tensorchord/openmodelz/ingress-operator/pkg/apis/modelzetes/v1"
12
        "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned/scheme"
13
        faasscheme "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned/scheme"
14
        v1 "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/informers/externalversions/modelzetes/v1"
15
        listers "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/listers/modelzetes/v1"
16
        "github.com/tensorchord/openmodelz/modelzetes/pkg/consts"
17
        corev1 "k8s.io/api/core/v1"
18
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19
        "k8s.io/apimachinery/pkg/runtime/schema"
20
        "k8s.io/apimachinery/pkg/util/runtime"
21
        "k8s.io/apimachinery/pkg/util/wait"
22
        kubeinformers "k8s.io/client-go/informers"
23
        "k8s.io/client-go/kubernetes"
24
        typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
25
        "k8s.io/client-go/tools/cache"
26
        "k8s.io/client-go/tools/record"
27
        "k8s.io/client-go/util/workqueue"
28
        klog "k8s.io/klog"
29
)
30

31
// Identity constants for the ingress operator.
const (
	// AgentName is the component name stamped on the event source of
	// events recorded by this operator.
	AgentName = "ingress-operator"
	// FaasIngressKind is the Kind written into owner references and
	// compared against when resolving the owner of a watched object.
	FaasIngressKind = "InferenceIngress"
	// OpenfaasWorkloadPort is the port number 8080.
	// NOTE(review): not referenced in this file; presumably the workload
	// service port — confirm against callers.
	OpenfaasWorkloadPort = 8080
)
34

35
// Event reasons and messages used when reporting sync results.
const (
	// SuccessSynced is the Event reason for a successful sync.
	SuccessSynced = "Synced"
	// ErrResourceExists is the Event reason for a sync that failed because
	// a resource with the same name already exists.
	ErrResourceExists = "ErrResourceExists"
	// MessageResourceExists is the Event message format for a resource that
	// already exists and is not managed by this controller. It takes one
	// argument, the resource name, which is rendered quoted (%q).
	MessageResourceExists = "Resource %q already exists and is not managed by controller"
	// MessageResourceSynced is the Event message for a successful sync.
	MessageResourceSynced = "FunctionIngress synced successfully"
)
48

49
// BaseController is the controller contains the common function ingress
50
// implementation that is shared between the various versions of k8s.
51
type BaseController struct {
52
        FunctionsLister listers.InferenceIngressLister
53
        FunctionsSynced cache.InformerSynced
54

55
        // Workqueue is a rate limited work queue. This is used to queue work to be
56
        // processed instead of performing it as soon as a change happens. This
57
        // means we can ensure we only process a fixed amount of resources at a
58
        // time, and makes it easy to ensure we are never processing the same item
59
        // simultaneously in two different workers.
60
        Workqueue workqueue.RateLimitingInterface
61

62
        SyncHandler func(ctx context.Context, key string) error
63
}
64

65
func (c BaseController) Run(threadiness int, stopCh <-chan struct{}) error {
×
66
        ctx, cancel := context.WithCancel(context.Background())
×
67
        defer runtime.HandleCrash()
×
68
        defer c.Workqueue.ShutDown()
×
69
        defer cancel()
×
70

×
71
        // Start the informer factories to begin populating the informer caches
×
72
        // Wait for the caches to be synced before starting workers
×
73
        klog.Info("Waiting for informer caches to sync")
×
74
        if ok := cache.WaitForCacheSync(stopCh, c.FunctionsSynced); !ok {
×
75
                return fmt.Errorf("failed to wait for caches to sync")
×
76
        }
×
77

78
        klog.Info("Starting workers")
×
79
        // Launch two workers to process Function resources
×
80
        for i := 0; i < threadiness; i++ {
×
81
                go wait.Until(c.runWorker(ctx), time.Second, stopCh)
×
82
        }
×
83

84
        klog.Info("Started workers")
×
85
        <-stopCh
×
86
        klog.Info("Shutting down workers")
×
87

×
88
        return nil
×
89
}
90

91
// runWorker is a long-running function that will continually call the
92
// processNextWorkItem function in order to read and process a message on the workqueue.
93
func (c BaseController) runWorker(ctx context.Context) func() {
×
94
        return func() {
×
95
                for c.processNextWorkItem(ctx) {
×
96
                }
×
97
        }
98
}
99

100
// processNextWorkItem will read a single work item off the workqueue and
101
// attempt to process it, by calling the syncHandler.
102
func (c BaseController) processNextWorkItem(ctx context.Context) bool {
×
103
        obj, shutdown := c.Workqueue.Get()
×
104

×
105
        if shutdown {
×
106
                return false
×
107
        }
×
108

109
        err := func(obj interface{}) error {
×
110
                defer c.Workqueue.Done(obj)
×
111
                var key string
×
112
                var ok bool
×
113
                if key, ok = obj.(string); !ok {
×
114
                        c.Workqueue.Forget(obj)
×
115
                        runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
×
116
                        return nil
×
117
                }
×
118
                if err := c.SyncHandler(ctx, key); err != nil {
×
119
                        return fmt.Errorf("error syncing '%s': %s", key, err.Error())
×
120
                }
×
121
                c.Workqueue.Forget(obj)
×
122
                return nil
×
123
        }(obj)
124

125
        if err != nil {
×
126
                runtime.HandleError(err)
×
127
                return true
×
128
        }
×
129

130
        return true
×
131
}
132

133
// enqueueFunction takes a fni resource and converts it into a namespace/name
134
// string which is then put onto the work queue. This method should *not* be
135
// passed resources of any type other than fni.
136
func (c *BaseController) EnqueueFunction(obj interface{}) {
×
137
        var key string
×
138
        var err error
×
139
        if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
×
140
                runtime.HandleError(err)
×
141
                return
×
142
        }
×
143
        c.Workqueue.AddRateLimited(key)
×
144
}
145

146
// handleObject will take any resource implementing metav1.Object and attempt
147
// to find the fni resource that 'owns' it. It does this by looking at the
148
// objects metadata.ownerReferences field for an appropriate OwnerReference.
149
// It then enqueues that fni resource to be processed. If the object does not
150
// have an appropriate OwnerReference, it will simply be skipped.
151
func (c BaseController) HandleObject(obj interface{}) {
×
152
        var object metav1.Object
×
153
        var ok bool
×
154
        if object, ok = obj.(metav1.Object); !ok {
×
155
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
×
156
                if !ok {
×
157
                        runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
×
158
                        return
×
159
                }
×
160
                object, ok = tombstone.Obj.(metav1.Object)
×
161
                if !ok {
×
162
                        runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
×
163
                        return
×
164
                }
×
165
                klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
×
166
        }
167

168
        klog.V(4).Infof("Processing object: %s", object.GetName())
×
169
        if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
×
170
                // If this object is not owned by a fni, we should not do anything more
×
171
                // with it.
×
172
                if ownerRef.Kind != FaasIngressKind {
×
173
                        return
×
174
                }
×
175

176
                fni, err := c.FunctionsLister.InferenceIngresses(object.GetNamespace()).Get(ownerRef.Name)
×
177
                if err != nil {
×
178
                        klog.Infof("FunctionIngress '%s' deleted. Ignoring orphaned object '%s': %v", ownerRef.Name, object.GetSelfLink(), err)
×
179
                        return
×
180
                }
×
181

182
                c.EnqueueFunction(fni)
×
183
                return
×
184
        }
185
}
186

187
func (c BaseController) SetupEventHandlers(
188
        functionIngress v1.InferenceIngressInformer,
189
        kubeInformerFactory kubeinformers.SharedInformerFactory,
190
) {
×
191
        functionIngress.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
192
                AddFunc: c.EnqueueFunction,
×
193
                UpdateFunc: func(old, new interface{}) {
×
194

×
195
                        oldFn, ok := CheckCustomResourceType(old)
×
196
                        if !ok {
×
197
                                return
×
198
                        }
×
199
                        newFn, ok := CheckCustomResourceType(new)
×
200
                        if !ok {
×
201
                                return
×
202
                        }
×
203
                        diffSpec := cmp.Diff(oldFn.Spec, newFn.Spec)
×
204
                        diffAnnotations := cmp.Diff(oldFn.ObjectMeta.Annotations, newFn.ObjectMeta.Annotations)
×
205

×
206
                        if diffSpec != "" || diffAnnotations != "" {
×
207
                                c.EnqueueFunction(new)
×
208
                        }
×
209
                },
210
        })
211

212
        // Set up an event handler for when functions related resources like pods, deployments, replica sets
213
        // can't be materialized. This logs abnormal events like ImagePullBackOff, back-off restarting failed container,
214
        // failed to start container, oci runtime errors, etc
215
        // Enable this with -v=3
216
        kubeInformerFactory.Core().V1().Events().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
217
                AddFunc: func(obj interface{}) {
×
218
                        key, err := cache.MetaNamespaceKeyFunc(obj)
×
219
                        if err == nil {
×
220
                                event := obj.(*corev1.Event)
×
221
                                since := time.Since(event.LastTimestamp.Time)
×
222
                                // log abnormal events occurred in the last minute
×
223
                                if since.Seconds() < 61 && strings.Contains(event.Type, "Warning") {
×
224
                                        klog.V(3).Infof("Abnormal event detected on %s %s: %s", event.LastTimestamp, key, event.Message)
×
225
                                }
×
226
                        }
227
                },
228
        })
229
}
230

231
// GetClass returns the ingress class for ingressType. An empty ingressType
// maps to "nginx"; any other value is returned unchanged.
func GetClass(ingressType string) string {
	if ingressType == "" {
		return "nginx"
	}
	return ingressType
}
242

243
// GetIssuerKind returns the cert-manager annotation key for issuerType:
// "cert-manager.io/cluster-issuer" for "ClusterIssuer", and
// "cert-manager.io/issuer" for every other value, including the empty string.
func GetIssuerKind(issuerType string) string {
	if issuerType == "ClusterIssuer" {
		return "cert-manager.io/cluster-issuer"
	}
	return "cert-manager.io/issuer"
}
251

252
func MakeAnnotations(fni *faasv1.InferenceIngress, host string) map[string]string {
5✔
253
        class := GetClass(fni.Spec.IngressType)
5✔
254
        specJSON, _ := json.Marshal(fni)
5✔
255
        annotations := make(map[string]string)
5✔
256

5✔
257
        annotations["ai.tensorchord.spec"] = string(specJSON)
5✔
258
        inferenceNamespace := fni.Labels[consts.LabelInferenceNamespace]
5✔
259

5✔
260
        if !fni.Spec.BypassGateway {
9✔
261
                switch class {
4✔
262
                case "nginx":
2✔
263
                        switch host {
2✔
264
                        // TODO: make this configurable
265
                        case "apiserver":
2✔
266
                                annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/api/v1/" + fni.Spec.Framework +
2✔
267
                                        "/" + fni.Spec.Function + "/$1"
2✔
268
                                annotations["nginx.ingress.kubernetes.io/use-regex"] = "true"
2✔
269
                        default:
×
270
                                annotations["nginx.ingress.kubernetes.io/rewrite-target"] = "/inference/" + fni.Name + "." + inferenceNamespace + "/$1"
×
271
                                annotations["nginx.ingress.kubernetes.io/ssl-redirect"] = "false"
×
272
                                annotations["nginx.ingress.kubernetes.io/use-regex"] = "true"
×
273
                        }
274

275
                }
276
        }
277

278
        annotations["nginx.ingress.kubernetes.io/proxy-send-timeout"] = "300"
5✔
279
        annotations["nginx.ingress.kubernetes.io/proxy-read-timeout"] = "300"
5✔
280
        annotations["nginx.ingress.kubernetes.io/proxy-body-size"] = "16m"
5✔
281

5✔
282
        // We use the default certificate for now.
5✔
283
        // if fni.Spec.UseTLS() {
5✔
284
        //         issuerType := GetIssuerKind(fni.Spec.TLS.IssuerRef.Kind)
5✔
285
        //         annotations[issuerType] = fni.Spec.TLS.IssuerRef.Name
5✔
286
        // }
5✔
287

5✔
288
        // Set annotations with overrides from FunctionIngress
5✔
289
        // annotations
5✔
290
        for k, v := range fni.ObjectMeta.Annotations {
10✔
291
                annotations[k] = v
5✔
292
        }
5✔
293

294
        return annotations
5✔
295
}
296

297
func MakeOwnerRef(fni *faasv1.InferenceIngress) []metav1.OwnerReference {
×
298
        ref := []metav1.OwnerReference{
×
299
                *metav1.NewControllerRef(fni, schema.GroupVersionKind{
×
300
                        Group:   faasv1.SchemeGroupVersion.Group,
×
301
                        Version: faasv1.SchemeGroupVersion.Version,
×
302
                        Kind:    FaasIngressKind,
×
303
                }),
×
304
        }
×
305
        return ref
×
306
}
×
307

308
func CheckCustomResourceType(obj interface{}) (faasv1.InferenceIngress, bool) {
×
309
        var fn *faasv1.InferenceIngress
×
310
        var ok bool
×
311
        if fn, ok = obj.(*faasv1.InferenceIngress); !ok {
×
312
                klog.Errorf("Event Watch received an invalid object: %#v", obj)
×
313
                return faasv1.InferenceIngress{}, false
×
314
        }
×
315
        return *fn, true
×
316
}
317

318
func IngressNeedsUpdate(old, fni *faasv1.InferenceIngress) bool {
×
319
        return !cmp.Equal(old.Spec, fni.Spec) ||
×
320
                !cmp.Equal(old.ObjectMeta.Annotations, fni.ObjectMeta.Annotations)
×
321
}
×
322

323
func EventRecorder(client kubernetes.Interface) record.EventRecorder {
×
324
        // Create event broadcaster
×
325
        // Add o6s types to the default Kubernetes Scheme so Events can be
×
326
        // logged for faas-controller types.
×
327
        faasscheme.AddToScheme(scheme.Scheme)
×
328
        klog.V(4).Info("Creating event broadcaster")
×
329
        eventBroadcaster := record.NewBroadcaster()
×
330
        eventBroadcaster.StartLogging(klog.V(4).Infof)
×
331
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
×
332
        return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: AgentName})
×
333
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc