• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tensorchord / openmodelz / 6146306117

11 Sep 2023 12:22PM UTC coverage: 26.069% (-3.9%) from 29.927%
6146306117

Pull #171

github

xieydd
chore: Make proxy endpoint port configurable

Signed-off-by: xieydd <xieydd@gmail.com>
Pull Request #171: feat: Support BYOC

443 of 443 new or added lines in 14 files covered. (100.0%)

939 of 3602 relevant lines covered (26.07%)

1.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/agent/pkg/server/server_run.go
1
package server
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "net/http"
8
        "os"
9
        "os/signal"
10
        "syscall"
11
        "time"
12

13
        "github.com/sirupsen/logrus"
14
        "github.com/tensorchord/openmodelz/agent/api/types"
15
        "k8s.io/apimachinery/pkg/util/wait"
16
)
17

18
func (s *Server) Run() error {
×
19
        srv := &http.Server{
×
20
                Addr:         fmt.Sprintf(":%d", s.config.Server.ServerPort),
×
21
                Handler:      s.router,
×
22
                WriteTimeout: s.config.Server.WriteTimeout,
×
23
                ReadTimeout:  s.config.Server.ReadTimeout,
×
24
        }
×
25

×
26
        go func() {
×
27
                if err := srv.ListenAndServe(); err != nil &&
×
28
                        !errors.Is(err, http.ErrServerClosed) {
×
29
                        logrus.Errorf("listen on port %d error: %v", s.config.Server.ServerPort, err)
×
30
                }
×
31
        }()
32

33
        metricsSrv := &http.Server{
×
34
                Addr:         fmt.Sprintf(":%d", s.config.Metrics.ServerPort),
×
35
                Handler:      s.metricsRouter,
×
36
                ReadTimeout:  s.config.Metrics.PollingInterval,
×
37
                WriteTimeout: s.config.Metrics.PollingInterval,
×
38
        }
×
39
        go func() {
×
40
                if err := metricsSrv.ListenAndServe(); err != nil &&
×
41
                        !errors.Is(err, http.ErrServerClosed) {
×
42
                        logrus.Errorf("listen on port %d error: %v",
×
43
                                s.config.Metrics.ServerPort, err)
×
44
                }
×
45
        }()
46

47
        logrus.WithField("port", s.config.Server.ServerPort).
×
48
                Info("server is running...")
×
49
        logrus.WithField("metrics-port", s.config.Metrics.ServerPort).
×
50
                Info("metrics server is running...")
×
51

×
52
        if s.config.ModelZCloud.Enabled {
×
53
                // check apiserver is ready
×
54
                apiServerReady := make(chan struct{})
×
55
                go func() {
×
56
                        if err := s.modelzCloudClient.WaitForAPIServerReady(); err != nil {
×
57
                                logrus.Fatalf("failed to wait for apiserver ready: %v", err)
×
58
                        }
×
59
                        close(apiServerReady)
×
60
                }()
61
                // websocket
62
                // build websocket
63
                go s.connect(apiServerReady)
×
64

×
65
                // heartbeat with apiserver
×
66
                go wait.UntilWithContext(context.Background(), func(ctx context.Context) {
×
67
                        cluster := types.ManagedCluster{
×
68
                                ID:        s.config.ModelZCloud.ID,
×
69
                                Status:    types.ClusterStatusActive,
×
70
                                UpdatedAt: time.Now().UTC(),
×
71
                                TokenID:   s.config.ModelZCloud.TokenID,
×
72
                                Region:    s.config.ModelZCloud.Region,
×
73
                        }
×
74
                        err := s.runtime.GetClusterInfo(&cluster)
×
75
                        if err != nil {
×
76
                                logrus.Errorf("failed to get managed cluster info: %v", err)
×
77
                        }
×
78

79
                        err = s.modelzCloudClient.UpdateAgentStatus(ctx, apiServerReady, s.config.ModelZCloud.AgentToken, cluster)
×
80
                        if err != nil {
×
81
                                logrus.Errorf("failed to update agent status: %v", err)
×
82
                        }
×
83
                        logrus.Debugf("update agent status: %v", cluster)
×
84
                }, s.config.ModelZCloud.HeartbeatInterval)
85

86
                go wait.UntilWithContext(context.Background(), func(ctx context.Context) {
×
87
                        apikeys, err := s.modelzCloudClient.GetAPIKeys(ctx, apiServerReady, s.config.ModelZCloud.AgentToken, s.config.ModelZCloud.ID)
×
88
                        if err != nil {
×
89
                                logrus.Errorf("failed to get apikeys: %v", err)
×
90
                        }
×
91

92
                        s.config.ModelZCloud.APIKeys = apikeys
×
93
                        logrus.Debugf("update apikeys")
×
94
                }, s.config.ModelZCloud.HeartbeatInterval) // default 1min update, TODO(xieydd) make it configurable
95

96
                go wait.UntilWithContext(context.Background(), func(ctx context.Context) {
×
97
                        namespaces, err := s.modelzCloudClient.GetNamespaces(ctx, apiServerReady, s.config.ModelZCloud.AgentToken, s.config.ModelZCloud.ID)
×
98
                        if err != nil {
×
99
                                logrus.Errorf("failed to get namespaces: %v", err)
×
100
                        }
×
101

102
                        for _, ns := range namespaces.Items {
×
103
                                if ContainString(ns, s.config.ModelZCloud.UserNamespaces) {
×
104
                                        continue
×
105
                                }
106
                                err = s.runtime.NamespaceCreate(ctx, ns)
×
107
                                if err != nil {
×
108
                                        logrus.Errorf("failed to create namespace %s: %v", ns, err)
×
109
                                        continue
×
110
                                }
111
                                s.config.ModelZCloud.UserNamespaces = append(s.config.ModelZCloud.UserNamespaces, ns)
×
112
                                logrus.Debugf("update namespaces")
×
113
                        }
114
                }, s.config.ModelZCloud.HeartbeatInterval) // default 1h update, make it configurable
115
        }
116

117
        quit := make(chan os.Signal, 1)
×
118
        signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
×
119
        <-quit
×
120
        logrus.Info("shutdown server")
×
121

×
122
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
×
123
        defer cancel()
×
124

×
125
        return srv.Shutdown(ctx)
×
126
}
127

128
func ContainString(target string, strs []string) bool {
×
129
        for _, str := range strs {
×
130
                if str == target {
×
131
                        return true
×
132
                }
×
133
        }
134
        return false
×
135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc