• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gatewayd-io / gatewayd / 13485346862

23 Feb 2025 05:33PM UTC coverage: 63.036% (+0.2%) from 62.872%
13485346862

push

github

web-flow
Add Raft cluster peer management (GetPeers, AddPeer, RemovePeer) (#663)

* feat(raft): add peer discovery and cluster join functionality

Adds support for automatic peer discovery and cluster joining for non-bootstrap nodes.
Key changes:

- Add AddPeer RPC endpoint to allow nodes to join an existing cluster
- Implement TryConnectToCluster() to handle automatic cluster joining
- Forward AddPeer requests to leader if received by follower
- Add protobuf definitions for AddPeer request/response
- Update .gitignore to exclude raft node data files

This change allows new nodes to automatically discover and join an existing cluster
by attempting to connect to configured peers until successful. Non-leader nodes
will forward join requests to the current leader.

* test: add test cases for Raft AddPeer functionality

Add unit tests to verify the AddPeer behavior in both leader and follower nodes:
- Test successful peer addition when node is leader

* test: enhance TestAddPeer to verify peer addition for leader and follower nodes

- Updated TestAddPeer to include checks for adding peers when the node is a leader and a follower.
- Introduced temporary directories for each node to ensure isolated testing environments.
- Added assertions to confirm that both new peers are successfully integrated into the cluster.
- Improved test reliability by implementing a loop to wait for both nodes to join the cluster before completing the test.

* feat(raft): implement RemovePeer functionality and associated tests

- Added RemovePeer RPC endpoint to the Raft service, allowing nodes to remove peers from the cluster.
- Introduced RemovePeerRequest and RemovePeerResponse message types in the protobuf definitions.
- Updated RaftNode to handle peer removal, including forwarding requests to the leader if the node is not the leader.
- Enhanced the README documentation to include details about the new RemovePeerRequest and RemovePeerResponse.
- Implemented unit tests for the RemovePee... (continued)

692 of 1121 new or added lines in 13 files covered. (61.73%)

7 existing lines in 4 files now uncovered.

5875 of 9320 relevant lines covered (63.04%)

19.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.39
/api/api.go
1
//go:build !embed
2

3
package api
4

5
import (
6
        "context"
7
        "encoding/json"
8

9
        sdkPlugin "github.com/gatewayd-io/gatewayd-plugin-sdk/plugin"
10
        v1 "github.com/gatewayd-io/gatewayd/api/v1"
11
        "github.com/gatewayd-io/gatewayd/config"
12
        "github.com/gatewayd-io/gatewayd/metrics"
13
        "github.com/gatewayd-io/gatewayd/network"
14
        "github.com/gatewayd-io/gatewayd/plugin"
15
        "github.com/gatewayd-io/gatewayd/pool"
16
        "github.com/gatewayd-io/gatewayd/raft"
17
        hcraft "github.com/hashicorp/raft"
18
        "github.com/rs/zerolog"
19
        "go.opentelemetry.io/otel"
20
        "google.golang.org/grpc/codes"
21
        "google.golang.org/grpc/status"
22
        "google.golang.org/protobuf/types/known/emptypb"
23
        "google.golang.org/protobuf/types/known/structpb"
24
)
25

26
// Options bundles the settings and shared dependencies handed to the API.
type Options struct {
27
        // Logger is used by the API handlers to log failures before an error
        // status is returned to the caller.
        Logger      zerolog.Logger
28
        // GRPCNetwork is not read in this part of the file; presumably the
        // network type the gRPC listener binds to — TODO confirm.
        GRPCNetwork string
29
        // GRPCAddress is not read in this part of the file; presumably the
        // address the gRPC listener binds to — TODO confirm.
        GRPCAddress string
30
        // HTTPAddress is not read in this part of the file; presumably the
        // address of the HTTP gateway — TODO confirm.
        HTTPAddress string
31
        // Servers maps a name to a server instance. The handlers in this part
        // of the file read API.Servers rather than this field.
        Servers     map[string]*network.Server
32
        // RaftNode is the raft node used by GetPeers, AddPeer and RemovePeer.
        // It may be nil, in which case those handlers return codes.Unavailable.
        RaftNode    *raft.Node
33
}
34

35
// API implements the handlers of the GatewayD admin API service. It embeds the
// generated v1.GatewayDAdminAPIServiceServer and holds the runtime objects the
// handlers report on.
type API struct {
36
        v1.GatewayDAdminAPIServiceServer
37

38
        // Tracer context. Handlers that ignore the request context start their
        // tracing spans from this context.
39
        ctx context.Context //nolint:containedctx
40

41
        // Options carries the logger and the optional raft node used by the handlers.
        Options        *Options
42
        // Config is the source of the data returned by GetGlobalConfig
        // (Config.Global) and GetPluginConfig (Config.Plugin).
        Config         *config.Config
43
        // PluginRegistry is iterated by GetPlugins to list the plugins.
        PluginRegistry *plugin.Registry
44
        // Pools is keyed by configuration group name, then by pool name.
        Pools          map[string]map[string]*pool.Pool
45
        // Proxies is keyed by configuration group name, then by proxy name.
        Proxies        map[string]map[string]*network.Proxy
46
        // Servers is keyed by server name and is read by GetServers.
        Servers        map[string]*network.Server
47
}
48

49
// Version returns the version information of the GatewayD.
50
func (a *API) Version(context.Context, *emptypb.Empty) (*v1.VersionResponse, error) {
3✔
51
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Version")
3✔
52
        defer span.End()
3✔
53

3✔
54
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/Version").Inc()
3✔
55
        return &v1.VersionResponse{
3✔
56
                Version:     config.Version,
3✔
57
                VersionInfo: config.VersionInfo(),
3✔
58
        }, nil
3✔
59
}
3✔
60

61
// GetGlobalConfig returns the global configuration of the GatewayD.
62
//
63
//nolint:wrapcheck
64
func (a *API) GetGlobalConfig(_ context.Context, group *v1.Group) (*structpb.Struct, error) {
3✔
65
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Getting Global Config")
3✔
66
        defer span.End()
3✔
67

3✔
68
        var (
3✔
69
                jsonData []byte
3✔
70
                global   map[string]any
3✔
71
                err      error
3✔
72
        )
3✔
73

3✔
74
        if group.GetGroupName() == "" {
4✔
75
                jsonData, err = json.Marshal(a.Config.Global)
1✔
76
        } else {
3✔
77
                configGroup := a.Config.Global.Filter(group.GetGroupName())
2✔
78
                if configGroup == nil {
3✔
79
                        metrics.APIRequestsErrors.WithLabelValues(
1✔
80
                                "GET", "/v1/GatewayDPluginService/GetGlobalConfig", codes.NotFound.String(),
1✔
81
                        ).Inc()
1✔
82
                        return nil, status.Error(codes.NotFound, "group not found")
1✔
83
                }
1✔
84
                jsonData, err = json.Marshal(configGroup)
1✔
85
        }
86
        if err != nil {
2✔
87
                metrics.APIRequestsErrors.WithLabelValues(
×
88
                        "GET", "/v1/GatewayDPluginService/GetGlobalConfig", codes.Internal.String(),
×
89
                ).Inc()
×
90
                a.Options.Logger.Err(err).Msg("GetGroupName is nil")
×
91
                span.RecordError(err)
×
92
                return nil, status.Errorf(codes.Internal, "failed to marshal global config: %v", err)
×
93
        }
×
94

95
        err = json.Unmarshal(jsonData, &global)
2✔
96
        if err != nil {
2✔
97
                metrics.APIRequestsErrors.WithLabelValues(
×
98
                        "GET", "/v1/GatewayDPluginService/GetGlobalConfig", codes.Internal.String(),
×
99
                ).Inc()
×
100
                a.Options.Logger.Err(err).Msg("Failed to marshal global config")
×
101
                span.RecordError(err)
×
102
                return nil, status.Errorf(codes.Internal, "failed to marshal global config: %v", err)
×
103
        }
×
104

105
        globalConfig, err := structpb.NewStruct(global)
2✔
106
        if err != nil {
2✔
107
                metrics.APIRequestsErrors.WithLabelValues(
×
108
                        "GET", "/v1/GatewayDPluginService/GetGlobalConfig", codes.Internal.String(),
×
109
                ).Inc()
×
110
                a.Options.Logger.Err(err).Msg("Failed to marshal global config")
×
111
                span.RecordError(err)
×
112
                return nil, status.Errorf(codes.Internal, "failed to marshal global config: %v", err)
×
113
        }
×
114

115
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetGlobalConfig").Inc()
2✔
116
        return globalConfig, nil
2✔
117
}
118

119
// GetPluginConfig returns the plugin configuration of the GatewayD.
120
func (a *API) GetPluginConfig(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
1✔
121
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get GetPlugin Config")
1✔
122
        defer span.End()
1✔
123

1✔
124
        jsonData, err := json.Marshal(a.Config.Plugin)
1✔
125
        if err != nil {
1✔
126
                metrics.APIRequestsErrors.WithLabelValues(
×
127
                        "GET", "/v1/GatewayDPluginService/GetPluginConfig", codes.Internal.String(),
×
128
                ).Inc()
×
129
                a.Options.Logger.Err(err).Msg("Failed to marshal plugin config")
×
130
                span.RecordError(err)
×
131
                return nil, status.Errorf(codes.Internal, "failed to marshal plugin config: %v", err)
×
132
        }
×
133

134
        var pluginConfigMap map[string]any
1✔
135

1✔
136
        err = json.Unmarshal(jsonData, &pluginConfigMap)
1✔
137
        if err != nil {
1✔
138
                metrics.APIRequestsErrors.WithLabelValues(
×
139
                        "GET", "/v1/GatewayDPluginService/GetPluginConfig", codes.Internal.String(),
×
140
                ).Inc()
×
141
                a.Options.Logger.Err(err).Msg("Failed to unmarshal plugin config")
×
142
                span.RecordError(err)
×
143
                return nil, status.Errorf(codes.Internal, "failed to unmarshal plugin config: %v", err)
×
144
        }
×
145

146
        pluginConfig, err := structpb.NewStruct(pluginConfigMap)
1✔
147
        if err != nil {
1✔
148
                metrics.APIRequestsErrors.WithLabelValues(
×
149
                        "GET", "/v1/GatewayDPluginService/GetPluginConfig", codes.Internal.String(),
×
150
                ).Inc()
×
151
                a.Options.Logger.Err(err).Msg("Failed to marshal plugin config")
×
152
                span.RecordError(err)
×
153
                return nil, status.Errorf(codes.Internal, "failed to marshal plugin config: %v", err)
×
154
        }
×
155

156
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetPluginConfig").Inc()
1✔
157
        return pluginConfig, nil
1✔
158
}
159

160
// GetPlugins returns the active plugin configuration of the GatewayD.
161
func (a *API) GetPlugins(context.Context, *emptypb.Empty) (*v1.PluginConfigs, error) {
2✔
162
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Plugins")
2✔
163
        defer span.End()
2✔
164

2✔
165
        plugins := make([]*v1.PluginConfig, 0)
2✔
166
        a.PluginRegistry.ForEach(
2✔
167
                func(pluginID sdkPlugin.Identifier, plugIn *plugin.Plugin) {
3✔
168
                        requires := make(map[string]string)
1✔
169
                        if plugIn.Requires != nil {
2✔
170
                                for _, r := range plugIn.Requires {
2✔
171
                                        requires[r.Name] = r.Version
1✔
172
                                }
1✔
173
                        }
174

175
                        hooks := make([]int32, 0)
1✔
176
                        for _, hook := range plugIn.Hooks {
2✔
177
                                hooks = append(hooks, int32(hook.Number()))
1✔
178
                        }
1✔
179

180
                        plugins = append(plugins, &v1.PluginConfig{
1✔
181
                                Id: &v1.PluginID{
1✔
182
                                        Name:      pluginID.Name,
1✔
183
                                        Version:   pluginID.Version,
1✔
184
                                        RemoteUrl: pluginID.RemoteURL,
1✔
185
                                        Checksum:  pluginID.Checksum,
1✔
186
                                },
1✔
187
                                Description: plugIn.Description,
1✔
188
                                Authors:     plugIn.Authors,
1✔
189
                                License:     plugIn.License,
1✔
190
                                ProjectUrl:  plugIn.ProjectURL,
1✔
191
                                Config:      plugIn.Config,
1✔
192
                                Hooks:       hooks,
1✔
193
                                Requires:    requires,
1✔
194
                                Tags:        plugIn.Tags,
1✔
195
                                Categories:  plugIn.Categories,
1✔
196
                        })
1✔
197
                },
198
        )
199

200
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetPlugins").Inc()
2✔
201
        return &v1.PluginConfigs{
2✔
202
                Configs: plugins,
2✔
203
        }, nil
2✔
204
}
205

206
// GetPools returns the pool configuration of the GatewayD.
207
func (a *API) GetPools(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
2✔
208
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Pools")
2✔
209
        defer span.End()
2✔
210

2✔
211
        pools := make(map[string]any)
2✔
212

2✔
213
        for configGroupName, configGroupPools := range a.Pools {
3✔
214
                groupPools := make(map[string]any)
1✔
215
                for name, p := range configGroupPools {
2✔
216
                        groupPools[name] = map[string]any{
1✔
217
                                "cap":  p.Cap(),
1✔
218
                                "size": p.Size(),
1✔
219
                        }
1✔
220
                }
1✔
221
                pools[configGroupName] = groupPools
1✔
222
        }
223

224
        poolsConfig, err := structpb.NewStruct(pools)
2✔
225
        if err != nil {
2✔
226
                metrics.APIRequestsErrors.WithLabelValues(
×
227
                        "GET", "/v1/GatewayDPluginService/GetPools", codes.Internal.String(),
×
228
                ).Inc()
×
229
                a.Options.Logger.Err(err).Msg("Failed to marshal pools config")
×
230
                return nil, status.Errorf(codes.Internal, "failed to marshal pools config: %v", err)
×
231
        }
×
232

233
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetPools").Inc()
2✔
234
        return poolsConfig, nil
2✔
235
}
236

237
// GetProxies returns the proxy configuration of the GatewayD.
238
func (a *API) GetProxies(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
1✔
239
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Proxies")
1✔
240
        defer span.End()
1✔
241

1✔
242
        // Create a new map to hold the flattened proxies data
1✔
243
        proxies := make(map[string]any)
1✔
244

1✔
245
        for configGroupName, configGroupProxies := range a.Proxies {
2✔
246
                // Create a map for each configuration group
1✔
247
                groupProxies := make(map[string]any)
1✔
248
                for name, proxy := range configGroupProxies {
2✔
249
                        available := make([]any, 0)
1✔
250
                        for _, c := range proxy.AvailableConnectionsString() {
2✔
251
                                available = append(available, c)
1✔
252
                        }
1✔
253

254
                        busy := make([]any, 0)
1✔
255
                        for _, conn := range proxy.BusyConnectionsString() {
1✔
256
                                busy = append(busy, conn)
×
257
                        }
×
258

259
                        groupProxies[name] = map[string]any{
1✔
260
                                "available": available,
1✔
261
                                "busy":      busy,
1✔
262
                                "total":     len(available) + len(busy),
1✔
263
                        }
1✔
264
                }
265

266
                proxies[configGroupName] = groupProxies
1✔
267
        }
268

269
        proxiesConfig, err := structpb.NewStruct(proxies)
1✔
270
        if err != nil {
1✔
271
                metrics.APIRequestsErrors.WithLabelValues(
×
272
                        "GET", "/v1/GatewayDPluginService/GetProxies", codes.Internal.String(),
×
273
                ).Inc()
×
274
                a.Options.Logger.Err(err).Msg("Failed to marshal proxies config")
×
275
                return nil, status.Errorf(codes.Internal, "failed to marshal proxies config: %v", err)
×
276
        }
×
277

278
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetProxies").Inc()
1✔
279
        return proxiesConfig, nil
1✔
280
}
281

282
// GetServers returns the server configuration of the GatewayD.
283
func (a *API) GetServers(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
1✔
284
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Servers")
1✔
285
        defer span.End()
1✔
286

1✔
287
        servers := make(map[string]any)
1✔
288
        for name, server := range a.Servers {
2✔
289
                servers[name] = map[string]any{
1✔
290
                        "network":      server.Network,
1✔
291
                        "address":      server.Address,
1✔
292
                        "status":       uint(server.Status),
1✔
293
                        "tickInterval": server.TickInterval.Nanoseconds(),
1✔
294
                        "loadBalancer": map[string]any{"strategy": server.LoadbalancerStrategyName},
1✔
295
                        "isTLSEnabled": server.IsTLSEnabled(),
1✔
296
                }
1✔
297
        }
1✔
298

299
        serversConfig, err := structpb.NewStruct(servers)
1✔
300
        if err != nil {
1✔
301
                metrics.APIRequestsErrors.WithLabelValues(
×
302
                        "GET", "/v1/GatewayDPluginService/GetServers", codes.Internal.String(),
×
303
                ).Inc()
×
304
                a.Options.Logger.Err(err).Msg("Failed to marshal servers config")
×
305
                return nil, status.Errorf(codes.Internal, "failed to marshal servers config: %v", err)
×
306
        }
×
307

308
        metrics.APIRequests.WithLabelValues("GET", "/v1/GatewayDPluginService/GetServers").Inc()
1✔
309
        return serversConfig, nil
1✔
310
}
311

312
// GetPeers returns the raft peers configuration of the GatewayD.
313
func (a *API) GetPeers(context.Context, *emptypb.Empty) (*structpb.Struct, error) {
2✔
314
        _, span := otel.Tracer(config.TracerName).Start(a.ctx, "Get Peers")
2✔
315
        defer span.End()
2✔
316

2✔
317
        if a.Options.RaftNode == nil {
3✔
318
                return nil, status.Errorf(codes.Unavailable, "raft node not initialized")
1✔
319
        }
1✔
320

321
        peers := a.Options.RaftNode.GetPeers()
1✔
322
        peerMap := make(map[string]any)
1✔
323

1✔
324
        // Get current leader ID for comparison
1✔
325
        _, leaderID := a.Options.RaftNode.GetState()
1✔
326

1✔
327
        for _, peer := range peers {
2✔
328
                // Determine peer status
1✔
329
                var status string
1✔
330
                switch {
1✔
331
                case string(peer.ID) == string(leaderID):
1✔
332
                        status = "Leader"
1✔
NEW
333
                case peer.Suffrage == hcraft.Voter:
×
NEW
334
                        status = "Follower"
×
NEW
335
                case peer.Suffrage == hcraft.Nonvoter:
×
NEW
336
                        status = "NonVoter"
×
NEW
337
                default:
×
NEW
338
                        status = "Unknown"
×
339
                }
340

341
                peerMap[string(peer.ID)] = map[string]any{
1✔
342
                        "id":       string(peer.ID),
1✔
343
                        "address":  string(peer.Address),
1✔
344
                        "status":   status,
1✔
345
                        "suffrage": peer.Suffrage.String(),
1✔
346
                }
1✔
347
        }
348

349
        raftPeers, err := structpb.NewStruct(peerMap)
1✔
350
        if err != nil {
1✔
NEW
351
                metrics.APIRequestsErrors.WithLabelValues(
×
NEW
352
                        "GET", "/v1/raft/peers", codes.Internal.String(),
×
NEW
353
                ).Inc()
×
NEW
354
                a.Options.Logger.Err(err).Msg("Failed to marshal peers config")
×
NEW
355
                return nil, status.Errorf(codes.Internal, "failed to marshal peers config: %v", err)
×
NEW
356
        }
×
357

358
        metrics.APIRequests.WithLabelValues("GET", "/v1/raft/peers").Inc()
1✔
359
        return raftPeers, nil
1✔
360
}
361

362
// AddPeer adds a new peer to the raft cluster.
363
func (a *API) AddPeer(ctx context.Context, req *v1.AddPeerRequest) (*v1.AddPeerResponse, error) {
5✔
364
        _, span := otel.Tracer(config.TracerName).Start(ctx, "Add Peer")
5✔
365
        defer span.End()
5✔
366

5✔
367
        if a.Options.RaftNode == nil {
6✔
368
                return nil, status.Errorf(codes.Unavailable, "AddPeer: raft node not initialized")
1✔
369
        }
1✔
370

371
        if req.GetPeerId() == "" || req.GetAddress() == "" || req.GetGrpcAddress() == "" {
7✔
372
                return nil, status.Errorf(codes.InvalidArgument, "AddPeer: peer id, address, and grpc address are required")
3✔
373
        }
3✔
374

375
        err := a.Options.RaftNode.AddPeer(ctx, req.GetPeerId(), req.GetAddress(), req.GetGrpcAddress())
1✔
376
        if err != nil {
1✔
NEW
377
                metrics.APIRequestsErrors.WithLabelValues(
×
NEW
378
                        "POST", "/v1/raft/peers", codes.Internal.String(),
×
NEW
379
                ).Inc()
×
NEW
380
                a.Options.Logger.Err(err).Msg("Failed to add peer")
×
NEW
381
                return nil, status.Errorf(codes.Internal, "AddPeer: failed to add peer: %v", err)
×
NEW
382
        }
×
383

384
        metrics.APIRequests.WithLabelValues("POST", "/v1/raft/peers").Inc()
1✔
385
        return &v1.AddPeerResponse{Success: true}, nil
1✔
386
}
387

388
// RemovePeer removes a peer from the raft cluster.
389
func (a *API) RemovePeer(ctx context.Context, req *v1.RemovePeerRequest) (*v1.RemovePeerResponse, error) {
3✔
390
        _, span := otel.Tracer(config.TracerName).Start(ctx, "Remove Peer")
3✔
391
        defer span.End()
3✔
392

3✔
393
        if a.Options.RaftNode == nil {
4✔
394
                return nil, status.Errorf(codes.Unavailable, "RemovePeer: raft node not initialized")
1✔
395
        }
1✔
396

397
        if req.GetPeerId() == "" {
2✔
NEW
398
                return nil, status.Errorf(codes.InvalidArgument, "RemovePeer: peer id is required")
×
NEW
399
        }
×
400

401
        err := a.Options.RaftNode.RemovePeer(ctx, req.GetPeerId())
2✔
402
        if err != nil {
3✔
403
                metrics.APIRequestsErrors.WithLabelValues(
1✔
404
                        "DELETE", "/v1/raft/peers", codes.Internal.String(),
1✔
405
                ).Inc()
1✔
406
                a.Options.Logger.Err(err).Msg("Failed to remove peer")
1✔
407
                return nil, status.Errorf(codes.Internal, "RemovePeer: failed to remove peer: %v", err)
1✔
408
        }
1✔
409

410
        metrics.APIRequests.WithLabelValues("DELETE", "/v1/raft/peers").Inc()
1✔
411
        return &v1.RemovePeerResponse{Success: true}, nil
1✔
412
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc