• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zalando / postgres-operator / 17069871161

19 Aug 2025 12:40PM UTC coverage: 42.059% (-3.4%) from 45.498%
17069871161

push

github

web-flow
upgrade Go from 1.23.4 to 1.25.0 (#2945)

* upgrade go to 1.25
* add minor version to be Go 1.25.0
* revert the Go version on README to keep the history of the release

5 of 9 new or added lines in 7 files covered. (55.56%)

531 existing lines in 13 files now uncovered.

6493 of 15438 relevant lines covered (42.06%)

15.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/apiserver/apiserver.go
1
package apiserver
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "net/http"
8
        "net/http/pprof"
9
        "regexp"
10
        "strconv"
11
        "sync"
12
        "time"
13

14
        "github.com/sirupsen/logrus"
15
        "github.com/zalando/postgres-operator/pkg/cluster"
16
        "github.com/zalando/postgres-operator/pkg/spec"
17
        "github.com/zalando/postgres-operator/pkg/util"
18
        "github.com/zalando/postgres-operator/pkg/util/config"
19
)
20

21
const (
22
        httpAPITimeout  = time.Minute * 1
23
        shutdownTimeout = time.Second * 10
24
        httpReadTimeout = time.Millisecond * 100
25
)
26

27
// ControllerInformer describes stats methods of a controller
28
type controllerInformer interface {
29
        GetConfig() *spec.ControllerConfig
30
        GetOperatorConfig() *config.Config
31
        GetStatus() *spec.ControllerStatus
32
        TeamClusterList() map[string][]spec.NamespacedName
33
        ClusterStatus(namespace, cluster string) (*cluster.ClusterStatus, error)
34
        ClusterLogs(namespace, cluster string) ([]*spec.LogEntry, error)
35
        ClusterHistory(namespace, cluster string) ([]*spec.Diff, error)
36
        ClusterDatabasesMap() map[string][]string
37
        WorkerLogs(workerID uint32) ([]*spec.LogEntry, error)
38
        ListQueue(workerID uint32) (*spec.QueueDump, error)
39
        GetWorkersCnt() uint32
40
        WorkerStatus(workerID uint32) (*cluster.WorkerStatus, error)
41
}
42

43
// Server describes HTTP API server
44
type Server struct {
45
        logger     *logrus.Entry
46
        http       http.Server
47
        controller controllerInformer
48
}
49

50
const (
51
        teamRe      = `(?P<team>[a-zA-Z][a-zA-Z0-9\-_]*)`
52
        namespaceRe = `(?P<namespace>[a-z0-9]([-a-z0-9\-_]*[a-z0-9])?)`
53
        clusterRe   = `(?P<cluster>[a-zA-Z][a-zA-Z0-9\-_]*)`
54
)
55

56
var (
57
        clusterStatusRe  = fmt.Sprintf(`^/clusters/%s/%s/?$`, namespaceRe, clusterRe)
58
        clusterLogsRe    = fmt.Sprintf(`^/clusters/%s/%s/logs/?$`, namespaceRe, clusterRe)
59
        clusterHistoryRe = fmt.Sprintf(`^/clusters/%s/%s/history/?$`, namespaceRe, clusterRe)
60
        teamURLRe        = fmt.Sprintf(`^/clusters/%s/?$`, teamRe)
61

62
        clusterStatusURL     = regexp.MustCompile(clusterStatusRe)
63
        clusterLogsURL       = regexp.MustCompile(clusterLogsRe)
64
        clusterHistoryURL    = regexp.MustCompile(clusterHistoryRe)
65
        teamURL              = regexp.MustCompile(teamURLRe)
66
        workerLogsURL        = regexp.MustCompile(`^/workers/(?P<id>\d+)/logs/?$`)
67
        workerEventsQueueURL = regexp.MustCompile(`^/workers/(?P<id>\d+)/queue/?$`)
68
        workerStatusURL      = regexp.MustCompile(`^/workers/(?P<id>\d+)/status/?$`)
69
        workerAllQueue       = regexp.MustCompile(`^/workers/all/queue/?$`)
70
        workerAllStatus      = regexp.MustCompile(`^/workers/all/status/?$`)
71
        clustersURL          = "/clusters/"
72
)
73

74
// New creates new HTTP API server
75
func New(controller controllerInformer, port int, logger *logrus.Logger) *Server {
×
76
        s := &Server{
×
77
                logger:     logger.WithField("pkg", "apiserver"),
×
78
                controller: controller,
×
79
        }
×
80
        mux := http.NewServeMux()
×
81

×
82
        mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
×
83
        mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
×
84
        mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
×
85
        mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
×
86
        mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
×
87

×
88
        mux.Handle("/status/", http.HandlerFunc(s.controllerStatus))
×
89
        mux.Handle("/readyz/", http.HandlerFunc(s.controllerReady))
×
90
        mux.Handle("/config/", http.HandlerFunc(s.operatorConfig))
×
91

×
92
        mux.HandleFunc("/clusters/", s.clusters)
×
93
        mux.HandleFunc("/workers/", s.workers)
×
94
        mux.HandleFunc("/databases/", s.databases)
×
95

×
96
        s.http = http.Server{
×
97
                Addr:        fmt.Sprintf(":%d", port),
×
98
                Handler:     http.TimeoutHandler(mux, httpAPITimeout, ""),
×
99
                ReadTimeout: httpReadTimeout,
×
100
        }
×
101

×
102
        return s
×
103
}
×
104

105
// Run starts the HTTP server
106
func (s *Server) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
×
107

×
108
        var err error
×
109

×
110
        defer wg.Done()
×
111

×
112
        go func() {
×
113
                if err2 := s.http.ListenAndServe(); err2 != http.ErrServerClosed {
×
114
                        s.logger.Fatalf("Could not start http server: %v", err2)
×
115
                }
×
116
        }()
117
        s.logger.Infof("listening on %s", s.http.Addr)
×
118

×
119
        <-stopCh
×
120

×
121
        ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
×
122
        defer cancel()
×
123
        if err = s.http.Shutdown(ctx); err == nil {
×
124
                s.logger.Infoln("Http server shut down")
×
125
                return
×
126
        }
×
127
        if err == context.DeadlineExceeded {
×
128
                s.logger.Warningf("Shutdown timeout exceeded. closing http server")
×
129
                if err = s.http.Close(); err != nil {
×
130
                        s.logger.Errorf("could not close http connection: %v", err)
×
131
                }
×
132
                return
×
133
        }
134
        s.logger.Errorf("Could not shutdown http server: %v", err)
×
135
}
136

137
func (s *Server) respond(obj interface{}, err error, w http.ResponseWriter) {
×
138
        w.Header().Set("Content-Type", "application/json")
×
139
        if err != nil {
×
140
                w.WriteHeader(http.StatusInternalServerError)
×
141
                if err2 := json.NewEncoder(w).Encode(map[string]interface{}{"error": err.Error()}); err2 != nil {
×
142
                        s.logger.Errorf("could not encode error response %q: %v", err, err2)
×
143
                }
×
144
                return
×
145
        }
146

147
        err = json.NewEncoder(w).Encode(obj)
×
148
        if err != nil {
×
149
                w.WriteHeader(http.StatusInternalServerError)
×
150
                s.logger.Errorf("Could not encode: %v", err)
×
151
        }
×
152
}
153

154
func (s *Server) controllerStatus(w http.ResponseWriter, req *http.Request) {
×
155
        s.respond(s.controller.GetStatus(), nil, w)
×
156
}
×
157

158
func (s *Server) controllerReady(w http.ResponseWriter, req *http.Request) {
×
159
        s.respond("OK", nil, w)
×
160
}
×
161

162
func (s *Server) operatorConfig(w http.ResponseWriter, req *http.Request) {
×
163
        s.respond(map[string]interface{}{
×
164
                "controller": s.controller.GetConfig(),
×
165
                "operator":   s.controller.GetOperatorConfig(),
×
166
        }, nil, w)
×
167
}
×
168

169
func (s *Server) clusters(w http.ResponseWriter, req *http.Request) {
×
170
        var (
×
171
                resp interface{}
×
172
                err  error
×
173
        )
×
174

×
175
        if matches := util.FindNamedStringSubmatch(clusterStatusURL, req.URL.Path); matches != nil {
×
176
                namespace := matches["namespace"]
×
177
                resp, err = s.controller.ClusterStatus(namespace, matches["cluster"])
×
178
        } else if matches := util.FindNamedStringSubmatch(teamURL, req.URL.Path); matches != nil {
×
179
                teamClusters := s.controller.TeamClusterList()
×
180
                clusters, found := teamClusters[matches["team"]]
×
181
                if !found {
×
182
                        s.respond(nil, fmt.Errorf("could not find clusters for the team"), w)
×
183
                        return
×
184
                }
×
185

186
                clusterNames := make([]string, 0)
×
187
                for _, cluster := range clusters {
×
188
                        clusterNames = append(clusterNames, cluster.Name)
×
189
                }
×
190

191
                resp, err = clusterNames, nil
×
192
        } else if matches := util.FindNamedStringSubmatch(clusterLogsURL, req.URL.Path); matches != nil {
×
193
                namespace := matches["namespace"]
×
194
                resp, err = s.controller.ClusterLogs(namespace, matches["cluster"])
×
195
        } else if matches := util.FindNamedStringSubmatch(clusterHistoryURL, req.URL.Path); matches != nil {
×
196
                namespace := matches["namespace"]
×
197
                resp, err = s.controller.ClusterHistory(namespace, matches["cluster"])
×
198
        } else if req.URL.Path == clustersURL {
×
199
                clusterNamesPerTeam := make(map[string][]string)
×
200
                for team, clusters := range s.controller.TeamClusterList() {
×
201
                        for _, cluster := range clusters {
×
202
                                clusterNamesPerTeam[team] = append(clusterNamesPerTeam[team], cluster.Name)
×
203
                        }
×
204
                }
205
                resp, err = clusterNamesPerTeam, nil
×
206
        } else {
×
207
                resp, err = nil, fmt.Errorf("page not found")
×
208
        }
×
209

210
        s.respond(resp, err, w)
×
211
}
212

213
func mustConvertToUint32(s string) uint32 {
×
214
        result, err := strconv.Atoi(s)
×
215
        if err != nil {
×
216
                panic(fmt.Errorf("mustConvertToUint32 called for %s: %v", s, err))
×
217
        }
218
        return uint32(result)
×
219
}
220

221
func (s *Server) workers(w http.ResponseWriter, req *http.Request) {
×
222
        var (
×
223
                resp interface{}
×
224
                err  error
×
225
        )
×
226

×
227
        if workerAllQueue.MatchString(req.URL.Path) {
×
228
                s.allQueues(w, req)
×
229
                return
×
230
        }
×
231
        if workerAllStatus.MatchString(req.URL.Path) {
×
232
                s.allWorkers(w, req)
×
233
                return
×
234
        }
×
235

236
        err = fmt.Errorf("page not found")
×
237

×
238
        if matches := util.FindNamedStringSubmatch(workerLogsURL, req.URL.Path); matches != nil {
×
239
                workerID := mustConvertToUint32(matches["id"])
×
240
                resp, err = s.controller.WorkerLogs(workerID)
×
241

×
242
        } else if matches := util.FindNamedStringSubmatch(workerEventsQueueURL, req.URL.Path); matches != nil {
×
243
                workerID := mustConvertToUint32(matches["id"])
×
244
                resp, err = s.controller.ListQueue(workerID)
×
245

×
246
        } else if matches := util.FindNamedStringSubmatch(workerStatusURL, req.URL.Path); matches != nil {
×
247
                var workerStatus *cluster.WorkerStatus
×
248

×
249
                workerID := mustConvertToUint32(matches["id"])
×
250
                resp = "idle"
×
251
                if workerStatus, err = s.controller.WorkerStatus(workerID); workerStatus != nil {
×
252
                        resp = workerStatus
×
253
                }
×
254
        }
255

256
        s.respond(resp, err, w)
×
257
}
258

259
func (s *Server) databases(w http.ResponseWriter, req *http.Request) {
×
260

×
261
        databaseNamesPerCluster := s.controller.ClusterDatabasesMap()
×
262
        s.respond(databaseNamesPerCluster, nil, w)
×
263
}
×
264

265
func (s *Server) allQueues(w http.ResponseWriter, r *http.Request) {
×
266
        workersCnt := s.controller.GetWorkersCnt()
×
267
        resp := make(map[uint32]*spec.QueueDump, workersCnt)
×
268
        for i := uint32(0); i < workersCnt; i++ {
×
269
                queueDump, err := s.controller.ListQueue(i)
×
270
                if err != nil {
×
271
                        s.respond(nil, err, w)
×
272
                        return
×
273
                }
×
274

275
                resp[i] = queueDump
×
276
        }
277

278
        s.respond(resp, nil, w)
×
279
}
280

281
func (s *Server) allWorkers(w http.ResponseWriter, r *http.Request) {
×
282
        workersCnt := s.controller.GetWorkersCnt()
×
283
        resp := make(map[uint32]interface{}, workersCnt)
×
284
        for i := uint32(0); i < workersCnt; i++ {
×
285
                status, err := s.controller.WorkerStatus(i)
×
286
                if err != nil {
×
287
                        s.respond(nil, err, w)
×
288
                        continue
×
289
                }
290

291
                if status == nil {
×
292
                        resp[i] = "idle"
×
293
                } else {
×
294
                        resp[i] = status
×
295
                }
×
296
        }
297

298
        s.respond(resp, nil, w)
×
299
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc