• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gatewayd-io / gatewayd / 22284447224

22 Feb 2026 08:10PM UTC coverage: 59.676% (+0.2%) from 59.468%
22284447224

Pull #731

github

mostafa
Fetch latest version
Pull Request #731: Extensive plugin tests

4 of 4 new or added lines in 1 file covered. (100.0%)

38 existing lines in 4 files now uncovered.

5791 of 9704 relevant lines covered (59.68%)

17.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.6
/network/server.go
1
package network
2

3
import (
4
        "context"
5
        "crypto/tls"
6
        "errors"
7
        "fmt"
8
        "net"
9
        "os"
10
        "strconv"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        v1 "github.com/gatewayd-io/gatewayd-plugin-sdk/plugin/v1"
16
        "github.com/gatewayd-io/gatewayd/config"
17
        gerr "github.com/gatewayd-io/gatewayd/errors"
18
        "github.com/gatewayd-io/gatewayd/metrics"
19
        "github.com/gatewayd-io/gatewayd/plugin"
20
        "github.com/gatewayd-io/gatewayd/raft"
21
        "github.com/rs/zerolog"
22
        "go.opentelemetry.io/otel"
23
        "go.opentelemetry.io/otel/attribute"
24
)
25

26
// Option holds optional switches for a Server.
type Option struct {
	// EnableTicker controls whether Run starts the loop that periodically
	// calls OnTick.
	EnableTicker bool
}
29

30
// Action is the instruction an event handler returns to the server loop.
type Action int

const (
	// None tells the server loop to carry on as usual.
	None Action = iota
	// Close tells the server loop to close the connection being handled.
	Close
	// Shutdown tells the server loop to run the shutdown sequence and stop.
	Shutdown
)
37

38
//nolint:interfacebloat
39
type IServer interface {
40
        OnBoot() Action
41
        OnOpen(conn *ConnWrapper) ([]byte, Action)
42
        OnClose(conn *ConnWrapper, err error) Action
43
        OnTraffic(conn *ConnWrapper) Action
44
        OnShutdown()
45
        OnTick() (time.Duration, Action)
46
        Run() *gerr.GatewayDError
47
        Shutdown()
48
        IsRunning() bool
49
        IsTLSEnabled() bool
50
        CountConnections() int
51
        GetProxyForConnection(conn *ConnWrapper) (IProxy, bool)
52
        RemoveConnectionFromMap(conn *ConnWrapper)
53
}
54

55
type Server struct {
56
        Proxies        []IProxy
57
        Logger         zerolog.Logger
58
        PluginRegistry *plugin.Registry
59
        ctx            context.Context //nolint:containedctx
60
        PluginTimeout  time.Duration
61
        mu             *sync.RWMutex
62

63
        GroupName string
64

65
        Network      string // tcp/udp/unix
66
        Address      string
67
        Options      Option
68
        Status       config.Status
69
        TickInterval time.Duration
70

71
        // TLS config
72
        EnableTLS        bool
73
        CertFile         string
74
        KeyFile          string
75
        HandshakeTimeout time.Duration
76

77
        listener     net.Listener
78
        host         string
79
        port         int
80
        connections  uint32
81
        running      *atomic.Bool
82
        isTLSEnabled *atomic.Bool
83
        stopServer   chan struct{}
84

85
        // loadbalancer
86
        loadbalancerStrategy       LoadBalancerStrategy
87
        LoadbalancerStrategyName   string
88
        LoadbalancerRules          []config.LoadBalancingRule
89
        LoadbalancerConsistentHash *config.ConsistentHash
90
        connectionToProxyMap       *sync.Map
91

92
        RaftNode *raft.Node
93

94
        ProxyByBlock map[string]IProxy
95
}
96

97
var _ IServer = (*Server)(nil)
98

99
// OnBoot is called when the server is booted. It calls the OnBooting and OnBooted hooks.
100
// It also sets the status to running, which is used to determine if the server should be running
101
// or shutdown.
102
func (s *Server) OnBoot() Action {
1✔
103
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnBoot")
1✔
104
        defer span.End()
1✔
105

1✔
106
        s.Logger.Debug().Msg("GatewayD is booting...")
1✔
107

1✔
108
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
1✔
109
        defer cancel()
1✔
110
        // Run the OnBooting hooks.
1✔
111
        result, err := s.PluginRegistry.Run(
1✔
112
                pluginTimeoutCtx,
1✔
113
                map[string]any{"status": fmt.Sprint(s.Status)},
1✔
114
                v1.HookName_HOOK_NAME_ON_BOOTING)
1✔
115
        if err != nil {
1✔
116
                s.Logger.Error().Err(err).Msg("Failed to run OnBooting hook")
×
117
                span.RecordError(err)
×
118
        }
×
119
        if result != nil {
2✔
120
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
1✔
121
        }
1✔
122
        span.AddEvent("Ran the OnBooting hooks")
1✔
123

1✔
124
        // Set the server status to running.
1✔
125
        s.mu.Lock()
1✔
126
        s.Status = config.Running
1✔
127
        s.mu.Unlock()
1✔
128

1✔
129
        // Run the OnBooted hooks.
1✔
130
        pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), s.PluginTimeout)
1✔
131
        defer cancel()
1✔
132

1✔
133
        result, err = s.PluginRegistry.Run(
1✔
134
                pluginTimeoutCtx,
1✔
135
                map[string]any{"status": fmt.Sprint(s.Status)},
1✔
136
                v1.HookName_HOOK_NAME_ON_BOOTED)
1✔
137
        if err != nil {
1✔
138
                s.Logger.Error().Err(err).Msg("Failed to run OnBooted hook")
×
139
                span.RecordError(err)
×
140
        }
×
141
        if result != nil {
2✔
142
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
1✔
143
        }
1✔
144
        span.AddEvent("Ran the OnBooted hooks")
1✔
145

1✔
146
        s.Logger.Debug().Msg("GatewayD booted")
1✔
147

1✔
148
        return None
1✔
149
}
150

151
// OnOpen is called when a new connection is opened. It calls the OnOpening and OnOpened hooks.
152
// It also checks if the server is at the soft or hard limit and closes the connection if it is.
153
func (s *Server) OnOpen(conn *ConnWrapper) ([]byte, Action) {
2✔
154
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnOpen")
2✔
155
        defer span.End()
2✔
156

2✔
157
        s.Logger.Debug().Str("from", RemoteAddr(conn.Conn())).Msg(
2✔
158
                "GatewayD is opening a connection")
2✔
159

2✔
160
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
2✔
161
        defer cancel()
2✔
162
        // Run the OnOpening hooks.
2✔
163
        onOpeningData := map[string]any{
2✔
164
                "client": map[string]any{
2✔
165
                        "local":  LocalAddr(conn.Conn()),
2✔
166
                        "remote": RemoteAddr(conn.Conn()),
2✔
167
                },
2✔
168
        }
2✔
169
        result, err := s.PluginRegistry.Run(
2✔
170
                pluginTimeoutCtx, onOpeningData, v1.HookName_HOOK_NAME_ON_OPENING)
2✔
171
        if err != nil {
2✔
172
                s.Logger.Error().Err(err).Msg("Failed to run OnOpening hook")
×
173
                span.RecordError(err)
×
174
        }
×
175
        if result != nil {
4✔
176
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
2✔
177
        }
2✔
178
        span.AddEvent("Ran the OnOpening hooks")
2✔
179

2✔
180
        findProxyCtx, cancel := context.WithTimeout(context.Background(), config.FindProxyTimeout)
2✔
181
        defer cancel()
2✔
182

2✔
183
        // Attempt to retrieve the next proxy.
2✔
184
        proxy, err := s.loadbalancerStrategy.NextProxy(findProxyCtx, conn)
2✔
185
        if err != nil {
2✔
186
                span.RecordError(err)
×
187
                s.Logger.Error().Err(err).Msg("failed to retrieve next proxy")
×
188
                return nil, Close
×
189
        }
×
190

191
        // Use the proxy to connect to the backend. Close the connection if the pool is exhausted.
192
        // This effectively get a connection from the pool and puts both the incoming and the server
193
        // connections in the pool of the busy connections.
194
        if err := proxy.Connect(conn); err != nil {
2✔
195
                if errors.Is(err, gerr.ErrPoolExhausted) {
×
196
                        span.RecordError(err)
×
197
                        return nil, Close
×
198
                }
×
199

200
                // This should never happen.
201
                // TODO: Send error to client or retry connection
202
                s.Logger.Error().Err(err).Msg("Failed to connect to proxy")
×
203
                span.RecordError(err)
×
204
                return nil, None
×
205
        }
206

207
        // Assign connection to proxy
208
        s.connectionToProxyMap.Store(conn, proxy)
2✔
209

2✔
210
        // Run the OnOpened hooks.
2✔
211
        pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), s.PluginTimeout)
2✔
212
        defer cancel()
2✔
213

2✔
214
        onOpenedData := map[string]any{
2✔
215
                "client": map[string]any{
2✔
216
                        "local":  LocalAddr(conn.Conn()),
2✔
217
                        "remote": RemoteAddr(conn.Conn()),
2✔
218
                },
2✔
219
        }
2✔
220
        result, err = s.PluginRegistry.Run(
2✔
221
                pluginTimeoutCtx, onOpenedData, v1.HookName_HOOK_NAME_ON_OPENED)
2✔
222
        if err != nil {
2✔
223
                s.Logger.Error().Err(err).Msg("Failed to run OnOpened hook")
×
224
                span.RecordError(err)
×
225
        }
×
226
        if result != nil {
4✔
227
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
2✔
228
        }
2✔
229
        span.AddEvent("Ran the OnOpened hooks")
2✔
230

2✔
231
        metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Inc()
2✔
232

2✔
233
        return nil, None
2✔
234
}
235

236
// OnClose is called when a connection is closed. It calls the OnClosing and OnClosed hooks.
237
// It also recycles the connection back to the available connection pool.
238
func (s *Server) OnClose(conn *ConnWrapper, err error) Action {
2✔
239
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnClose")
2✔
240
        defer span.End()
2✔
241

2✔
242
        s.Logger.Debug().Str("from", RemoteAddr(conn.Conn())).Msg(
2✔
243
                "GatewayD is closing a connection")
2✔
244

2✔
245
        // Run the OnClosing hooks.
2✔
246
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
2✔
247
        defer cancel()
2✔
248

2✔
249
        data := map[string]any{
2✔
250
                "client": map[string]any{
2✔
251
                        "local":  LocalAddr(conn.Conn()),
2✔
252
                        "remote": RemoteAddr(conn.Conn()),
2✔
253
                },
2✔
254
                "error": "",
2✔
255
        }
2✔
256
        if err != nil {
2✔
257
                data["error"] = err.Error()
×
258
        }
×
259
        result, gatewaydErr := s.PluginRegistry.Run(
2✔
260
                pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_CLOSING)
2✔
261
        if gatewaydErr != nil {
2✔
262
                s.Logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosing hook")
×
263
                span.RecordError(gatewaydErr)
×
264
        }
×
265
        if result != nil {
4✔
266
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
2✔
267
        }
2✔
268
        span.AddEvent("Ran the OnClosing hooks")
2✔
269

2✔
270
        // Shutdown the server if there are no more connections and the server is stopped.
2✔
271
        // This is used to shut down the server gracefully.
2✔
272
        if s.CountConnections() == 0 && !s.IsRunning() {
2✔
UNCOV
273
                span.AddEvent("Shutting down the server")
×
UNCOV
274
                return Shutdown
×
UNCOV
275
        }
×
276

277
        // Find the proxy associated with the given connection
278
        proxy, exists := s.GetProxyForConnection(conn)
2✔
279
        if !exists {
2✔
280
                // Log an error and return Close if no matching proxy is found
×
281
                s.Logger.Error().Msg("Failed to find proxy to disconnect it")
×
282
                return Close
×
283
        }
×
284

285
        // Disconnect the connection from the proxy. This effectively removes the mapping between
286
        // the incoming and the server connections in the pool of the busy connections and either
287
        // recycles or disconnects the connections.
288
        if err := proxy.Disconnect(conn); err != nil {
2✔
UNCOV
289
                // During shutdown, proxy.Shutdown() already cleans up busy connections,
×
UNCOV
290
                // so Disconnect will fail with ErrClientNotFound. This is expected.
×
UNCOV
291
                if err == gerr.ErrClientNotFound && !s.IsRunning() {
×
UNCOV
292
                        s.Logger.Debug().Msg("Connection already cleaned up during shutdown")
×
UNCOV
293
                } else {
×
294
                        s.Logger.Error().Err(err).Msg("Failed to disconnect the server connection")
×
295
                        span.RecordError(err)
×
296
                }
×
UNCOV
297
                return Close
×
298
        }
299

300
        // remove a connection from proxy connention map
301
        s.RemoveConnectionFromMap(conn)
2✔
302

2✔
303
        if conn.IsTLSEnabled() {
2✔
304
                metrics.TLSConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec()
×
305
        }
×
306

307
        // Close the incoming connection.
308
        if err := conn.Close(); err != nil {
2✔
309
                s.Logger.Error().Err(err).Msg("Failed to close the incoming connection")
×
310
                span.RecordError(err)
×
311
                return Close
×
312
        }
×
313

314
        // Run the OnClosed hooks.
315
        pluginTimeoutCtx, cancel = context.WithTimeout(context.Background(), s.PluginTimeout)
2✔
316
        defer cancel()
2✔
317

2✔
318
        data = map[string]any{
2✔
319
                "client": map[string]any{
2✔
320
                        "local":  LocalAddr(conn.Conn()),
2✔
321
                        "remote": RemoteAddr(conn.Conn()),
2✔
322
                },
2✔
323
                "error": "",
2✔
324
        }
2✔
325
        if err != nil {
2✔
326
                data["error"] = err.Error()
×
327
        }
×
328
        result, gatewaydErr = s.PluginRegistry.Run(
2✔
329
                pluginTimeoutCtx, data, v1.HookName_HOOK_NAME_ON_CLOSED)
2✔
330
        if gatewaydErr != nil {
2✔
331
                s.Logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosed hook")
×
332
                span.RecordError(gatewaydErr)
×
333
        }
×
334
        if result != nil {
4✔
335
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
2✔
336
        }
2✔
337
        span.AddEvent("Ran the OnClosed hooks")
2✔
338

2✔
339
        metrics.ClientConnections.WithLabelValues(s.GroupName, proxy.GetBlockName()).Dec()
2✔
340

2✔
341
        return Close
2✔
342
}
343

344
// OnTraffic is called when data is received from the client. It calls the OnTraffic hooks.
345
// It then passes the traffic to the proxied connection via two goroutines (client->server
346
// and server->client). When either goroutine exits (e.g. client disconnect or backend error),
347
// it interrupts the other by expiring its read deadline. OnTraffic waits for BOTH goroutines
348
// to finish before returning, ensuring the backend connection is idle and can be safely
349
// reused for session reset (DISCARD ALL) without concurrent reader conflicts.
350
func (s *Server) OnTraffic(conn *ConnWrapper) Action {
2✔
351
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnTraffic")
2✔
352
        defer span.End()
2✔
353

2✔
354
        // Run the OnTraffic hooks.
2✔
355
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
2✔
356
        defer cancel()
2✔
357

2✔
358
        onTrafficData := map[string]any{
2✔
359
                "client": map[string]any{
2✔
360
                        "local":  LocalAddr(conn.Conn()),
2✔
361
                        "remote": RemoteAddr(conn.Conn()),
2✔
362
                },
2✔
363
        }
2✔
364
        result, err := s.PluginRegistry.Run(
2✔
365
                pluginTimeoutCtx, onTrafficData, v1.HookName_HOOK_NAME_ON_TRAFFIC)
2✔
366
        if err != nil {
2✔
367
                s.Logger.Error().Err(err).Msg("Failed to run OnTraffic hook")
×
368
                span.RecordError(err)
×
369
        }
×
370
        if result != nil {
4✔
371
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
2✔
372
        }
2✔
373
        span.AddEvent("Ran the OnTraffic hooks")
2✔
374

2✔
375
        stack := NewStack()
2✔
376

2✔
377
        var trafficWaitGroup sync.WaitGroup
2✔
378
        trafficWaitGroup.Add(2) //nolint:mnd
2✔
379

2✔
380
        // Pass the traffic from the client to server.
2✔
381
        // When this goroutine exits it expires the backend read deadline to
2✔
382
        // unblock the server->client goroutine's Receive() call.
2✔
383
        go func(server *Server, conn *ConnWrapper, stack *Stack) {
4✔
384
                defer trafficWaitGroup.Done()
2✔
385
                for {
8✔
386
                        server.Logger.Trace().Msg("Passing through traffic from client to server")
6✔
387

6✔
388
                        // Find the proxy associated with the given connection.
6✔
389
                        proxy, exists := server.GetProxyForConnection(conn)
6✔
390
                        if !exists {
6✔
391
                                server.Logger.Error().Msg("Failed to find proxy that matches the connection")
×
392
                                break
×
393
                        }
394

395
                        if err := proxy.PassThroughToServer(conn, stack); err != nil {
8✔
396
                                server.Logger.Trace().Err(err).Msg("Failed to pass through traffic")
2✔
397
                                span.RecordError(err)
2✔
398
                                break
2✔
399
                        }
400
                }
401
                // Unblock the server->client goroutine by expiring the backend read deadline.
402
                if proxy, exists := s.GetProxyForConnection(conn); exists {
4✔
403
                        proxy.ExpireBackendReadDeadline(conn)
2✔
404
                }
2✔
405
        }(s, conn, stack)
406

407
        // Pass the traffic from the server to client.
408
        // When this goroutine exits it expires the frontend read deadline to
409
        // unblock the client->server goroutine's Read() call.
410
        go func(server *Server, conn *ConnWrapper, stack *Stack) {
4✔
411
                defer trafficWaitGroup.Done()
2✔
412
                for {
6✔
413
                        server.Logger.Trace().Msg("Passing through traffic from server to client")
4✔
414

4✔
415
                        // Find the proxy associated with the given connection.
4✔
416
                        proxy, exists := server.GetProxyForConnection(conn)
4✔
417
                        if !exists {
4✔
418
                                server.Logger.Error().Msg("Failed to find proxy that matches the connection")
×
419
                                break
×
420
                        }
421
                        if err := proxy.PassThroughToClient(conn, stack); err != nil {
6✔
422
                                server.Logger.Trace().Err(err).Msg("Failed to pass through traffic")
2✔
423
                                span.RecordError(err)
2✔
424
                                break
2✔
425
                        }
426
                }
427
                // Unblock the client->server goroutine by expiring the frontend read deadline.
428
                if err := conn.Conn().SetReadDeadline(time.Now()); err != nil {
2✔
429
                        server.Logger.Error().Err(err).Msg("Failed to expire frontend read deadline")
×
430
                }
×
431
        }(s, conn, stack)
432

433
        // Wait for BOTH goroutines to finish. This guarantees the backend
434
        // connection is idle (no concurrent readers/writers) before Disconnect
435
        // attempts DISCARD ALL for session reset.
436
        trafficWaitGroup.Wait()
2✔
437

2✔
438
        // Clear the backend deadline so Disconnect -> ResetSession -> DISCARD ALL
2✔
439
        // can read/write on the connection without hitting a stale deadline.
2✔
440
        if proxy, exists := s.GetProxyForConnection(conn); exists {
4✔
441
                proxy.ClearBackendDeadline(conn)
2✔
442
        }
2✔
443

444
        stack.Clear()
2✔
445

2✔
446
        return Close
2✔
447
}
448

449
// OnShutdown is called when the server is shutting down. It calls the OnShutdown hooks.
450
func (s *Server) OnShutdown() {
1✔
451
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnShutdown")
1✔
452
        defer span.End()
1✔
453

1✔
454
        s.Logger.Debug().Msg("GatewayD is shutting down")
1✔
455

1✔
456
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
1✔
457
        defer cancel()
1✔
458
        // Run the OnShutdown hooks.
1✔
459
        result, err := s.PluginRegistry.Run(
1✔
460
                pluginTimeoutCtx,
1✔
461
                map[string]any{"connections": s.CountConnections()},
1✔
462
                v1.HookName_HOOK_NAME_ON_SHUTDOWN)
1✔
463
        if err != nil {
1✔
464
                s.Logger.Error().Err(err).Msg("Failed to run OnShutdown hook")
×
465
                span.RecordError(err)
×
466
        }
×
467
        if result != nil {
2✔
468
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
1✔
469
        }
1✔
470
        span.AddEvent("Ran the OnShutdown hooks")
1✔
471

1✔
472
        // Shutdown proxies.
1✔
473
        for _, proxy := range s.Proxies {
3✔
474
                proxy.Shutdown()
2✔
475
        }
2✔
476

477
        // Set the server status to stopped. This is used to shutdown the server gracefully in OnClose.
478
        s.mu.Lock()
1✔
479
        s.Status = config.Stopped
1✔
480
        s.mu.Unlock()
1✔
481
}
482

483
// OnTick is called every TickInterval. It calls the OnTick hooks.
484
func (s *Server) OnTick() (time.Duration, Action) {
1✔
485
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "OnTick")
1✔
486
        defer span.End()
1✔
487

1✔
488
        s.Logger.Debug().Msg("GatewayD is ticking...")
1✔
489
        s.Logger.Info().Str("count", strconv.Itoa(s.CountConnections())).Msg(
1✔
490
                "Active client connections")
1✔
491

1✔
492
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
1✔
493
        defer cancel()
1✔
494
        // Run the OnTick hooks.
1✔
495
        result, err := s.PluginRegistry.Run(
1✔
496
                pluginTimeoutCtx,
1✔
497
                map[string]any{"connections": s.CountConnections()},
1✔
498
                v1.HookName_HOOK_NAME_ON_TICK)
1✔
499
        if err != nil {
1✔
500
                s.Logger.Error().Err(err).Msg("Failed to run OnTick hook")
×
501
                span.RecordError(err)
×
502
        }
×
503
        if result != nil {
2✔
504
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
1✔
505
        }
1✔
506
        span.AddEvent("Ran the OnTick hooks")
1✔
507

1✔
508
        // TODO: Investigate whether to move schedulers here or not
1✔
509

1✔
510
        metrics.ServerTicksFired.Inc()
1✔
511

1✔
512
        // TickInterval is the interval at which the OnTick hooks are called. It can be adjusted
1✔
513
        // in the configuration file.
1✔
514
        return s.TickInterval, None
1✔
515
}
516

517
// Run starts the server and blocks until the server is stopped. It calls the OnRun hooks.
518
func (s *Server) Run() *gerr.GatewayDError {
1✔
519
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "Run")
1✔
520
        defer span.End()
1✔
521

1✔
522
        s.Logger.Info().Str("pid", strconv.Itoa(os.Getpid())).Msg("GatewayD is running")
1✔
523

1✔
524
        // Try to resolve the address and log an error if it can't be resolved
1✔
525
        addr, err := Resolve(s.Network, s.Address, s.Logger)
1✔
526
        if err != nil {
1✔
527
                s.Logger.Error().Err(err).Msg("Failed to resolve address")
×
528
                span.RecordError(err)
×
529
        }
×
530

531
        pluginTimeoutCtx, cancel := context.WithTimeout(context.Background(), s.PluginTimeout)
1✔
532
        defer cancel()
1✔
533
        // Run the OnRun hooks.
1✔
534
        // Since Run is blocking, we need to run OnRun before it.
1✔
535
        onRunData := map[string]any{"address": addr}
1✔
536
        if err != nil && err.Unwrap() != nil {
1✔
537
                onRunData["error"] = err.OriginalError.Error()
×
538
        }
×
539
        result, err := s.PluginRegistry.Run(
1✔
540
                pluginTimeoutCtx, onRunData, v1.HookName_HOOK_NAME_ON_RUN)
1✔
541
        if err != nil {
1✔
542
                s.Logger.Error().Err(err).Msg("Failed to run the hook")
×
543
                span.RecordError(err)
×
544
        }
×
545
        span.AddEvent("Ran the OnRun hooks")
1✔
546

1✔
547
        if result != nil {
2✔
548
                _ = s.PluginRegistry.ActRegistry.RunAll(result)
1✔
549

1✔
550
                if errMsg, ok := result["error"].(string); ok && errMsg != "" {
1✔
551
                        s.Logger.Error().Str("error", errMsg).Msg("Error in hook")
×
552
                }
×
553

554
                if address, ok := result["address"].(string); ok {
1✔
555
                        addr = address
×
556
                }
×
557
        }
558

559
        if action := s.OnBoot(); action != None {
1✔
560
                return nil
×
561
        }
×
562

563
        listener, origErr := net.Listen(s.Network, addr)
1✔
564
        if origErr != nil {
1✔
565
                s.Logger.Error().Err(origErr).Msg("Server failed to start listening")
×
566
                return gerr.ErrServerListenFailed.Wrap(origErr)
×
567
        }
×
568
        s.mu.Lock()
1✔
569
        s.listener = listener
1✔
570
        s.mu.Unlock()
1✔
571
        defer s.listener.Close()
1✔
572

1✔
573
        if s.listener == nil {
1✔
574
                s.Logger.Error().Msg("Server is not properly initialized")
×
575
                return nil
×
576
        }
×
577

578
        var port string
1✔
579
        s.host, port, origErr = net.SplitHostPort(s.listener.Addr().String())
1✔
580
        if origErr != nil {
1✔
581
                s.Logger.Error().Err(origErr).Msg("Failed to split host and port")
×
582
                return gerr.ErrSplitHostPortFailed.Wrap(origErr)
×
583
        }
×
584

585
        if s.port, origErr = strconv.Atoi(port); origErr != nil {
1✔
586
                s.Logger.Error().Err(origErr).Msg("Failed to convert port to integer")
×
587
                return gerr.ErrCastFailed.Wrap(origErr)
×
588
        }
×
589

590
        s.isTLSEnabled = &atomic.Bool{}
1✔
591
        s.running = &atomic.Bool{}
1✔
592

1✔
593
        go func(server *Server) {
2✔
594
                <-server.stopServer
1✔
595
                server.OnShutdown()
1✔
596
                server.Logger.Debug().Msg("Server stopped")
1✔
597
        }(s)
1✔
598

599
        go func(server *Server) {
2✔
600
                if !server.Options.EnableTicker {
1✔
601
                        return
×
602
                }
×
603

604
                for {
2✔
605
                        select {
1✔
606
                        case <-server.stopServer:
×
607
                                return
×
608
                        default:
1✔
609
                                interval, action := server.OnTick()
1✔
610
                                if action == Shutdown {
1✔
611
                                        server.OnShutdown()
×
612
                                        return
×
613
                                }
×
614
                                if interval == time.Duration(0) {
1✔
615
                                        return
×
616
                                }
×
617
                                time.Sleep(interval)
1✔
618
                        }
619
                }
620
        }(s)
621

622
        s.running.Store(true)
1✔
623

1✔
624
        var tlsConfig *tls.Config
1✔
625
        if s.EnableTLS {
1✔
626
                tlsConfig, origErr = CreateTLSConfig(s.CertFile, s.KeyFile)
×
627
                if origErr != nil {
×
628
                        s.Logger.Error().Err(origErr).Msg("Failed to create TLS config")
×
629
                        return gerr.ErrGetTLSConfigFailed.Wrap(origErr)
×
630
                }
×
631
                s.Logger.Info().Msg("TLS is enabled")
×
632
                s.isTLSEnabled.Store(true)
×
633
        } else {
1✔
634
                s.Logger.Debug().Msg("TLS is disabled")
1✔
635
        }
1✔
636

637
        for {
4✔
638
                select {
3✔
639
                case <-s.stopServer:
×
640
                        s.Logger.Info().Msg("Server stopped")
×
641
                        return nil
×
642
                default:
3✔
643
                        netConn, err := s.listener.Accept()
3✔
644
                        if err != nil {
4✔
645
                                if !s.running.Load() {
2✔
646
                                        return nil
1✔
647
                                }
1✔
648
                                s.Logger.Error().Err(err).Msg("Failed to accept connection")
×
649
                                return gerr.ErrAcceptFailed.Wrap(err)
×
650
                        }
651

652
                        conn := NewConnWrapper(ConnWrapper{
2✔
653
                                NetConn:          netConn,
2✔
654
                                TLSConfig:        tlsConfig,
2✔
655
                                HandshakeTimeout: s.HandshakeTimeout,
2✔
656
                        })
2✔
657

2✔
658
                        if out, action := s.OnOpen(conn); action != None {
2✔
659
                                if _, err := conn.Write(out); err != nil {
×
660
                                        s.Logger.Error().Err(err).Msg("Failed to write to connection")
×
661
                                }
×
662
                                _ = conn.Close()
×
663
                                if action == Shutdown {
×
664
                                        s.OnShutdown()
×
665
                                        return nil
×
666
                                }
×
667
                        }
668
                        s.mu.Lock()
2✔
669
                        s.connections++
2✔
670
                        s.mu.Unlock()
2✔
671

2✔
672
                        // OnTraffic blocks until both traffic goroutines (client->server
2✔
673
                        // and server->client) have finished. After it returns, we
2✔
674
                        // decrement the connection counter and run the OnClose hooks.
2✔
675
                        // During shutdown, proxy.Shutdown() closes connections which
2✔
676
                        // unblocks OnTraffic, so this goroutine always completes.
2✔
677
                        go func(server *Server, conn *ConnWrapper) {
4✔
678
                                action := server.OnTraffic(conn)
2✔
679

2✔
680
                                server.mu.Lock()
2✔
681
                                server.connections--
2✔
682
                                server.mu.Unlock()
2✔
683

2✔
684
                                if action == Close {
4✔
685
                                        server.OnClose(conn, err)
2✔
686
                                }
2✔
687
                        }(s, conn)
688
                }
689
        }
690
}
691

692
// Shutdown stops the server.
693
func (s *Server) Shutdown() {
1✔
694
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "Shutdown")
1✔
695
        defer span.End()
1✔
696

1✔
697
        // Set the server status to stopped before shutting down proxies, so that
1✔
698
        // OnClose (triggered by proxy.Shutdown closing connections) can detect the
1✔
699
        // shutdown-in-progress state and avoid spurious error logs.
1✔
700
        s.mu.Lock()
1✔
701
        s.Status = config.Stopped
1✔
702
        s.mu.Unlock()
1✔
703
        s.running.Store(false)
1✔
704

1✔
705
        for _, proxy := range s.Proxies {
3✔
706
                // Shutdown the proxy.
2✔
707
                proxy.Shutdown()
2✔
708
        }
2✔
709

710
        // Shutdown the server.
711
        var err error
1✔
712
        if s.listener != nil {
2✔
713
                if err = s.listener.Close(); err != nil {
1✔
714
                        s.Logger.Error().Err(err).Msg("Failed to close listener")
×
715
                }
×
716
        } else {
×
717
                s.Logger.Error().Msg("Listener is not initialized")
×
718
        }
×
719

720
        select {
1✔
721
        case <-s.stopServer:
×
722
                s.Logger.Info().Msg("Server stopped")
×
723
        default:
1✔
724
                s.stopServer <- struct{}{}
1✔
725
                close(s.stopServer)
1✔
726
        }
727

728
        if err != nil {
1✔
729
                s.Logger.Error().Err(err).Msg("Failed to shutdown server")
×
730
                span.RecordError(err)
×
731
        }
×
732
}
733

734
// IsRunning returns true if the server is running.
735
func (s *Server) IsRunning() bool {
3✔
736
        _, span := otel.Tracer("gatewayd").Start(s.ctx, "IsRunning")
3✔
737
        defer span.End()
3✔
738
        span.SetAttributes(attribute.Bool("status", s.Status == config.Running))
3✔
739

3✔
740
        s.mu.Lock()
3✔
741
        defer s.mu.Unlock()
3✔
742
        return s.Status == config.Running
3✔
743
}
3✔
744

745
// NewServer creates a new server.
746
func NewServer(
747
        ctx context.Context,
748
        srv Server,
749
) *Server {
1✔
750
        serverCtx, span := otel.Tracer(config.TracerName).Start(ctx, "NewServer")
1✔
751
        defer span.End()
1✔
752

1✔
753
        // Create the server.
1✔
754
        server := Server{
1✔
755
                ctx:                        serverCtx,
1✔
756
                GroupName:                  srv.GroupName,
1✔
757
                Network:                    srv.Network,
1✔
758
                Address:                    srv.Address,
1✔
759
                Options:                    srv.Options,
1✔
760
                TickInterval:               srv.TickInterval,
1✔
761
                Status:                     config.Stopped,
1✔
762
                EnableTLS:                  srv.EnableTLS,
1✔
763
                CertFile:                   srv.CertFile,
1✔
764
                KeyFile:                    srv.KeyFile,
1✔
765
                HandshakeTimeout:           srv.HandshakeTimeout,
1✔
766
                Proxies:                    srv.Proxies,
1✔
767
                Logger:                     srv.Logger,
1✔
768
                PluginRegistry:             srv.PluginRegistry,
1✔
769
                PluginTimeout:              srv.PluginTimeout,
1✔
770
                mu:                         &sync.RWMutex{},
1✔
771
                connections:                0,
1✔
772
                running:                    &atomic.Bool{},
1✔
773
                stopServer:                 make(chan struct{}),
1✔
774
                connectionToProxyMap:       &sync.Map{},
1✔
775
                LoadbalancerStrategyName:   srv.LoadbalancerStrategyName,
1✔
776
                LoadbalancerRules:          srv.LoadbalancerRules,
1✔
777
                LoadbalancerConsistentHash: srv.LoadbalancerConsistentHash,
1✔
778
                RaftNode:                   srv.RaftNode,
1✔
779
        }
1✔
780

1✔
781
        // Try to resolve the address and log an error if it can't be resolved.
1✔
782
        addr, err := Resolve(server.Network, server.Address, srv.Logger)
1✔
783
        if err != nil {
1✔
784
                srv.Logger.Error().Err(err).Msg("Failed to resolve address")
×
785
                span.AddEvent(err.Error())
×
786
        }
×
787

788
        if addr != "" {
2✔
789
                server.Address = addr
1✔
790
                srv.Logger.Debug().Str("address", addr).Msg("Resolved address")
1✔
791
                srv.Logger.Info().Str("address", addr).Msg("GatewayD is listening")
1✔
792
        } else {
1✔
793
                srv.Logger.Error().Msg("Failed to resolve address")
×
794
                srv.Logger.Warn().Str("address", server.Address).Msg(
×
795
                        "GatewayD is listening on an unresolved address")
×
796
        }
×
797

798
        st, err := NewLoadBalancerStrategy(&server)
1✔
799
        if err != nil {
1✔
800
                srv.Logger.Error().Err(err).Msg("Failed to create a loadbalancer strategy")
×
801
        }
×
802
        server.loadbalancerStrategy = st
1✔
803

1✔
804
        server.initializeProxies()
1✔
805

1✔
806
        return &server
1✔
807
}
808

809
// CountConnections returns the current number of connections.
810
func (s *Server) CountConnections() int {
7✔
811
        s.mu.RLock()
7✔
812
        defer s.mu.RUnlock()
7✔
813
        return int(s.connections)
7✔
814
}
7✔
815

816
// IsTLSEnabled returns true if TLS is enabled.
817
func (s *Server) IsTLSEnabled() bool {
×
818
        if s.isTLSEnabled == nil {
×
819
                return false
×
820
        }
×
821
        return s.isTLSEnabled.Load()
×
822
}
823

824
// GetProxyForConnection returns the proxy associated with the given connection.
825
func (s *Server) GetProxyForConnection(conn *ConnWrapper) (IProxy, bool) {
16✔
826
        proxy, exists := s.connectionToProxyMap.Load(conn)
16✔
827
        if !exists {
16✔
828
                return nil, false
×
829
        }
×
830

831
        if proxy, ok := proxy.(IProxy); ok {
32✔
832
                return proxy, true
16✔
833
        }
16✔
834

835
        return nil, false
×
836
}
837

838
// RemoveConnectionFromMap removes the given connection from the connection-to-proxy map.
839
func (s *Server) RemoveConnectionFromMap(conn *ConnWrapper) {
2✔
840
        s.connectionToProxyMap.Delete(conn)
2✔
841
}
2✔
842

843
// Initialize the map when creating proxies.
844
func (s *Server) initializeProxies() {
6✔
845
        s.ProxyByBlock = make(map[string]IProxy)
6✔
846
        for _, proxy := range s.Proxies {
23✔
847
                s.ProxyByBlock[proxy.GetBlockName()] = proxy
17✔
848
        }
17✔
849
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc