• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tensorchord / openmodelz / 6572666738

19 Oct 2023 09:33AM UTC coverage: 25.985% (-0.3%) from 26.31%
6572666738

Pull #190

github

cutecutecat
feat: log pod start time

Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>
Pull Request #190: WIP feat: log pod start time

70 of 70 new or added lines in 4 files covered. (100.0%)

956 of 3679 relevant lines covered (25.99%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/pkg/server/server_init_kubernetes.go
1
package server
2

3
import (
4
        "context"
5
        "fmt"
6
        "strings"
7
        "time"
8

9
        "github.com/prometheus/client_golang/prometheus"
10
        "github.com/sirupsen/logrus"
11
        v1 "k8s.io/api/core/v1"
12
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13
        kubeinformers "k8s.io/client-go/informers"
14
        kubeinformersv1 "k8s.io/client-go/informers/core/v1"
15
        "k8s.io/client-go/kubernetes"
16
        "k8s.io/client-go/tools/cache"
17
        "k8s.io/client-go/tools/clientcmd"
18

19
        kubefledged "github.com/senthilrch/kube-fledged/pkg/client/clientset/versioned"
20
        "github.com/tensorchord/openmodelz/agent/api/types"
21
        "github.com/tensorchord/openmodelz/agent/pkg/event"
22
        "github.com/tensorchord/openmodelz/agent/pkg/k8s"
23
        "github.com/tensorchord/openmodelz/agent/pkg/log"
24
        "github.com/tensorchord/openmodelz/agent/pkg/runtime"
25
        "github.com/tensorchord/openmodelz/agent/pkg/scaling"
26
        ingressclient "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned"
27
        clientset "github.com/tensorchord/openmodelz/modelzetes/pkg/client/clientset/versioned"
28
        informers "github.com/tensorchord/openmodelz/modelzetes/pkg/client/informers/externalversions"
29
        "github.com/tensorchord/openmodelz/modelzetes/pkg/consts"
30
        "github.com/tensorchord/openmodelz/modelzetes/pkg/signals"
31
)
32

33
func (s *Server) initKubernetesResources() error {
×
34
        clientCmdConfig, err := clientcmd.BuildConfigFromFlags(
×
35
                s.config.KubeConfig.MasterURL, s.config.KubeConfig.Kubeconfig)
×
36
        if err != nil {
×
37
                return err
×
38
        }
×
39

40
        clientCmdConfig.QPS = float32(s.config.KubeConfig.QPS)
×
41
        clientCmdConfig.Burst = s.config.KubeConfig.Burst
×
42

×
43
        kubeClient, err := kubernetes.NewForConfig(clientCmdConfig)
×
44
        if err != nil {
×
45
                return err
×
46
        }
×
47
        inferenceClient, err := clientset.NewForConfig(clientCmdConfig)
×
48
        if err != nil {
×
49
                return err
×
50
        }
×
51

52
        var ingressClient ingressclient.Interface
×
53
        if s.config.Ingress.IngressEnabled {
×
54
                ingressClient, err = ingressclient.NewForConfig(clientCmdConfig)
×
55
                if err != nil {
×
56
                        return err
×
57
                }
×
58
        }
59
        kubefledgedClient, err := kubefledged.NewForConfig(clientCmdConfig)
×
60
        if err != nil {
×
61
                return err
×
62
        }
×
63

64
        kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
×
65
                kubeClient, s.config.KubeConfig.ResyncPeriod)
×
66

×
67
        inferenceInformerFactory := informers.NewSharedInformerFactoryWithOptions(
×
68
                inferenceClient, s.config.KubeConfig.ResyncPeriod)
×
69

×
70
        // set up signals so we handle the first shutdown signal gracefully
×
71
        stopCh := signals.SetupSignalHandler()
×
72

×
73
        inferences := inferenceInformerFactory.Tensorchord().V2alpha1().Inferences()
×
74
        go inferences.Informer().Run(stopCh)
×
75
        if ok := cache.WaitForNamedCacheSync(
×
76
                fmt.Sprintf("%s:inferences", consts.ProviderName),
×
77
                stopCh, inferences.Informer().HasSynced); !ok {
×
78
                s.logger.Errorf("failed to wait for cache to sync")
×
79
        }
×
80

81
        deployments := kubeInformerFactory.Apps().V1().Deployments()
×
82
        go deployments.Informer().Run(stopCh)
×
83
        if ok := cache.WaitForNamedCacheSync(
×
84
                fmt.Sprintf("%s:deployments", consts.ProviderName),
×
85
                stopCh, deployments.Informer().HasSynced); !ok {
×
86
                s.logger.Errorf("failed to wait for cache to sync")
×
87
        }
×
88

89
        pods := kubeInformerFactory.Core().V1().Pods()
×
90
        s.podStartWatch(pods, kubeClient)
×
91
        go pods.Informer().Run(stopCh)
×
92
        if ok := cache.WaitForNamedCacheSync(
×
93
                fmt.Sprintf("%s:pods", consts.ProviderName),
×
94
                stopCh, pods.Informer().HasSynced); !ok {
×
95
                s.logger.Errorf("failed to wait for cache to sync")
×
96
        }
×
97

98
        endpoints := kubeInformerFactory.Core().V1().Endpoints()
×
99
        go endpoints.Informer().Run(stopCh)
×
100
        if ok := cache.WaitForNamedCacheSync(
×
101
                fmt.Sprintf("%s:endpoints", consts.ProviderName),
×
102
                stopCh, endpoints.Informer().HasSynced); !ok {
×
103
                s.logger.Errorf("failed to wait for cache to sync")
×
104
        }
×
105

106
        runtime, err := runtime.New(clientCmdConfig,
×
107
                endpoints, deployments, inferences, pods,
×
108
                kubeClient, ingressClient, kubefledgedClient, inferenceClient,
×
109
                s.eventRecorder,
×
110
                s.config.Ingress.IngressEnabled, s.config.ModelZCloud.EventEnabled,
×
111
                s.config.Build.BuildEnabled, s.config.Ingress.AnyIPToDomain,
×
112
        )
×
113
        if err != nil {
×
114
                return err
×
115
        }
×
116
        s.runtime = runtime
×
117
        if s.config.Server.Dev {
×
118
                logrus.Warn("running in dev mode, using port forwarding to access pods, please do not use dev mode in production")
×
119
                s.endpointResolver = k8s.NewPortForwardingResolver(clientCmdConfig, kubeClient)
×
120
        } else {
×
121
                s.endpointResolver = k8s.NewEndpointResolver(endpoints.Lister())
×
122
        }
×
123
        s.deploymentLogRequester = log.NewK8sAPIRequestor(kubeClient)
×
124
        s.scaler, err = scaling.NewInferenceScaler(runtime, s.config.Inference.CacheTTL)
×
125
        if err != nil {
×
126
                return err
×
127
        }
×
128
        if s.scaler == nil {
×
129
                return fmt.Errorf("scaler is nil")
×
130
        }
×
131
        return nil
×
132
}
133

134
// podStartWatch log event when pod start began and finished
135
func (s *Server) podStartWatch(pods kubeinformersv1.PodInformer, client *kubernetes.Clientset) {
×
136
        pods.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
×
137
                AddFunc: func(obj interface{}) {
×
138
                        new := obj.(*v1.Pod)
×
139
                        if !strings.HasPrefix(new.Namespace, "modelz-") {
×
140
                                return
×
141
                        }
×
142
                        podWatchEventLog(s.eventRecorder, new, types.DeploymentStartBeginEvent)
×
143
                        start := time.Now()
×
144

×
145
                        // Ticker will keep watching until pod start or timeout
×
146
                        ticker := time.NewTicker(time.Second * 2)
×
147
                        timeout := time.After(5 * time.Minute)
×
148
                        go func() {
×
149
                                for {
×
150
                                        select {
×
151
                                        case <-timeout:
×
152
                                                podWatchEventLog(s.eventRecorder, new, types.DeploymentStartTimeoutEvent)
×
153
                                                return
×
154
                                        case <-ticker.C:
×
155
                                                pod, err := client.CoreV1().Pods(new.Namespace).Get(context.TODO(), new.Name, metav1.GetOptions{})
×
156
                                                if err != nil {
×
157
                                                        logrus.WithFields(logrus.Fields{
×
158
                                                                "namespace":  pod.Namespace,
×
159
                                                                "deployment": pod.Labels["app"],
×
160
                                                                "name":       pod.Name,
×
161
                                                        }).Errorf("failed to get pod: %s", err)
×
162
                                                }
×
163
                                                for _, c := range pod.Status.Conditions {
×
164
                                                        if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
×
165
                                                                podWatchEventLog(s.eventRecorder, new, types.DeploymentStartFinishEvent)
×
166
                                                                label := prometheus.Labels{
×
167
                                                                        "inference_name": new.Labels["app"],
×
168
                                                                        "source_image":   new.Annotations[consts.AnnotationDockerImage]}
×
169
                                                                s.metricsOptions.PodStartHistogram.With(label).
×
170
                                                                        Observe(time.Since(start).Seconds())
×
171
                                                                return
×
172
                                                        }
×
173
                                                }
174
                                                break
×
175
                                        }
176
                                }
177
                        }()
178
                },
179
        })
180
}
181

182
// log status for pod watch status transfer
183
func podWatchEventLog(recorder event.Interface, obj *v1.Pod, event string) {
×
184
        deployment := obj.Labels["app"]
×
185
        err := recorder.CreateDeploymentEvent(obj.Namespace, deployment, event, obj.Name)
×
186
        if err != nil {
×
187
                logrus.WithFields(logrus.Fields{
×
188
                        "namespace":  obj.Namespace,
×
189
                        "deployment": deployment,
×
190
                        "name":       obj.Name,
×
191
                        "event":      event,
×
192
                }).Errorf("failed to create deployment event: %s", err)
×
193
        }
×
194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc