tensorchord / openmodelz · build 6582376263

20 Oct 2023 01:48AM UTC · coverage: 25.971% (-0.3%) from 26.31%

Pull Request #190: feat: log pod start time
Commit "fix" by cutecutecat (via GitHub)
Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>

72 of 72 new or added lines in 4 files covered. (100.0%)
956 of 3681 relevant lines covered (25.97%)
1.61 hits per line

Source File (0.0% of lines covered): /agent/pkg/server/server_init_kubernetes.go
package server

import (
        "context"
        "fmt"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        kubeinformers "k8s.io/client-go/informers"
        kubeinformersv1 "k8s.io/client-go/informers/core/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"

        kubefledged "github.com/senthilrch/kube-fledged/pkg/client/clientset/versioned"
        "github.com/tensorchord/openmodelz/agent/api/types"
        "github.com/tensorchord/openmodelz/agent/pkg/event"
        "github.com/tensorchord/openmodelz/agent/pkg/k8s"
        "github.com/tensorchord/openmodelz/agent/pkg/log"
        "github.com/tensorchord/openmodelz/agent/pkg/runtime"
        "github.com/tensorchord/openmodelz/agent/pkg/scaling"
        ingressclient "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned"
        clientset "github.com/tensorchord/openmodelz/modelzetes/pkg/client/clientset/versioned"
        informers "github.com/tensorchord/openmodelz/modelzetes/pkg/client/informers/externalversions"
        "github.com/tensorchord/openmodelz/modelzetes/pkg/consts"
        "github.com/tensorchord/openmodelz/modelzetes/pkg/signals"
)

func (s *Server) initKubernetesResources() error {
        clientCmdConfig, err := clientcmd.BuildConfigFromFlags(
                s.config.KubeConfig.MasterURL, s.config.KubeConfig.Kubeconfig)
        if err != nil {
                return err
        }

        clientCmdConfig.QPS = float32(s.config.KubeConfig.QPS)
        clientCmdConfig.Burst = s.config.KubeConfig.Burst

        // Build the typed clients (core Kubernetes, inference CRD, optional
        // ingress operator, kube-fledged) from the same client config.
        kubeClient, err := kubernetes.NewForConfig(clientCmdConfig)
        if err != nil {
                return err
        }
        inferenceClient, err := clientset.NewForConfig(clientCmdConfig)
        if err != nil {
                return err
        }

        var ingressClient ingressclient.Interface
        if s.config.Ingress.IngressEnabled {
                ingressClient, err = ingressclient.NewForConfig(clientCmdConfig)
                if err != nil {
                        return err
                }
        }
        kubefledgedClient, err := kubefledged.NewForConfig(clientCmdConfig)
        if err != nil {
                return err
        }

        kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
                kubeClient, s.config.KubeConfig.ResyncPeriod)

        inferenceInformerFactory := informers.NewSharedInformerFactoryWithOptions(
                inferenceClient, s.config.KubeConfig.ResyncPeriod)

        // set up signals so we handle the first shutdown signal gracefully
        stopCh := signals.SetupSignalHandler()

        // Start the shared informers and wait for their caches to sync
        // before wiring them into the runtime.
        inferences := inferenceInformerFactory.Tensorchord().V2alpha1().Inferences()
        go inferences.Informer().Run(stopCh)
        if ok := cache.WaitForNamedCacheSync(
                fmt.Sprintf("%s:inferences", consts.ProviderName),
                stopCh, inferences.Informer().HasSynced); !ok {
                s.logger.Errorf("failed to wait for cache to sync")
        }

        deployments := kubeInformerFactory.Apps().V1().Deployments()
        go deployments.Informer().Run(stopCh)
        if ok := cache.WaitForNamedCacheSync(
                fmt.Sprintf("%s:deployments", consts.ProviderName),
                stopCh, deployments.Informer().HasSynced); !ok {
                s.logger.Errorf("failed to wait for cache to sync")
        }

        pods := kubeInformerFactory.Core().V1().Pods()
        s.podStartWatch(pods, kubeClient)
        go pods.Informer().Run(stopCh)
        if ok := cache.WaitForNamedCacheSync(
                fmt.Sprintf("%s:pods", consts.ProviderName),
                stopCh, pods.Informer().HasSynced); !ok {
                s.logger.Errorf("failed to wait for cache to sync")
        }

        endpoints := kubeInformerFactory.Core().V1().Endpoints()
        go endpoints.Informer().Run(stopCh)
        if ok := cache.WaitForNamedCacheSync(
                fmt.Sprintf("%s:endpoints", consts.ProviderName),
                stopCh, endpoints.Informer().HasSynced); !ok {
                s.logger.Errorf("failed to wait for cache to sync")
        }

        runtime, err := runtime.New(clientCmdConfig,
                endpoints, deployments, inferences, pods,
                kubeClient, ingressClient, kubefledgedClient, inferenceClient,
                s.eventRecorder,
                s.config.Ingress.IngressEnabled, s.config.ModelZCloud.EventEnabled,
                s.config.Build.BuildEnabled, s.config.Ingress.AnyIPToDomain,
        )
        if err != nil {
                return err
        }
        s.runtime = runtime
        if s.config.Server.Dev {
                logrus.Warn("running in dev mode, using port forwarding to access pods, please do not use dev mode in production")
                s.endpointResolver = k8s.NewPortForwardingResolver(clientCmdConfig, kubeClient)
        } else {
                s.endpointResolver = k8s.NewEndpointResolver(endpoints.Lister())
        }
        s.deploymentLogRequester = log.NewK8sAPIRequestor(kubeClient)
        s.scaler, err = scaling.NewInferenceScaler(runtime, s.config.Inference.CacheTTL)
        if err != nil {
                return err
        }
        if s.scaler == nil {
                return fmt.Errorf("scaler is nil")
        }
        return nil
}

// podStartWatch logs an event when a pod managed by the modelz control plane
// is created and another when it becomes ready or times out, recording the
// startup duration in the pod start histogram.
func (s *Server) podStartWatch(pods kubeinformersv1.PodInformer, client *kubernetes.Clientset) {
        pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                        new := obj.(*v1.Pod)
                        controlPlane, exist := new.Annotations[consts.AnnotationControlPlaneKey]
                        // only watch inferences created by the modelz apiserver
                        if !exist || controlPlane != consts.ModelzAnnotationValue {
                                return
                        }
                        podWatchEventLog(s.eventRecorder, new, types.PodCreateEvent)
                        start := time.Now()

                        // The ticker keeps polling until the pod becomes ready or the timeout elapses.
                        ticker := time.NewTicker(time.Second * 2)
                        timeout := time.After(5 * time.Minute)
                        go func() {
                                defer ticker.Stop()
                                for {
                                        select {
                                        case <-timeout:
                                                podWatchEventLog(s.eventRecorder, new, types.PodTimeoutEvent)
                                                return
                                        case <-ticker.C:
                                                pod, err := client.CoreV1().Pods(new.Namespace).Get(context.TODO(), new.Name, metav1.GetOptions{})
                                                if err != nil {
                                                        logrus.WithFields(logrus.Fields{
                                                                "namespace":  new.Namespace,
                                                                "deployment": new.Labels["app"],
                                                                "name":       new.Name,
                                                        }).Errorf("failed to get pod: %s", err)
                                                        return
                                                }
                                                for _, c := range pod.Status.Conditions {
                                                        if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
                                                                podWatchEventLog(s.eventRecorder, new, types.PodReadyEvent)
                                                                label := prometheus.Labels{
                                                                        "inference_name": new.Labels["app"],
                                                                        "source_image":   new.Annotations[consts.AnnotationDockerImage]}
                                                                s.metricsOptions.PodStartHistogram.With(label).
                                                                        Observe(time.Since(start).Seconds())
                                                                return
                                                        }
                                                }
                                        }
                                }
                        }()
                },
        })
}

// podWatchEventLog records a deployment event for a pod watch status transition.
func podWatchEventLog(recorder event.Interface, obj *v1.Pod, event string) {
        deployment := obj.Labels["app"]
        err := recorder.CreateDeploymentEvent(obj.Namespace, deployment, event, obj.Name)
        if err != nil {
                logrus.WithFields(logrus.Fields{
                        "namespace":  obj.Namespace,
                        "deployment": deployment,
                        "name":       obj.Name,
                        "event":      event,
                }).Errorf("failed to create deployment event: %s", err)
        }
}
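
The histogram behind s.metricsOptions.PodStartHistogram is defined outside this file. As a rough sketch of the contract the code above relies on, it could be declared as a prometheus.HistogramVec carrying the two labels observed in podStartWatch; the metric name, help text, buckets, and the MetricsOptions shape below are illustrative assumptions, not the repository's actual definitions.

// Hypothetical sketch of how the pod-start histogram used above could be
// declared and registered with the Prometheus client library.
package server

import "github.com/prometheus/client_golang/prometheus"

// MetricsOptions is a stand-in for the struct behind s.metricsOptions;
// only the histogram field is sketched here.
type MetricsOptions struct {
        PodStartHistogram *prometheus.HistogramVec
}

func newMetricsOptions() MetricsOptions {
        // The label names must match the ones observed in podStartWatch.
        h := prometheus.NewHistogramVec(prometheus.HistogramOpts{
                Name:    "pod_start_seconds", // assumed name
                Help:    "Time from pod creation to the PodReady condition, in seconds.",
                Buckets: prometheus.ExponentialBuckets(1, 2, 10), // 1s up to ~512s, assumed
        }, []string{"inference_name", "source_image"})
        prometheus.MustRegister(h)
        return MetricsOptions{PodStartHistogram: h}
}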
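
podStartWatch hooks into the shared pod informer through cache.ResourceEventHandlerFuncs, so its AddFunc fires for every pod the informer delivers. The following is a minimal, self-contained sketch of that mechanism using client-go's fake clientset instead of a real cluster; the namespace and pod name are placeholders for illustration.

// Standalone demo of the informer + AddEventHandler pattern used by
// podStartWatch, backed by a fake clientset.
package main

import (
        "context"
        "fmt"
        "time"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        kubeinformers "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
)

func main() {
        // A fake clientset stands in for the real cluster.
        client := fake.NewSimpleClientset()
        factory := kubeinformers.NewSharedInformerFactory(client, 0)
        pods := factory.Core().V1().Pods()

        // Same registration pattern as podStartWatch: react to pod Add events.
        pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                        pod := obj.(*v1.Pod)
                        fmt.Println("pod added:", pod.Namespace+"/"+pod.Name)
                },
        })

        stopCh := make(chan struct{})
        defer close(stopCh)
        factory.Start(stopCh)
        cache.WaitForCacheSync(stopCh, pods.Informer().HasSynced)

        // Creating a pod through the clientset triggers the AddFunc above.
        _, _ = client.CoreV1().Pods("default").Create(context.TODO(),
                &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}},
                metav1.CreateOptions{})
        time.Sleep(time.Second) // give the informer time to deliver the event
}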