• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

foomo / keel / 7129509419

07 Dec 2023 02:21PM UTC coverage: 30.646% (-0.08%) from 30.724%
7129509419

push

github

web-flow
Merge pull request #196 from foomo/fix-pprof

fix: default pprof arguments

787 of 2568 relevant lines covered (30.65%)

1.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.56
/server.go
1
package keel
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "os"
8
        "os/signal"
9
        "reflect"
10
        "slices"
11
        "sync"
12
        "sync/atomic"
13
        "syscall"
14
        "time"
15

16
        "github.com/foomo/keel/healthz"
17
        "github.com/foomo/keel/interfaces"
18
        "github.com/foomo/keel/markdown"
19
        "github.com/foomo/keel/metrics"
20
        "github.com/foomo/keel/service"
21
        "github.com/go-logr/logr"
22
        "github.com/pkg/errors"
23
        "github.com/spf13/viper"
24
        otelhost "go.opentelemetry.io/contrib/instrumentation/host"
25
        otelruntime "go.opentelemetry.io/contrib/instrumentation/runtime"
26
        "go.opentelemetry.io/otel"
27
        "go.opentelemetry.io/otel/metric"
28
        "go.opentelemetry.io/otel/propagation"
29
        "go.opentelemetry.io/otel/trace"
30
        "go.uber.org/zap"
31
        "golang.org/x/sync/errgroup"
32

33
        "github.com/foomo/keel/config"
34
        "github.com/foomo/keel/env"
35
        "github.com/foomo/keel/log"
36
        "github.com/foomo/keel/telemetry"
37
)
38

39
// Server struct
40
type Server struct {
41
        services         []Service
42
        initServices     []Service
43
        meter            metric.Meter
44
        meterProvider    metric.MeterProvider
45
        tracer           trace.Tracer
46
        traceProvider    trace.TracerProvider
47
        shutdownSignals  []os.Signal
48
        shutdownTimeout  time.Duration
49
        running          atomic.Bool
50
        syncClosers      []interface{}
51
        syncClosersLock  sync.RWMutex
52
        syncReadmers     []interfaces.Readmer
53
        syncReadmersLock sync.RWMutex
54
        syncProbes       map[healthz.Type][]interface{}
55
        syncProbesLock   sync.RWMutex
56
        ctx              context.Context
57
        ctxCancel        context.Context
58
        ctxCancelFn      context.CancelFunc
59
        g                *errgroup.Group
60
        gCtx             context.Context
61
        l                *zap.Logger
62
        c                *viper.Viper
63
}
64

65
func NewServer(opts ...Option) *Server {
3✔
66
        inst := &Server{
3✔
67
                shutdownTimeout: 30 * time.Second,
3✔
68
                shutdownSignals: []os.Signal{os.Interrupt, syscall.SIGTERM},
3✔
69
                syncReadmers:    []interfaces.Readmer{},
3✔
70
                syncProbes:      map[healthz.Type][]interface{}{},
3✔
71
                ctx:             context.Background(),
3✔
72
                c:               config.Config(),
3✔
73
                l:               log.Logger(),
3✔
74
        }
3✔
75

3✔
76
        for _, opt := range opts {
9✔
77
                opt(inst)
6✔
78
        }
6✔
79

80
        { // setup error group
3✔
81
                inst.ctxCancel, inst.ctxCancelFn = signal.NotifyContext(inst.ctx, inst.shutdownSignals...)
3✔
82
                inst.g, inst.gCtx = errgroup.WithContext(inst.ctxCancel)
3✔
83

3✔
84
                // gracefully shutdown
3✔
85
                inst.g.Go(func() error {
6✔
86
                        <-inst.gCtx.Done()
3✔
87
                        inst.l.Debug("keel graceful shutdown")
3✔
88
                        defer inst.ctxCancelFn()
3✔
89

3✔
90
                        timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctx, inst.shutdownTimeout)
3✔
91
                        defer timeoutCancel()
3✔
92

3✔
93
                        // append internal closers
3✔
94
                        closers := append(inst.closers(), inst.traceProvider, inst.meterProvider)
3✔
95

3✔
96
                        for _, closer := range closers {
13✔
97
                                l := inst.l.With(log.FName(fmt.Sprintf("%T", closer)))
10✔
98
                                switch c := closer.(type) {
10✔
99
                                case interfaces.Closer:
×
100
                                        c.Close()
×
101
                                case interfaces.ErrorCloser:
×
102
                                        if err := c.Close(); err != nil {
×
103
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorCloser")
×
104
                                        }
×
105
                                case interfaces.CloserWithContext:
×
106
                                        c.Close(timeoutCtx)
×
107
                                case interfaces.ErrorCloserWithContext:
4✔
108
                                        if err := c.Close(timeoutCtx); err != nil {
4✔
109
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorCloserWithContext")
×
110
                                        }
×
111
                                case interfaces.Shutdowner:
×
112
                                        c.Shutdown()
×
113
                                case interfaces.ErrorShutdowner:
×
114
                                        if err := c.Shutdown(); err != nil {
×
115
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorShutdowner")
×
116
                                        }
×
117
                                case interfaces.ShutdownerWithContext:
×
118
                                        c.Shutdown(timeoutCtx)
×
119
                                case interfaces.ErrorShutdownerWithContext:
×
120
                                        if err := c.Shutdown(timeoutCtx); err != nil {
×
121
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorShutdownerWithContext")
×
122
                                        }
×
123
                                case interfaces.Stopper:
×
124
                                        c.Stop()
×
125
                                case interfaces.ErrorStopper:
×
126
                                        if err := c.Stop(); err != nil {
×
127
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorStopper")
×
128
                                        }
×
129
                                case interfaces.StopperWithContext:
×
130
                                        c.Stop(timeoutCtx)
×
131
                                case interfaces.ErrorStopperWithContext:
×
132
                                        if err := c.Stop(timeoutCtx); err != nil {
×
133
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorStopperWithContext")
×
134
                                        }
×
135
                                case interfaces.Unsubscriber:
×
136
                                        c.Unsubscribe()
×
137
                                case interfaces.ErrorUnsubscriber:
×
138
                                        if err := c.Unsubscribe(); err != nil {
×
139
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriber")
×
140
                                        }
×
141
                                case interfaces.UnsubscriberWithContext:
×
142
                                        c.Unsubscribe(timeoutCtx)
×
143
                                case interfaces.ErrorUnsubscriberWithContext:
×
144
                                        if err := c.Unsubscribe(timeoutCtx); err != nil {
×
145
                                                log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriberWithContext")
×
146
                                        }
×
147
                                }
148
                        }
149
                        return inst.gCtx.Err()
3✔
150
                })
151
        }
152

153
        { // setup telemetry
3✔
154
                var err error
3✔
155
                otel.SetLogger(logr.New(telemetry.NewLogger(inst.l)))
3✔
156
                otel.SetErrorHandler(telemetry.NewErrorHandler(inst.l))
3✔
157
                otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
3✔
158

3✔
159
                if inst.meterProvider == nil {
6✔
160
                        inst.meterProvider, err = telemetry.NewNoopMeterProvider()
3✔
161
                        log.Must(inst.l, err, "failed to create meter provider")
3✔
162
                } else if env.GetBool("OTEL_ENABLED", false) {
3✔
163
                        if env.GetBool("OTEL_METRICS_HOST_ENABLED", false) {
×
164
                                log.Must(inst.l, otelhost.Start(), "failed to start otel host metrics")
×
165
                        }
×
166
                        if env.GetBool("OTEL_METRICS_RUNTIME_ENABLED", false) {
×
167
                                log.Must(inst.l, otelruntime.Start(), "failed to start otel runtime metrics")
×
168
                        }
×
169
                }
170
                inst.meter = telemetry.Meter()
3✔
171

3✔
172
                if inst.traceProvider == nil {
6✔
173
                        inst.traceProvider, err = telemetry.NewNoopTraceProvider()
3✔
174
                        log.Must(inst.l, err, "failed to create tracer provider")
3✔
175
                }
3✔
176
                inst.tracer = telemetry.Tracer()
3✔
177
        }
178

179
        // add probe
180
        inst.AddAlwaysHealthzers(inst)
3✔
181
        inst.AddReadmers(
3✔
182
                interfaces.ReadmeFunc(env.Readme),
3✔
183
                interfaces.ReadmeFunc(config.Readme),
3✔
184
                inst,
3✔
185
                interfaces.ReadmeFunc(metrics.Readme),
3✔
186
        )
3✔
187

3✔
188
        // start init services
3✔
189
        inst.startService(inst.initServices...)
3✔
190

3✔
191
        return inst
3✔
192
}
193

194
// Logger returns server logger
195
func (s *Server) Logger() *zap.Logger {
3✔
196
        return s.l
3✔
197
}
3✔
198

199
// Meter returns the implementation meter
200
func (s *Server) Meter() metric.Meter {
×
201
        return s.meter
×
202
}
×
203

204
// Tracer returns the implementation tracer
205
func (s *Server) Tracer() trace.Tracer {
×
206
        return s.tracer
×
207
}
×
208

209
// Config returns server config
210
func (s *Server) Config() *viper.Viper {
×
211
        return s.c
×
212
}
×
213

214
// Context returns server context
215
func (s *Server) Context() context.Context {
×
216
        return s.ctx
×
217
}
×
218

219
// CancelContext returns server's cancel context
220
func (s *Server) CancelContext() context.Context {
×
221
        return s.ctxCancel
×
222
}
×
223

224
// AddService add a single service
225
func (s *Server) AddService(service Service) {
4✔
226
        if !slices.Contains(s.services, service) {
8✔
227
                s.services = append(s.services, service)
4✔
228
                s.AddAlwaysHealthzers(service)
4✔
229
                s.AddCloser(service)
4✔
230
        }
4✔
231
}
232

233
// AddServices adds multiple service
234
func (s *Server) AddServices(services ...Service) {
3✔
235
        for _, value := range services {
7✔
236
                s.AddService(value)
4✔
237
        }
4✔
238
}
239

240
// AddCloser adds a closer to be called on shutdown
241
func (s *Server) AddCloser(closer interface{}) {
4✔
242
        for _, value := range s.closers() {
5✔
243
                if value == closer {
1✔
244
                        return
×
245
                }
×
246
        }
247
        if IsCloser(closer) {
8✔
248
                s.addClosers(closer)
4✔
249
        } else {
4✔
250
                s.l.Warn("unable to add closer", log.FValue(fmt.Sprintf("%T", closer)))
×
251
        }
×
252
}
253

254
// AddClosers adds the given closers to be called on shutdown
255
func (s *Server) AddClosers(closers ...interface{}) {
×
256
        for _, closer := range closers {
×
257
                s.AddCloser(closer)
×
258
        }
×
259
}
260

261
// AddReadmer adds a readmer to be added to the exposed readme
262
func (s *Server) AddReadmer(readmer interfaces.Readmer) {
12✔
263
        s.addReadmers(readmer)
12✔
264
}
12✔
265

266
// AddReadmers adds readmers to be added to the exposed readme
267
func (s *Server) AddReadmers(readmers ...interfaces.Readmer) {
3✔
268
        for _, readmer := range readmers {
15✔
269
                s.AddReadmer(readmer)
12✔
270
        }
12✔
271
}
272

273
// AddHealthzer adds a probe to be called on healthz checks
274
func (s *Server) AddHealthzer(typ healthz.Type, probe interface{}) {
7✔
275
        if IsHealthz(probe) {
14✔
276
                s.addProbes(typ, probe)
7✔
277
        } else {
7✔
278
                s.l.Debug("not a healthz probe", log.FValue(fmt.Sprintf("%T", probe)))
×
279
        }
×
280
}
281

282
// AddHealthzers adds the given probes to be called on healthz checks
283
func (s *Server) AddHealthzers(typ healthz.Type, probes ...interface{}) {
7✔
284
        for _, probe := range probes {
14✔
285
                s.AddHealthzer(typ, probe)
7✔
286
        }
7✔
287
}
288

289
// AddAlwaysHealthzers adds the probes to be called on any healthz checks
290
func (s *Server) AddAlwaysHealthzers(probes ...interface{}) {
7✔
291
        s.AddHealthzers(healthz.TypeAlways, probes...)
7✔
292
}
7✔
293

294
// AddStartupHealthzers adds the startup probes to be called on healthz checks
295
func (s *Server) AddStartupHealthzers(probes ...interface{}) {
×
296
        s.AddHealthzers(healthz.TypeStartup, probes...)
×
297
}
×
298

299
// AddLivenessHealthzers adds the liveness probes to be called on healthz checks
300
func (s *Server) AddLivenessHealthzers(probes ...interface{}) {
×
301
        s.AddHealthzers(healthz.TypeLiveness, probes...)
×
302
}
×
303

304
// AddReadinessHealthzers adds the readiness probes to be called on healthz checks
305
func (s *Server) AddReadinessHealthzers(probes ...interface{}) {
×
306
        s.AddHealthzers(healthz.TypeReadiness, probes...)
×
307
}
×
308

309
// IsCanceled returns true if the internal errgroup has been canceled
310
func (s *Server) IsCanceled() bool {
3✔
311
        return errors.Is(s.gCtx.Err(), context.Canceled)
3✔
312
}
3✔
313

314
// Healthz returns true if the server is running
315
func (s *Server) Healthz() error {
×
316
        if !s.running.Load() {
×
317
                return ErrServerNotRunning
×
318
        }
×
319
        return nil
×
320
}
321

322
// Run runs the server
323
func (s *Server) Run() {
3✔
324
        if s.IsCanceled() {
3✔
325
                s.l.Info("keel server canceled")
×
326
                return
×
327
        }
×
328

329
        defer s.ctxCancelFn()
3✔
330
        s.l.Info("starting keel server")
3✔
331

3✔
332
        // start services
3✔
333
        s.startService(s.services...)
3✔
334

3✔
335
        // add init services to closers
3✔
336
        for _, initService := range s.initServices {
3✔
337
                s.AddClosers(initService)
×
338
        }
×
339

340
        // set running
341
        defer func() {
6✔
342
                s.running.Store(false)
3✔
343
        }()
3✔
344
        s.running.Store(true)
3✔
345

3✔
346
        // wait for shutdown
3✔
347
        if err := s.g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
3✔
348
                log.WithError(s.l, err).Error("service error")
×
349
        }
×
350

351
        s.l.Info("keel server stopped")
3✔
352
}
353

354
func (s *Server) closers() []interface{} {
7✔
355
        s.syncClosersLock.RLock()
7✔
356
        defer s.syncClosersLock.RUnlock()
7✔
357
        return s.syncClosers
7✔
358
}
7✔
359

360
func (s *Server) addClosers(v ...interface{}) {
4✔
361
        s.syncClosersLock.Lock()
4✔
362
        defer s.syncClosersLock.Unlock()
4✔
363
        s.syncClosers = append(s.syncClosers, v...)
4✔
364
}
4✔
365

366
func (s *Server) readmers() []interfaces.Readmer {
×
367
        s.syncReadmersLock.RLock()
×
368
        defer s.syncReadmersLock.RUnlock()
×
369
        return s.syncReadmers
×
370
}
×
371

372
func (s *Server) addReadmers(v ...interfaces.Readmer) {
12✔
373
        s.syncReadmersLock.Lock()
12✔
374
        defer s.syncReadmersLock.Unlock()
12✔
375
        s.syncReadmers = append(s.syncReadmers, v...)
12✔
376
}
12✔
377

378
func (s *Server) probes() map[healthz.Type][]interface{} {
×
379
        s.syncProbesLock.RLock()
×
380
        defer s.syncProbesLock.RUnlock()
×
381
        return s.syncProbes
×
382
}
×
383

384
func (s *Server) addProbes(typ healthz.Type, v ...interface{}) {
7✔
385
        s.syncProbesLock.Lock()
7✔
386
        defer s.syncProbesLock.Unlock()
7✔
387
        s.syncProbes[typ] = append(s.syncProbes[typ], v...)
7✔
388
}
7✔
389

390
// Readme returns the self-documenting string
391
func (s *Server) Readme() string {
×
392
        md := &markdown.Markdown{}
×
393

×
394
        md.Println(s.readmeServices())
×
395
        md.Println(s.readmeHealthz())
×
396
        md.Print(s.readmeCloser())
×
397

×
398
        return md.String()
×
399
}
×
400

401
// ------------------------------------------------------------------------------------------------
402
// ~ Private methods
403
// ------------------------------------------------------------------------------------------------
404

405
// startService starts the given services
406
func (s *Server) startService(services ...Service) {
6✔
407
        for _, value := range services {
10✔
408
                value := value
4✔
409
                s.g.Go(func() error {
8✔
410
                        if err := value.Start(s.ctx); errors.Is(err, http.ErrServerClosed) {
4✔
411
                                log.WithError(s.l, err).Debug("server has closed")
×
412
                        } else if err != nil {
4✔
413
                                log.WithError(s.l, err).Error("failed to start service")
×
414
                                return err
×
415
                        }
×
416
                        return nil
4✔
417
                })
418
        }
419
}
420

421
func (s *Server) readmeCloser() string {
×
422
        md := &markdown.Markdown{}
×
423
        closers := s.closers()
×
424
        rows := make([][]string, 0, len(closers))
×
425
        for _, value := range closers {
×
426
                t := reflect.TypeOf(value)
×
427
                var closer string
×
428
                switch value.(type) {
×
429
                case interfaces.Closer:
×
430
                        closer = "Closer"
×
431
                case interfaces.ErrorCloser:
×
432
                        closer = "ErrorCloser"
×
433
                case interfaces.CloserWithContext:
×
434
                        closer = "CloserWithContext"
×
435
                case interfaces.ErrorCloserWithContext:
×
436
                        closer = "ErrorCloserWithContext"
×
437
                case interfaces.Shutdowner:
×
438
                        closer = "Shutdowner"
×
439
                case interfaces.ErrorShutdowner:
×
440
                        closer = "ErrorShutdowner"
×
441
                case interfaces.ShutdownerWithContext:
×
442
                        closer = "ShutdownerWithContext"
×
443
                case interfaces.ErrorShutdownerWithContext:
×
444
                        closer = "ErrorShutdownerWithContext"
×
445
                case interfaces.Stopper:
×
446
                        closer = "Stopper"
×
447
                case interfaces.ErrorStopper:
×
448
                        closer = "ErrorStopper"
×
449
                case interfaces.StopperWithContext:
×
450
                        closer = "StopperWithContext"
×
451
                case interfaces.ErrorStopperWithContext:
×
452
                        closer = "ErrorStopperWithContext"
×
453
                case interfaces.Unsubscriber:
×
454
                        closer = "Unsubscriber"
×
455
                case interfaces.ErrorUnsubscriber:
×
456
                        closer = "ErrorUnsubscriber"
×
457
                case interfaces.UnsubscriberWithContext:
×
458
                        closer = "UnsubscriberWithContext"
×
459
                case interfaces.ErrorUnsubscriberWithContext:
×
460
                        closer = "ErrorUnsubscriberWithContext"
×
461
                }
462
                rows = append(rows, []string{
×
463
                        markdown.Code(markdown.Name(value)),
×
464
                        markdown.Code(t.String()),
×
465
                        markdown.Code(closer),
×
466
                        markdown.String(value),
×
467
                })
×
468
        }
469
        if len(rows) > 0 {
×
470
                md.Println("### Closers")
×
471
                md.Println("")
×
472
                md.Println("List of all registered closers that are being called during graceful shutdown.")
×
473
                md.Println("")
×
474
                md.Table([]string{"Name", "Type", "Closer", "Description"}, rows)
×
475
                md.Println("")
×
476
        }
×
477

478
        return md.String()
×
479
}
480

481
func (s *Server) readmeHealthz() string {
×
482
        var rows [][]string
×
483
        md := &markdown.Markdown{}
×
484

×
485
        for k, probes := range s.probes() {
×
486
                for _, probe := range probes {
×
487
                        t := reflect.TypeOf(probe)
×
488
                        rows = append(rows, []string{
×
489
                                markdown.Code(markdown.Name(probe)),
×
490
                                markdown.Code(k.String()),
×
491
                                markdown.Code(t.String()),
×
492
                                markdown.String(probe),
×
493
                        })
×
494
                }
×
495
        }
496
        if len(rows) > 0 {
×
497
                md.Println("### Health probes")
×
498
                md.Println("")
×
499
                md.Println("List of all registered healthz probes that are being called during startup and runtime.")
×
500
                md.Println("")
×
501
                md.Table([]string{"Name", "Probe", "Type", "Description"}, rows)
×
502
        }
×
503

504
        return md.String()
×
505
}
506

507
func (s *Server) readmeServices() string {
×
508
        md := &markdown.Markdown{}
×
509

×
510
        {
×
511
                var rows [][]string
×
512
                for _, value := range s.initServices {
×
513
                        if v, ok := value.(*service.HTTP); ok {
×
514
                                t := reflect.TypeOf(v)
×
515
                                rows = append(rows, []string{
×
516
                                        markdown.Code(v.Name()),
×
517
                                        markdown.Code(t.String()),
×
518
                                        markdown.String(v),
×
519
                                })
×
520
                        }
×
521
                }
522
                if len(rows) > 0 {
×
523
                        md.Println("### Init Services")
×
524
                        md.Println("")
×
525
                        md.Println("List of all registered init services that are being immediately started.")
×
526
                        md.Println("")
×
527
                        md.Table([]string{"Name", "Type", "Address"}, rows)
×
528
                }
×
529
        }
530

531
        md.Println("")
×
532

×
533
        {
×
534
                var rows [][]string
×
535
                for _, value := range s.services {
×
536
                        t := reflect.TypeOf(value)
×
537
                        rows = append(rows, []string{
×
538
                                markdown.Code(value.Name()),
×
539
                                markdown.Code(t.String()),
×
540
                                markdown.String(value),
×
541
                        })
×
542
                }
×
543
                if len(rows) > 0 {
×
544
                        md.Println("### Runtime Services")
×
545
                        md.Println("")
×
546
                        md.Println("List of all registered services that are being started.")
×
547
                        md.Println("")
×
548
                        md.Table([]string{"Name", "Type", "Description"}, rows)
×
549
                }
×
550
        }
551

552
        return md.String()
×
553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc