• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

foomo / keel / 11969135869

22 Nov 2024 08:32AM UTC coverage: 16.252% (-0.01%) from 16.265%
11969135869

push

github

web-flow
Merge pull request #220 from foomo/dependabot/go_modules/gomod-security-479016457e

chore(deps): bump google.golang.org/grpc from 1.67.1 to 1.68.0 in the gomod-security group across 1 directory

1222 of 7519 relevant lines covered (16.25%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.01
/server.go
1
package keel
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "net/http"
8
        "os"
9
        "os/signal"
10
        "reflect"
11
        "slices"
12
        "sync"
13
        "sync/atomic"
14
        "syscall"
15
        "time"
16

17
        "github.com/foomo/keel/config"
18
        "github.com/foomo/keel/env"
19
        "github.com/foomo/keel/healthz"
20
        "github.com/foomo/keel/interfaces"
21
        "github.com/foomo/keel/log"
22
        "github.com/foomo/keel/markdown"
23
        "github.com/foomo/keel/metrics"
24
        "github.com/foomo/keel/service"
25
        "github.com/foomo/keel/telemetry"
26
        "github.com/go-logr/logr"
27
        "github.com/spf13/viper"
28
        otelhost "go.opentelemetry.io/contrib/instrumentation/host"
29
        otelruntime "go.opentelemetry.io/contrib/instrumentation/runtime"
30
        "go.opentelemetry.io/otel"
31
        "go.opentelemetry.io/otel/metric"
32
        "go.opentelemetry.io/otel/propagation"
33
        "go.opentelemetry.io/otel/trace"
34
        "go.uber.org/zap"
35
        "golang.org/x/sync/errgroup"
36
)
37

38
// Server struct
39
type Server struct {
40
        services        []Service
41
        initServices    []Service
42
        meter           metric.Meter
43
        meterProvider   metric.MeterProvider
44
        tracer          trace.Tracer
45
        traceProvider   trace.TracerProvider
46
        shutdown        atomic.Bool
47
        shutdownSignals []os.Signal
48
        // gracefulPeriod should equal the terminationGracePeriodSeconds
49
        gracefulPeriod   time.Duration
50
        running          atomic.Bool
51
        syncClosers      []interface{}
52
        syncClosersLock  sync.RWMutex
53
        syncReadmers     []interfaces.Readmer
54
        syncReadmersLock sync.RWMutex
55
        syncProbes       map[healthz.Type][]interface{}
56
        syncProbesLock   sync.RWMutex
57
        ctx              context.Context
58
        cancel           context.CancelFunc
59
        gracefulCtx      context.Context
60
        gracefulCancel   context.CancelFunc
61
        g                *errgroup.Group
62
        gCtx             context.Context
63
        l                *zap.Logger
64
        c                *viper.Viper
65
}
66

67
func NewServer(opts ...Option) *Server {
3✔
68
        inst := &Server{
3✔
69
                gracefulPeriod:  time.Duration(env.GetInt("KEEL_GRACEFUL_PERIOD", 30)) * time.Second,
3✔
70
                shutdownSignals: []os.Signal{syscall.SIGINT, syscall.SIGTERM},
3✔
71
                syncReadmers:    []interfaces.Readmer{},
3✔
72
                syncProbes:      map[healthz.Type][]interface{}{},
3✔
73
                ctx:             context.Background(),
3✔
74
                c:               config.Config(),
3✔
75
                l:               log.Logger(),
3✔
76
        }
3✔
77

3✔
78
        for _, opt := range opts {
9✔
79
                opt(inst)
6✔
80
        }
6✔
81

82
        { // setup error group
3✔
83
                inst.AddReadinessHealthzers(healthz.NewHealthzerFn(func(ctx context.Context) error {
3✔
84
                        if inst.shutdown.Load() {
×
85
                                return ErrServerShutdown
×
86
                        }
×
87
                        return nil
×
88
                }))
89

90
                inst.ctx, inst.cancel = context.WithCancel(inst.ctx)
3✔
91
                inst.g, inst.gCtx = errgroup.WithContext(inst.ctx)
3✔
92
                inst.gracefulCtx, inst.gracefulCancel = signal.NotifyContext(inst.ctx, inst.shutdownSignals...)
3✔
93

3✔
94
                // gracefully shutdown
3✔
95
                inst.g.Go(func() error {
6✔
96
                        <-inst.gracefulCtx.Done()
3✔
97
                        inst.shutdown.Store(true)
3✔
98
                        timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctx, inst.gracefulPeriod)
3✔
99
                        defer timeoutCancel()
3✔
100

3✔
101
                        inst.l.Info("keel graceful shutdown",
3✔
102
                                zap.Duration("graceful_period", inst.gracefulPeriod),
3✔
103
                        )
3✔
104

3✔
105
                        // append internal closers
3✔
106
                        closers := append(inst.closers(), inst.traceProvider, inst.meterProvider)
3✔
107

3✔
108
                        inst.l.Info("keel graceful shutdown: closers")
3✔
109
                        for _, closer := range closers {
13✔
110
                                var err error
10✔
111
                                l := inst.l.With(log.FName(fmt.Sprintf("%T", closer)))
10✔
112
                                switch c := closer.(type) {
10✔
113
                                case interfaces.Closer:
×
114
                                        c.Close()
×
115
                                case interfaces.ErrorCloser:
×
116
                                        err = c.Close()
×
117
                                case interfaces.CloserWithContext:
×
118
                                        c.Close(timeoutCtx)
×
119
                                case interfaces.ErrorCloserWithContext:
4✔
120
                                        err = c.Close(timeoutCtx)
4✔
121
                                case interfaces.Shutdowner:
×
122
                                        c.Shutdown()
×
123
                                case interfaces.ErrorShutdowner:
×
124
                                        err = c.Shutdown()
×
125
                                case interfaces.ShutdownerWithContext:
×
126
                                        c.Shutdown(timeoutCtx)
×
127
                                case interfaces.ErrorShutdownerWithContext:
×
128
                                        err = c.Shutdown(timeoutCtx)
×
129
                                case interfaces.Stopper:
×
130
                                        c.Stop()
×
131
                                case interfaces.ErrorStopper:
×
132
                                        err = c.Stop()
×
133
                                case interfaces.StopperWithContext:
×
134
                                        c.Stop(timeoutCtx)
×
135
                                case interfaces.ErrorStopperWithContext:
×
136
                                        err = c.Stop(timeoutCtx)
×
137
                                case interfaces.Unsubscriber:
×
138
                                        c.Unsubscribe()
×
139
                                case interfaces.ErrorUnsubscriber:
×
140
                                        err = c.Unsubscribe()
×
141
                                case interfaces.UnsubscriberWithContext:
×
142
                                        c.Unsubscribe(timeoutCtx)
×
143
                                case interfaces.ErrorUnsubscriberWithContext:
×
144
                                        err = c.Unsubscribe(timeoutCtx)
×
145
                                }
146
                                if err != nil {
10✔
147
                                        l.Warn("keel graceful shutdown: closer failed", zap.Error(err))
×
148
                                } else {
10✔
149
                                        l.Debug("keel graceful shutdown: closer closed")
10✔
150
                                }
10✔
151
                        }
152

153
                        inst.l.Info("keel graceful shutdown: complete")
3✔
154

3✔
155
                        return ErrServerShutdown
3✔
156
                })
157
        }
158

159
        { // setup telemetry
3✔
160
                var err error
3✔
161
                otel.SetLogger(logr.New(telemetry.NewLogger(inst.l)))
3✔
162
                otel.SetErrorHandler(telemetry.NewErrorHandler(inst.l))
3✔
163
                otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
3✔
164

3✔
165
                if inst.meterProvider == nil {
6✔
166
                        inst.meterProvider, err = telemetry.NewNoopMeterProvider()
3✔
167
                        log.Must(inst.l, err, "failed to create meter provider")
3✔
168
                } else if env.GetBool("OTEL_ENABLED", false) {
3✔
169
                        if env.GetBool("OTEL_METRICS_HOST_ENABLED", false) {
×
170
                                log.Must(inst.l, otelhost.Start(), "failed to start otel host metrics")
×
171
                        }
×
172
                        if env.GetBool("OTEL_METRICS_RUNTIME_ENABLED", false) {
×
173
                                log.Must(inst.l, otelruntime.Start(), "failed to start otel runtime metrics")
×
174
                        }
×
175
                }
176
                inst.meter = telemetry.Meter()
3✔
177

3✔
178
                if inst.traceProvider == nil {
6✔
179
                        inst.traceProvider, err = telemetry.NewNoopTraceProvider()
3✔
180
                        log.Must(inst.l, err, "failed to create tracer provider")
3✔
181
                }
3✔
182
                inst.tracer = telemetry.Tracer()
3✔
183
        }
184

185
        // add probe
186
        inst.AddAlwaysHealthzers(inst)
3✔
187
        inst.AddReadmers(
3✔
188
                interfaces.ReadmeFunc(env.Readme),
3✔
189
                interfaces.ReadmeFunc(config.Readme),
3✔
190
                inst,
3✔
191
                interfaces.ReadmeFunc(metrics.Readme),
3✔
192
        )
3✔
193

3✔
194
        // start init services
3✔
195
        inst.startService(inst.initServices...)
3✔
196

3✔
197
        return inst
3✔
198
}
199

200
// Logger returns server logger
201
func (s *Server) Logger() *zap.Logger {
3✔
202
        return s.l
3✔
203
}
3✔
204

205
// Meter returns the implementation meter
206
func (s *Server) Meter() metric.Meter {
×
207
        return s.meter
×
208
}
×
209

210
// Tracer returns the implementation tracer
211
func (s *Server) Tracer() trace.Tracer {
×
212
        return s.tracer
×
213
}
×
214

215
// Config returns server config
216
func (s *Server) Config() *viper.Viper {
×
217
        return s.c
×
218
}
×
219

220
// Context returns server context
221
func (s *Server) Context() context.Context {
×
222
        return s.ctx
×
223
}
×
224

225
// ShutdownContext returns server's shutdown cancel context
226
func (s *Server) ShutdownContext() context.Context {
×
227
        return s.gracefulCtx
×
228
}
×
229

230
// ShutdownCancel returns server's shutdown cancel function
231
func (s *Server) ShutdownCancel() context.CancelFunc {
×
232
        return s.gracefulCancel
×
233
}
×
234

235
// AddService add a single service
236
func (s *Server) AddService(v Service) {
4✔
237
        if !slices.Contains(s.services, v) {
8✔
238
                s.services = append(s.services, v)
4✔
239
                s.AddAlwaysHealthzers(v)
4✔
240
                s.AddCloser(v)
4✔
241
        }
4✔
242
}
243

244
// AddServices adds multiple service
245
func (s *Server) AddServices(services ...Service) {
3✔
246
        for _, value := range services {
7✔
247
                s.AddService(value)
4✔
248
        }
4✔
249
}
250

251
// AddCloser adds a closer to be called on shutdown
252
func (s *Server) AddCloser(closer interface{}) {
4✔
253
        for _, value := range s.closers() {
5✔
254
                if value == closer {
1✔
255
                        return
×
256
                }
×
257
        }
258
        if IsCloser(closer) {
8✔
259
                s.addClosers(closer)
4✔
260
        } else {
4✔
261
                s.l.Warn("unable to add closer", log.FValue(fmt.Sprintf("%T", closer)))
×
262
        }
×
263
}
264

265
// AddClosers adds the given closers to be called on shutdown
266
func (s *Server) AddClosers(closers ...interface{}) {
×
267
        for _, closer := range closers {
×
268
                s.AddCloser(closer)
×
269
        }
×
270
}
271

272
// AddReadmer adds a readmer to be added to the exposed readme
273
func (s *Server) AddReadmer(readmer interfaces.Readmer) {
12✔
274
        s.addReadmers(readmer)
12✔
275
}
12✔
276

277
// AddReadmers adds readmers to be added to the exposed readme
278
func (s *Server) AddReadmers(readmers ...interfaces.Readmer) {
3✔
279
        for _, readmer := range readmers {
15✔
280
                s.AddReadmer(readmer)
12✔
281
        }
12✔
282
}
283

284
// AddHealthzer adds a probe to be called on healthz checks
285
func (s *Server) AddHealthzer(typ healthz.Type, probe interface{}) {
10✔
286
        if IsHealthz(probe) {
20✔
287
                s.addProbes(typ, probe)
10✔
288
        } else {
10✔
289
                s.l.Debug("not a healthz probe", log.FValue(fmt.Sprintf("%T", probe)))
×
290
        }
×
291
}
292

293
// AddHealthzers adds the given probes to be called on healthz checks
294
func (s *Server) AddHealthzers(typ healthz.Type, probes ...interface{}) {
10✔
295
        for _, probe := range probes {
20✔
296
                s.AddHealthzer(typ, probe)
10✔
297
        }
10✔
298
}
299

300
// AddAlwaysHealthzers adds the probes to be called on any healthz checks
301
func (s *Server) AddAlwaysHealthzers(probes ...interface{}) {
7✔
302
        s.AddHealthzers(healthz.TypeAlways, probes...)
7✔
303
}
7✔
304

305
// AddStartupHealthzers adds the startup probes to be called on healthz checks
306
func (s *Server) AddStartupHealthzers(probes ...interface{}) {
×
307
        s.AddHealthzers(healthz.TypeStartup, probes...)
×
308
}
×
309

310
// AddLivenessHealthzers adds the liveness probes to be called on healthz checks
311
func (s *Server) AddLivenessHealthzers(probes ...interface{}) {
×
312
        s.AddHealthzers(healthz.TypeLiveness, probes...)
×
313
}
×
314

315
// AddReadinessHealthzers adds the readiness probes to be called on healthz checks
316
func (s *Server) AddReadinessHealthzers(probes ...interface{}) {
3✔
317
        s.AddHealthzers(healthz.TypeReadiness, probes...)
3✔
318
}
3✔
319

320
// Healthz returns true if the server is running
321
func (s *Server) Healthz() error {
×
322
        if !s.running.Load() {
×
323
                return ErrServerNotRunning
×
324
        }
×
325
        return nil
×
326
}
327

328
// Run runs the server
329
func (s *Server) Run() {
3✔
330
        s.l.Info("starting keel server")
3✔
331
        defer s.cancel()
3✔
332

3✔
333
        // start services
3✔
334
        s.startService(s.services...)
3✔
335

3✔
336
        // add init services to closers
3✔
337
        for _, initService := range s.initServices {
3✔
338
                s.AddClosers(initService)
×
339
        }
×
340

341
        // set running
342
        defer s.running.Store(false)
3✔
343
        s.running.Store(true)
3✔
344

3✔
345
        // wait for shutdown
3✔
346
        if err := s.g.Wait(); errors.Is(err, ErrServerShutdown) {
6✔
347
                s.l.Info("keel server stopped")
3✔
348
        } else if err != nil {
3✔
349
                log.WithError(s.l, err).Error("keel server failed")
×
350
        }
×
351
}
352

353
func (s *Server) closers() []interface{} {
7✔
354
        s.syncClosersLock.RLock()
7✔
355
        defer s.syncClosersLock.RUnlock()
7✔
356
        return s.syncClosers
7✔
357
}
7✔
358

359
func (s *Server) addClosers(v ...interface{}) {
4✔
360
        s.syncClosersLock.Lock()
4✔
361
        defer s.syncClosersLock.Unlock()
4✔
362
        s.syncClosers = append(s.syncClosers, v...)
4✔
363
}
4✔
364

365
func (s *Server) readmers() []interfaces.Readmer {
×
366
        s.syncReadmersLock.RLock()
×
367
        defer s.syncReadmersLock.RUnlock()
×
368
        return s.syncReadmers
×
369
}
×
370

371
func (s *Server) addReadmers(v ...interfaces.Readmer) {
12✔
372
        s.syncReadmersLock.Lock()
12✔
373
        defer s.syncReadmersLock.Unlock()
12✔
374
        s.syncReadmers = append(s.syncReadmers, v...)
12✔
375
}
12✔
376

377
func (s *Server) probes() map[healthz.Type][]interface{} {
×
378
        s.syncProbesLock.RLock()
×
379
        defer s.syncProbesLock.RUnlock()
×
380
        return s.syncProbes
×
381
}
×
382

383
func (s *Server) addProbes(typ healthz.Type, v ...interface{}) {
10✔
384
        s.syncProbesLock.Lock()
10✔
385
        defer s.syncProbesLock.Unlock()
10✔
386
        s.syncProbes[typ] = append(s.syncProbes[typ], v...)
10✔
387
}
10✔
388

389
// Readme returns the self-documenting string
390
func (s *Server) Readme() string {
×
391
        md := &markdown.Markdown{}
×
392

×
393
        md.Println(s.readmeServices())
×
394
        md.Println(s.readmeHealthz())
×
395
        md.Print(s.readmeCloser())
×
396

×
397
        return md.String()
×
398
}
×
399

400
// ------------------------------------------------------------------------------------------------
401
// ~ Private methods
402
// ------------------------------------------------------------------------------------------------
403

404
// startService starts the given services
405
func (s *Server) startService(services ...Service) {
6✔
406
        c := make(chan struct{}, 1)
6✔
407
        for _, value := range services {
10✔
408
                s.g.Go(func() error {
8✔
409
                        c <- struct{}{}
4✔
410
                        if err := value.Start(s.ctx); errors.Is(err, http.ErrServerClosed) {
4✔
411
                                log.WithError(s.l, err).Debug("server has closed")
×
412
                        } else if err != nil {
4✔
413
                                log.WithError(s.l, err).Error("failed to start service")
×
414
                                return err
×
415
                        }
×
416
                        return nil
4✔
417
                })
418
                <-c
4✔
419
        }
420
        close(c)
6✔
421
}
422

423
func (s *Server) readmeCloser() string {
×
424
        md := &markdown.Markdown{}
×
425
        closers := s.closers()
×
426
        rows := make([][]string, 0, len(closers))
×
427
        for _, value := range closers {
×
428
                t := reflect.TypeOf(value)
×
429
                var closer string
×
430
                switch value.(type) {
×
431
                case interfaces.Closer:
×
432
                        closer = "Closer"
×
433
                case interfaces.ErrorCloser:
×
434
                        closer = "ErrorCloser"
×
435
                case interfaces.CloserWithContext:
×
436
                        closer = "CloserWithContext"
×
437
                case interfaces.ErrorCloserWithContext:
×
438
                        closer = "ErrorCloserWithContext"
×
439
                case interfaces.Shutdowner:
×
440
                        closer = "Shutdowner"
×
441
                case interfaces.ErrorShutdowner:
×
442
                        closer = "ErrorShutdowner"
×
443
                case interfaces.ShutdownerWithContext:
×
444
                        closer = "ShutdownerWithContext"
×
445
                case interfaces.ErrorShutdownerWithContext:
×
446
                        closer = "ErrorShutdownerWithContext"
×
447
                case interfaces.Stopper:
×
448
                        closer = "Stopper"
×
449
                case interfaces.ErrorStopper:
×
450
                        closer = "ErrorStopper"
×
451
                case interfaces.StopperWithContext:
×
452
                        closer = "StopperWithContext"
×
453
                case interfaces.ErrorStopperWithContext:
×
454
                        closer = "ErrorStopperWithContext"
×
455
                case interfaces.Unsubscriber:
×
456
                        closer = "Unsubscriber"
×
457
                case interfaces.ErrorUnsubscriber:
×
458
                        closer = "ErrorUnsubscriber"
×
459
                case interfaces.UnsubscriberWithContext:
×
460
                        closer = "UnsubscriberWithContext"
×
461
                case interfaces.ErrorUnsubscriberWithContext:
×
462
                        closer = "ErrorUnsubscriberWithContext"
×
463
                }
464
                rows = append(rows, []string{
×
465
                        markdown.Code(markdown.Name(value)),
×
466
                        markdown.Code(t.String()),
×
467
                        markdown.Code(closer),
×
468
                        markdown.String(value),
×
469
                })
×
470
        }
471
        if len(rows) > 0 {
×
472
                md.Println("### Closers")
×
473
                md.Println("")
×
474
                md.Println("List of all registered closers that are being called during graceful shutdown.")
×
475
                md.Println("")
×
476
                md.Table([]string{"Name", "Type", "Closer", "Description"}, rows)
×
477
                md.Println("")
×
478
        }
×
479

480
        return md.String()
×
481
}
482

483
func (s *Server) readmeHealthz() string {
×
484
        var rows [][]string
×
485
        md := &markdown.Markdown{}
×
486

×
487
        for k, probes := range s.probes() {
×
488
                for _, probe := range probes {
×
489
                        t := reflect.TypeOf(probe)
×
490
                        rows = append(rows, []string{
×
491
                                markdown.Code(markdown.Name(probe)),
×
492
                                markdown.Code(k.String()),
×
493
                                markdown.Code(t.String()),
×
494
                                markdown.String(probe),
×
495
                        })
×
496
                }
×
497
        }
498
        if len(rows) > 0 {
×
499
                md.Println("### Health probes")
×
500
                md.Println("")
×
501
                md.Println("List of all registered healthz probes that are being called during startup and runtime.")
×
502
                md.Println("")
×
503
                md.Table([]string{"Name", "Probe", "Type", "Description"}, rows)
×
504
        }
×
505

506
        return md.String()
×
507
}
508

509
func (s *Server) readmeServices() string {
×
510
        md := &markdown.Markdown{}
×
511

×
512
        {
×
513
                var rows [][]string
×
514
                for _, value := range s.initServices {
×
515
                        if v, ok := value.(*service.HTTP); ok {
×
516
                                t := reflect.TypeOf(v)
×
517
                                rows = append(rows, []string{
×
518
                                        markdown.Code(v.Name()),
×
519
                                        markdown.Code(t.String()),
×
520
                                        markdown.String(v),
×
521
                                })
×
522
                        }
×
523
                }
524
                if len(rows) > 0 {
×
525
                        md.Println("### Init Services")
×
526
                        md.Println("")
×
527
                        md.Println("List of all registered init services that are being immediately started.")
×
528
                        md.Println("")
×
529
                        md.Table([]string{"Name", "Type", "Address"}, rows)
×
530
                }
×
531
        }
532

533
        md.Println("")
×
534

×
535
        {
×
536
                var rows [][]string
×
537
                for _, value := range s.services {
×
538
                        t := reflect.TypeOf(value)
×
539
                        rows = append(rows, []string{
×
540
                                markdown.Code(value.Name()),
×
541
                                markdown.Code(t.String()),
×
542
                                markdown.String(value),
×
543
                        })
×
544
                }
×
545
                if len(rows) > 0 {
×
546
                        md.Println("### Runtime Services")
×
547
                        md.Println("")
×
548
                        md.Println("List of all registered services that are being started.")
×
549
                        md.Println("")
×
550
                        md.Table([]string{"Name", "Type", "Description"}, rows)
×
551
                }
×
552
        }
553

554
        return md.String()
×
555
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc