• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openconfig / gribigo / 6895831444

16 Nov 2023 07:58PM UTC coverage: 83.808% (+0.1%) from 83.694%
6895831444

push

github

web-flow
Refactor RIB Add methods to return replaced entries. (#220)

* Working commit.

* Refactor to fix incorrect replace reference counting.

* Fix unit testing, add refcount test.

* Fix static analysis errors.

* 🧹: tidy go.mod

* Clean up logging.

* More `go.mod` housekeeping. 🧹

* Refactor RIB Add routines to return replaced entries.

* Fix implementation of Flush.

 * (M) rib/rib(_test).go
  - Replace `Flush` implementation to be at the RIB level to allow
    for cross-NI references.

* Update server.go for new Flush call.

* Add small binary for validation of gRIBIgo without lemming.

 * (A) cmd/stubrtr/...
  - Add a binary that starts just the gRIBIgo server without gNMI or an
    underlying dataplane. This is more lightweight than lemming and
    makes it easier to debug issues that are solely in gribigo.

191 of 256 new or added lines in 2 files covered. (74.61%)

5 existing lines in 1 file now uncovered.

5528 of 6596 relevant lines covered (83.81%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.71
/server/server.go
1
// Copyright 2021 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package server defines the basic elements of a gRIBI server that uses
16
// an in-memory data store for the AFT contents.
17
package server
18

19
import (
20
        "bytes"
21
        "context"
22
        "fmt"
23
        "io"
24
        "sync"
25
        "time"
26

27
        log "github.com/golang/glog"
28
        "github.com/google/uuid"
29
        "github.com/openconfig/gribigo/rib"
30
        "google.golang.org/grpc/codes"
31
        "google.golang.org/grpc/status"
32
        "google.golang.org/protobuf/encoding/prototext"
33
        "lukechampine.com/uint128"
34

35
        spb "github.com/openconfig/gribi/v1/proto/service"
36
)
37

38
// DefaultNetworkInstanceName is the name given to the system's default
// network instance.
const DefaultNetworkInstanceName = "DEFAULT"
42

43
// unixTS returns the current unix timestamp in nanoseconds since the epoch.
// It is a package-level variable so that unit tests can overload it.
//
// It must be a closure rather than the method value time.Now().UnixNano: a
// method value evaluates its receiver once, when the expression is evaluated,
// which would freeze the timestamp at package initialisation time.
var unixTS = func() int64 { return time.Now().UnixNano() }
46

47
// Server implements the gRIBI service.
48
type Server struct {
49
        *spb.UnimplementedGRIBIServer
50

51
        // csMu protects the cs map.
52
        csMu sync.RWMutex
53
        // cs stores the state for clients that are connected to the server
54
        // this allows the server perform operations such as ensuring consistency
55
        // across different connected clients. The key of the map is a unique string
56
        // identifying each client, which in this implementation is a UUID generated
57
        // at connection time. The client ID is scoped as such to ensure that we don't
58
        // have any state as to where the current connection came from (source address,
59
        // or TCP session handle for example).
60
        //
61
        // This design is acceptable because there is no case in which we have an
62
        // operation within gRIBI whereby a client references some state that is
63
        // associated with a prior connection that it had.
64
        cs map[string]*clientState
65

66
        // elecMu protects the curElecID and curMaster values.
67
        elecMu sync.RWMutex
68
        // curElecID stores the current electionID for cases where the server is
69
        // operating in SINGLE_PRIMARY mode.
70
        curElecID *spb.Uint128
71
        // curMaster stores the current master's UUID.
72
        curMaster string
73

74
        // masterRIB is the single gRIBI RIB that is used for a server that runs with
75
        // a single elected master, where a single RIB is written to by all clients.
76
        masterRIB *rib.RIB
77
}
78

79
// clientState stores information that relates to a specific client
80
// connected to the gRIBI server.
81
type clientState struct {
82
        // params stores parameters that are associated with a single
83
        // client of the server. These parameters are advertised as the
84
        // first message on a Modify stream. It is an error to send
85
        // parameters in any other context (i.e., after other ModifyRequest
86
        // messages have been sent, or to adjust these parameters).
87
        params *clientParams
88
        // setParams indicates whether the parameters have been explicitly
89
        // written.
90
        setParams bool
91
        // lastElecID stores the last election ID that the client
92
        // sent to the server. This is used to validate whether the election
93
        // ID in an operation matches the expected election ID.
94
        lastElecID *spb.Uint128
95
}
96

97
// DeepCopy returns a copy of the clientState struct.
98
func (cs *clientState) DeepCopy() *clientState {
1✔
99
        if cs.params == nil {
1✔
100
                return &clientState{}
×
101
        }
×
102
        return &clientState{
1✔
103
                params: cs.params.DeepCopy(),
1✔
104
        }
1✔
105
}
106

107
// clientParams holds the parameters that a client sets during the initial
// handshake of the Modify RPC.
type clientParams struct {
	// Persist is true when the client's AFT entries should be kept after
	// the client disconnects.
	Persist bool

	// ExpectElecID is true when the client expects to send election IDs,
	// i.e. its ClientRedundancy is SINGLE_PRIMARY.
	ExpectElecID bool

	// FIBAck is true when the client expects FIB-level acknowledgements.
	FIBAck bool
}
122

123
// DeepCopy returns a copy of the clientParams struct.
124
func (cp *clientParams) DeepCopy() *clientParams {
1✔
125
        return &clientParams{
1✔
126
                Persist:      cp.Persist,
1✔
127
                ExpectElecID: cp.ExpectElecID,
1✔
128
                FIBAck:       cp.FIBAck,
1✔
129
        }
1✔
130
}
1✔
131

132
// Equal returns true if the candidate clientParams n is equal to the receiver cp.
133
func (cp *clientParams) Equal(n *clientParams) bool {
1✔
134
        return cp.Persist == n.Persist && cp.FIBAck == n.FIBAck && cp.ExpectElecID == n.ExpectElecID
1✔
135
}
1✔
136

137
// ServerOpt is the interface that every option accepted by the gRIBI server
// implements. Its only method is the unexported marker isServerOpt.
type ServerOpt interface {
	isServerOpt()
}
141

142
// WithPostChangeRIBHook specifies that the given function that is of type rib.RIBHook function
143
// should be executed after each change in the RIB.
144
func WithPostChangeRIBHook(fn rib.RIBHookFn) *postChangeRibHook {
×
145
        return &postChangeRibHook{fn: fn}
×
146
}
×
147

148
// ribHook is the internal implementation of the WithRIBHook option.
149
type postChangeRibHook struct {
150
        fn rib.RIBHookFn
151
}
152

153
// isServerOpt implements the ServerOpt interface.
154
func (*postChangeRibHook) isServerOpt() {}
×
155

156
// hasPostChangeRIBHook extracts the ribHook option from the supplied ServerOpt, returning nil
157
// if one is not found. It will return only the first argument if multiple are specified.
158
func hasPostChangeRIBHook(opt []ServerOpt) *postChangeRibHook {
1✔
159
        for _, o := range opt {
2✔
160
                if v, ok := o.(*postChangeRibHook); ok {
1✔
161
                        return v
×
162
                }
×
163
        }
164
        return nil
1✔
165
}
166

167
// WithResolvedEntryHook is a Server option that allows a function to be run for
168
// each entry that can be fully resolved within the RIB (e.g., IPv4Entry).
169
func WithRIBResolvedEntryHook(fn rib.ResolvedEntryFn) *resolvedEntryHook {
×
170
        return &resolvedEntryHook{fn: fn}
×
171
}
×
172

173
// resolvedEntryHook is the internal implementation of the WithRIBResolvedEntryHook
174
// option.
175
type resolvedEntryHook struct {
176
        fn rib.ResolvedEntryFn
177
}
178

179
// isServerOpt implements the ServerOpt interface.
180
func (r *resolvedEntryHook) isServerOpt() {}
×
181

182
// hasResolvedEntryHook returns the resolvedEntryHook from the specified options
183
// if one exists. It will return only the first argument if multiple are specified.
184
func hasResolvedEntryHook(opt []ServerOpt) *resolvedEntryHook {
1✔
185
        for _, o := range opt {
2✔
186
                if v, ok := o.(*resolvedEntryHook); ok {
1✔
187
                        return v
×
188
                }
×
189
        }
190
        return nil
1✔
191
}
192

193
// DisableRIBCheckFn specifies that the consistency checking functions should
194
// be disabled for the RIB. It is useful for a testing RIB that does not need
195
// to have working references.
196
func DisableRIBCheckFn() *disableCheckFn { return &disableCheckFn{} }
1✔
197

198
// disableCheckFn is the internal implementation of the DisableRIBCheckFn
// option. It carries no data; its presence in the option list is the signal.
type disableCheckFn struct{}

// isServerOpt marks disableCheckFn as a ServerOpt.
func (d *disableCheckFn) isServerOpt() {}
×
203

204
// hasDisableCheckFn checks whether the ServerOpt slice supplied contains the
205
// disableCheckFn option.
206
func hasDisableCheckFn(opt []ServerOpt) bool {
1✔
207
        for _, o := range opt {
2✔
208
                if _, ok := o.(*disableCheckFn); ok {
2✔
209
                        return true
1✔
210
                }
1✔
211
        }
212
        return false
1✔
213
}
214

215
// WithVRFs specifies that the server should be initialised with the L3VRF
216
// network instances specified in the names list. Each is created in the
217
// server's RIB such that it can be referenced.
218
func WithVRFs(names []string) *withVRFs { return &withVRFs{names: names} }
1✔
219

220
// withVRFs is the internal implementation of the WithVRFs option. It carries
// the list of network instance names for the server to read.
type withVRFs struct {
	names []string
}

// isServerOpt marks withVRFs as a ServerOpt.
func (w *withVRFs) isServerOpt() {}
×
228

229
// hasWithVRFs checks whether the ServerOpt slice supplied contains the withVRFs
230
// option and returns it if so.
231
func hasWithVRFs(opt []ServerOpt) []string {
1✔
232
        for _, o := range opt {
2✔
233
                if v, ok := o.(*withVRFs); ok {
2✔
234
                        return v.names
1✔
235
                }
1✔
236
        }
237
        return nil
1✔
238
}
239

240
// New creates a new gRIBI server.
241
func New(opt ...ServerOpt) (*Server, error) {
1✔
242
        ribOpt := []rib.RIBOpt{}
1✔
243
        if hasDisableCheckFn(opt) {
2✔
244
                ribOpt = append(ribOpt, rib.DisableRIBCheckFn())
1✔
245
        }
1✔
246

247
        s := &Server{
1✔
248
                cs: map[string]*clientState{},
1✔
249
                // TODO(robjs): when we implement support for ALL_PRIMARY then we might not
1✔
250
                // want to create a new RIB by default.
1✔
251
                masterRIB: rib.New(DefaultNetworkInstanceName, ribOpt...),
1✔
252
        }
1✔
253

1✔
254
        if v := hasPostChangeRIBHook(opt); v != nil {
1✔
255
                s.masterRIB.SetPostChangeHook(v.fn)
×
256
        }
×
257

258
        if v := hasResolvedEntryHook(opt); v != nil {
1✔
259
                s.masterRIB.SetResolvedEntryHook(v.fn)
×
260
        }
×
261

262
        if vrfs := hasWithVRFs(opt); vrfs != nil {
2✔
263
                for _, n := range vrfs {
2✔
264
                        if err := s.masterRIB.AddNetworkInstance(n); err != nil {
1✔
265
                                return nil, fmt.Errorf("cannot create network instance %s, %v", n, err)
×
266
                        }
×
267
                }
268
        }
269

270
        return s, nil
1✔
271
}
272

273
// Modify implements the gRIBI Modify RPC.
274
func (s *Server) Modify(ms spb.GRIBI_ModifyServer) error {
×
275
        // Initiate the per client state for this client.
×
276
        cid := uuid.New().String()
×
277
        log.V(2).Infof("creating client with ID %s", cid)
×
278
        if err := s.newClient(cid); err != nil {
×
279
                return err
×
280
        }
×
281

282
        resultChan := make(chan *spb.ModifyResponse)
×
283
        errCh := make(chan error)
×
284
        go func() {
×
285
                // Store whether this is the first message on the Modify RPC, some options - like the session
×
286
                // parameters can only be set as the first message.
×
287
                var gotmsg bool
×
288
                for {
×
289
                        in, err := ms.Recv()
×
290
                        if err == io.EOF {
×
291
                                errCh <- nil
×
292
                                return
×
293
                        }
×
294
                        if err != nil {
×
295
                                errCh <- status.Errorf(codes.Unknown, "error reading message from client, %v", err)
×
296
                                return
×
297
                        }
×
298
                        log.V(2).Infof("received message %s on Modify channel", in)
×
299

×
300
                        var (
×
301
                                res       *spb.ModifyResponse
×
302
                                skipWrite bool
×
303
                        )
×
304

×
305
                        switch {
×
306
                        case in == nil:
×
307
                                log.Errorf("received nil message on Modify channel")
×
308
                                skipWrite = true
×
309
                        case (in.Params != nil && in.ElectionId != nil), (in.Params != nil && in.Operation != nil), (in.ElectionId != nil && in.Operation != nil):
×
310
                                errCh <- status.Errorf(codes.InvalidArgument, "invalid input message, cannot specify >1 of parameters, election ID and operation simultaenously, got: %v", in)
×
311
                                return
×
312
                        case in.Params != nil:
×
313
                                var err error
×
314
                                if res, err = s.checkParams(cid, in.Params, gotmsg); err != nil {
×
315
                                        // Invalid parameters is a fatal error, so we take down the Modify RPC.
×
316
                                        errCh <- err
×
317
                                        return
×
318
                                }
×
319
                                if err := s.updateParams(cid, in.Params); err != nil {
×
320
                                        // Not being able to update parameters is a fatal error, so we take down
×
321
                                        // the Modify RPC.
×
322
                                        errCh <- err
×
323
                                        return
×
324
                                }
×
325
                        case in.ElectionId != nil:
×
326
                                var err error
×
327
                                res, err = s.runElection(cid, in.ElectionId)
×
328
                                if err != nil {
×
329
                                        errCh <- err
×
330
                                        return
×
331
                                }
×
332
                        case in.Operation != nil:
×
333
                                s.doModify(cid, in.Operation, resultChan, errCh)
×
334
                                skipWrite = true
×
335
                        default:
×
336
                                errCh <- status.Errorf(codes.Unimplemented, "unimplemented handling of message %s", in)
×
337
                                return
×
338
                        }
339

340
                        gotmsg = true
×
341
                        // write the results to result channel.
×
342
                        if !skipWrite {
×
343
                                resultChan <- res
×
344
                        }
×
345
                }
346
        }()
347

348
        resultDone := make(chan struct{})
×
349
        go func() {
×
350
                for {
×
351
                        select {
×
352
                        case res := <-resultChan:
×
353
                                // update that we have received at least one message.
×
354
                                if err := ms.Send(res); err != nil {
×
355
                                        errCh <- status.Errorf(codes.Internal, "cannot write message to client channel, %s", res)
×
356
                                        return
×
357
                                }
×
358
                        case <-resultDone:
×
359
                                return
×
360
                        }
361
                }
362
        }()
363

364
        err := <-errCh
×
365
        close(resultDone)
×
366

×
367
        // when this client goes away, we need to clean up its state.
×
368
        s.deleteClient(cid)
×
369

×
370
        return err
×
371
}
372

373
// Get implements the gRIBI Get RPC.
374
func (s *Server) Get(req *spb.GetRequest, stream spb.GRIBI_GetServer) error {
×
375
        msgCh := make(chan *spb.GetResponse)
×
376
        errCh := make(chan error)
×
377
        doneCh := make(chan struct{})
×
378
        stopCh := make(chan struct{})
×
379

×
380
        // defer a function to stop the goroutine and close all channels, since this will be called
×
381
        // when we exit, then it will stop the goroutine that we started to do
×
382
        // the get in the case that we exit due to some error.
×
383
        defer func() {
×
384
                // Non-blocking write to the stopCh, since if the goroutine has
×
385
                // already returned then it won't be listening and we'll deadlock.
×
386
                select {
×
387
                case stopCh <- struct{}{}:
×
388
                default:
×
389
                }
390
        }()
391

392
        go s.doGet(req, msgCh, doneCh, stopCh, errCh)
×
393

×
394
        var done bool
×
395

×
396
        for !done {
×
397
                select {
×
398
                case <-doneCh:
×
399
                        done = true
×
400
                case err := <-errCh:
×
401
                        return status.Errorf(codes.Internal, "cannot generate GetResponse, %v", err)
×
402
                case r := <-msgCh:
×
403
                        if err := stream.Send(r); err != nil {
×
404
                                return status.Errorf(codes.Internal, "cannot write message to client channel, %v", err)
×
405
                        }
×
406
                }
407
        }
408
        return nil
×
409
}
410

411
// Flush implements the gRIBI Flush RPC - used for removing entries from the server.
412
func (s *Server) Flush(ctx context.Context, req *spb.FlushRequest) (*spb.FlushResponse, error) {
1✔
413
        if err := s.checkFlushRequest(req); err != nil {
2✔
414
                return nil, err
1✔
415
        }
1✔
416

417
        nis := []string{}
1✔
418
        switch t := req.GetNetworkInstance().(type) {
1✔
419
        case *spb.FlushRequest_All:
1✔
420
                nis = s.masterRIB.KnownNetworkInstances()
1✔
421
        case *spb.FlushRequest_Name:
1✔
422
                if _, ok := s.masterRIB.NetworkInstanceRIB(t.Name); !ok {
2✔
423
                        return nil, addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "could not find network instance %s", t.Name), &spb.FlushResponseError{
1✔
424
                                Status: spb.FlushResponseError_INVALID_NETWORK_INSTANCE,
1✔
425
                        })
1✔
426
                }
1✔
427
                nis = []string{t.Name}
1✔
428
        }
429

430
        if err := s.masterRIB.Flush(nis); err != nil {
1✔
NEW
431
                fErr, ok := err.(*rib.FlushErr)
×
432
                det := &bytes.Buffer{}
×
NEW
433
                switch ok {
×
NEW
434
                case true:
×
NEW
435
                        for _, msg := range fErr.Errs {
×
NEW
436
                                det.WriteString(fmt.Sprintf("%s\n", msg))
×
NEW
437
                        }
×
NEW
438
                default:
×
NEW
439
                        det.WriteString(fmt.Sprintf("%v", err))
×
440
                }
441
                // Always use codes.Internal here since any error (not being able to flush, or
442
                // not finding an NI that we already checked existed is some internal logic
443
                // error).
444
                return nil, status.Errorf(codes.Internal, det.String())
×
445
        }
446

447
        return &spb.FlushResponse{
1✔
448
                Timestamp: unixTS(),
1✔
449
                Result:    spb.FlushResponse_OK,
1✔
450
        }, nil
1✔
451
}
452

453
// newClient creates a new client context within the server using the specified string
454
// ID.
455
func (s *Server) newClient(id string) error {
1✔
456
        s.csMu.Lock()
1✔
457
        defer s.csMu.Unlock()
1✔
458
        if s.cs[id] != nil {
2✔
459
                return status.Errorf(codes.Internal, "cannot create new client with duplicate ID, %s", id)
1✔
460
        }
1✔
461
        s.cs[id] = &clientState{
1✔
462
                // Set to the default set of parameters.
1✔
463
                params: &clientParams{},
1✔
464
        }
1✔
465

1✔
466
        return nil
1✔
467
}
468

469
// deleteClient removes the client with the specified id from the server. It does not return
470
// an error if the client cannot be deleted, since this action is performed when the other
471
// side has already gone away, so there is no error we can return to them.
472
func (s *Server) deleteClient(id string) {
×
473
        s.csMu.Lock()
×
474
        defer s.csMu.Unlock()
×
475
        delete(s.cs, id)
×
476
}
×
477

478
// updateParams writes the parameters for the client specified by id to the server state
479
// based on the received session parameters supplied in params. It returns errors if
480
// the client is undefined, or the parameters have been set previously. It does not
481
// ensure that the parameters are consistent with other clients on the server - but
482
// rather solely updates for a particular client.
483
func (s *Server) updateParams(id string, params *spb.SessionParameters) error {
1✔
484
        cparam := &clientParams{}
1✔
485

1✔
486
        cparam.ExpectElecID = (params.Redundancy == spb.SessionParameters_SINGLE_PRIMARY)
1✔
487
        cparam.Persist = (params.Persistence == spb.SessionParameters_PRESERVE)
1✔
488
        cparam.FIBAck = (params.AckType == spb.SessionParameters_RIB_AND_FIB_ACK)
1✔
489

1✔
490
        s.csMu.Lock()
1✔
491
        defer s.csMu.Unlock()
1✔
492
        p := s.cs[id]
1✔
493
        if p == nil || p.params == nil {
2✔
494
                return status.Errorf(codes.Internal, "cannot update parameters for a client with no state, %s", id)
1✔
495
        }
1✔
496
        if p.setParams {
2✔
497
                return addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot modify SessionParameters"), &spb.ModifyRPCErrorDetails{
1✔
498
                        Reason: spb.ModifyRPCErrorDetails_MODIFY_NOT_ALLOWED,
1✔
499
                })
1✔
500
        }
1✔
501
        s.cs[id].setParams = true
1✔
502
        s.cs[id].params = cparam
1✔
503
        return nil
1✔
504
}
505

506
// addModifyErrDetailsOrReturn takes an input status (s), and ModifyRPCErrorDetails proto (d) and appends
507
// d to s. If an error is encountered, s is returned, otherwise the appended version is returned. The return
508
// type is a Go error which can be returned directly.
509
func addModifyErrDetailsOrReturn(s *status.Status, d *spb.ModifyRPCErrorDetails) error {
1✔
510
        ns, err := s.WithDetails(d)
1✔
511
        if err != nil {
1✔
512
                return s.Err()
×
513
        }
×
514
        return ns.Err()
1✔
515
}
516

517
// checkParams validates that the parameters that were supplied by the client with the specified
518
// ID are valid within the overall server context. It returns the ModifyResponse that should be sent
519
// to the client, or the populated error.
520
func (s *Server) checkParams(id string, p *spb.SessionParameters, gotMsg bool) (*spb.ModifyResponse, error) {
1✔
521
        if p == nil {
2✔
522
                return nil, status.Newf(codes.Internal, "invalid nil parameters when checking for client %s, got: %v", id, p).Err()
1✔
523
        }
1✔
524

525
        if gotMsg {
2✔
526
                // TODO(robjs): the spec should spell out that this can't be sent after the first message is
1✔
527
                // received, whatever the type.
1✔
528
                return nil, addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot send session parameters after another message"), &spb.ModifyRPCErrorDetails{
1✔
529
                        Reason: spb.ModifyRPCErrorDetails_MODIFY_NOT_ALLOWED,
1✔
530
                })
1✔
531
        }
1✔
532

533
        // TODO(robjs): confirm with folks what their thoughts are on whether we support persistence
534
        // other than DELETE in ALL_PRIMARY. I think that we should not support this since it means
535
        // that we need to externalise the client ID (so that the client can delete its old entries, but
536
        // not others).
537
        if p.Redundancy == spb.SessionParameters_ALL_PRIMARY && p.Persistence == spb.SessionParameters_PRESERVE {
2✔
538
                return nil, addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot have ALL_PRIMARY client with persistence PRESERVE"), &spb.ModifyRPCErrorDetails{
1✔
539
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
540
                })
1✔
541
        }
1✔
542

543
        // The fake server supports both RIB and FIB ACKing (given that we do not have any real
544
        // FIB, they are basically the same :-)). It does not (currently) support ALL_PRIMARY
545
        // mode of operations.
546
        if p.Redundancy == spb.SessionParameters_ALL_PRIMARY {
2✔
547
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.Unimplemented, "ALL_PRIMARY redundancy are not supported"), &spb.ModifyRPCErrorDetails{
1✔
548
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
549
                })
1✔
550
        }
1✔
551

552
        // The fake server does not (currently) support delete, so we just return an error
553
        // if the client is asking for anything other than persisting the entries.
554
        if p.Persistence == spb.SessionParameters_DELETE {
2✔
555
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.Unimplemented, "persistence modes other than PRESERVE are not supported"), &spb.ModifyRPCErrorDetails{
1✔
556
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
557
                })
1✔
558
        }
1✔
559

560
        cp := &clientParams{
1✔
561
                FIBAck:       p.GetAckType() == spb.SessionParameters_RIB_AND_FIB_ACK,
1✔
562
                ExpectElecID: p.GetRedundancy() == spb.SessionParameters_SINGLE_PRIMARY,
1✔
563
                Persist:      p.GetPersistence() == spb.SessionParameters_PRESERVE,
1✔
564
        }
1✔
565

1✔
566
        consistent, err := s.checkClientsConsistent(id, cp)
1✔
567
        if err != nil {
1✔
568
                return nil, status.Newf(codes.Internal, "got unexpected error checking for consistency, %v", err).Err()
×
569
        }
×
570
        if !consistent {
2✔
571
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "client %s is not consistent with other clients", id), &spb.ModifyRPCErrorDetails{
1✔
572
                        Reason: spb.ModifyRPCErrorDetails_PARAMS_DIFFER_FROM_OTHER_CLIENTS,
1✔
573
                })
1✔
574
        }
1✔
575

576
        if err := s.setClientParams(id, cp); err != nil {
1✔
577
                return nil, status.Errorf(codes.Internal, "internal error setting parameters, %v", err)
×
578
        }
×
579

580
        return &spb.ModifyResponse{
1✔
581
                SessionParamsResult: &spb.SessionParametersResult{
1✔
582
                        Status: spb.SessionParametersResult_OK,
1✔
583
                },
1✔
584
        }, nil
1✔
585
}
586

587
// checkClientsConsistent ensures that the client described by id and the parameters p is consistent
588
// with other clients in the server.
589
func (s *Server) checkClientsConsistent(id string, p *clientParams) (bool, error) {
1✔
590
        if p == nil {
2✔
591
                return false, fmt.Errorf("unexpected nil parameters for client %s", id)
1✔
592
        }
1✔
593

594
        s.csMu.RLock()
1✔
595
        defer s.csMu.RUnlock()
1✔
596
        for cid, state := range s.cs {
2✔
597
                if id == cid {
2✔
598
                        continue
1✔
599
                }
600
                if state == nil || state.params == nil {
2✔
601
                        return false, fmt.Errorf("client %s has invalid nil parameter state", cid)
1✔
602
                }
1✔
603

604
                if !state.params.Equal(p) {
2✔
605
                        return false, nil
1✔
606
                }
1✔
607
        }
608
        return true, nil
1✔
609
}
610

611
// setClientParams sets the parameters for client with id to have the specified parameters.
612
func (s *Server) setClientParams(id string, p *clientParams) error {
1✔
613
        s.csMu.Lock()
1✔
614
        defer s.csMu.Unlock()
1✔
615
        if s.cs[id] == nil {
1✔
616
                return fmt.Errorf("cannot find client %s, known clients: %v", id, s.cs)
×
617
        }
×
618
        s.cs[id].params = p
1✔
619
        return nil
1✔
620
}
621

622
// isNewMaster takes two election IDs and determines whether the candidate (cand) is a new
623
// master given the existing master (exist). It returns:
624
//   - a bool which indicates whether the candidate is the new master
625
//   - a bool which indicates whether this is an election ID with the same election ID
626
//   - an error indicating whether this was an invalid update
627
func isNewMaster(cand, exist *spb.Uint128) (bool, bool, error) {
1✔
628
        if exist == nil {
2✔
629
                return true, false, nil
1✔
630
        }
1✔
631
        if cand.High > exist.High {
2✔
632
                return true, false, nil
1✔
633
        }
1✔
634
        if cand.Low > exist.Low {
2✔
635
                return true, false, nil
1✔
636
        }
1✔
637

638
        // Per comments in gribi.proto - if the two values are equal, then we accept the new
639
        // candidate as the master, this allows for reconnections.
640
        if cand.High == exist.High && cand.Low == exist.Low {
2✔
641
                return true, true, nil
1✔
642
        }
1✔
643
        // TODO(robjs): currently this is not specified in the spec, since this is the
644
        // election ID going backwards, but it seems like we could not return an error here
645
        // since it might just be a stale client telling us that they think that they're the
646
        // master. However, we could return an error just to say "hey, you're somehow out of sync
647
        // with the master election system".
648
        return false, false, nil
1✔
649
}
650

651
// getClientState returns the client state for the client with the specified id, along with
652
// whether the client was found.
653
func (s *Server) getClientState(id string) (*clientState, bool) {
1✔
654
        s.csMu.RLock()
1✔
655
        defer s.csMu.RUnlock()
1✔
656
        cs, ok := s.cs[id]
1✔
657
        return cs, ok
1✔
658
}
1✔
659

660
// storeClientElectionID stores the latest election ID for a client into the
661
// server specified. It returns true if the election ID was stored.
662
func (s *Server) storeClientElectionID(id string, elecID *spb.Uint128) bool {
1✔
663
        s.csMu.Lock()
1✔
664
        defer s.csMu.Unlock()
1✔
665
        cs, ok := s.cs[id]
1✔
666
        if !ok {
1✔
667
                return false
×
668
        }
×
669
        cs.lastElecID = elecID
1✔
670
        return true
1✔
671
}
672

673
// getClientStateCopy returns a copy of the state for the client with the specified ID, since
674
// the state of a client is immutable after the initial creation, we never allow a client to
675
// get a copy of the pointer so that they could change this. It returns a copy of the clientState
676
// an error.
677
func (s *Server) getClientStateCopy(id string) (*clientState, error) {
1✔
678
        s.csMu.RLock()
1✔
679
        defer s.csMu.RUnlock()
1✔
680
        if s.cs[id] == nil {
1✔
681
                return nil, fmt.Errorf("unknown client %s", id)
×
682
        }
×
683
        return s.cs[id].DeepCopy(), nil
1✔
684
}
685

686
// runElection runs an election on the server and checks whether the client with the specified id is
687
// the new master.
688
func (s *Server) runElection(id string, elecID *spb.Uint128) (*spb.ModifyResponse, error) {
1✔
689
        cs, err := s.getClientStateCopy(id)
1✔
690
        if err != nil {
1✔
691
                return nil, err
×
692
        }
×
693
        if !cs.params.ExpectElecID {
2✔
694
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "client ID %s does not expect elections", id), &spb.ModifyRPCErrorDetails{
1✔
695
                        Reason: spb.ModifyRPCErrorDetails_ELECTION_ID_IN_ALL_PRIMARY,
1✔
696
                })
1✔
697
        }
1✔
698

699
        // If the election ID that we received is 0, then this is an invalid value
700
        // in the input message, return an error to the client.
701
        if inputID, zero := uint128.New(elecID.Low, elecID.High), uint128.New(0, 0); zero.Cmp(inputID) == 0 {
2✔
702
                return nil, status.Newf(codes.InvalidArgument, "client ID %s, zero is an invalid election ID", id).Err()
1✔
703
        }
1✔
704

705
        // At this point, we store the latest election ID that we've seen from this
706
        // client, even if it does not win the election. This allows us to check
707
        // that the client has the same election ID as it has reported to us in
708
        // subsequent transactions.
709
        if !s.storeClientElectionID(id, elecID) {
1✔
710
                return nil, status.Newf(codes.Internal, "cannot store election ID %s for client %s", elecID, id).Err()
×
711
        }
×
712

713
        s.elecMu.RLock()
1✔
714
        defer s.elecMu.RUnlock()
1✔
715
        nm, _, err := isNewMaster(elecID, s.curElecID)
1✔
716
        if err != nil {
1✔
717
                return nil, err
×
718
        }
×
719

720
        if nm {
2✔
721
                s.curElecID = elecID
1✔
722
                s.curMaster = id
1✔
723
        }
1✔
724

725
        return &spb.ModifyResponse{
1✔
726
                ElectionId: s.curElecID,
1✔
727
        }, nil
1✔
728
}
729

730
// getElection returns the details of the current election on the server.
731
func (s *Server) getElection() *electionDetails {
1✔
732
        s.elecMu.RLock()
1✔
733
        defer s.elecMu.RUnlock()
1✔
734
        return &electionDetails{
1✔
735
                master: s.curMaster,
1✔
736
                ID:     s.curElecID,
1✔
737
        }
1✔
738
}
1✔
739

740
// doModify implements a modify operation for a specific input set of AFTOperation
741
// messages for the client with the specified cid. It writes the result to the supplied
742
// ModifyResponse channel when successful, or writes the error to the supplied errCh.
743
func (s *Server) doModify(cid string, ops []*spb.AFTOperation, resCh chan *spb.ModifyResponse, errCh chan error) {
1✔
744
        cs, ok := s.getClientState(cid)
1✔
745
        switch {
1✔
746
        case !ok:
1✔
747
                errCh <- status.Newf(codes.Internal, "operation received for unknown client, %s", cid).Err()
1✔
748
                return
1✔
749
        case cs.params == nil || !cs.params.ExpectElecID || !cs.params.Persist:
1✔
750
                // these are parameters that we do not support.
1✔
751
                errCh <- addModifyErrDetailsOrReturn(
1✔
752
                        status.New(codes.Unimplemented, "unsupported parameters for client"),
1✔
753
                        &spb.ModifyRPCErrorDetails{
1✔
754
                                Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
755
                        })
1✔
756
                return
1✔
757
        }
758

759
        elec := s.getElection()
1✔
760
        elec.clientLatest = cs.lastElecID
1✔
761
        elec.client = cid
1✔
762

1✔
763
        for _, o := range ops {
2✔
764
                ni := o.GetNetworkInstance()
1✔
765
                if ni == "" {
1✔
766
                        resCh <- &spb.ModifyResponse{
×
767
                                Result: []*spb.AFTResult{{
×
768
                                        Id:     o.Id,
×
769
                                        Status: spb.AFTResult_FAILED,
×
770
                                        ErrorDetails: &spb.AFTErrorDetails{
×
771
                                                ErrorMessage: `invalid network instance name "" specified`,
×
772
                                        },
×
773
                                }},
×
774
                        }
×
775
                }
×
776
                if _, ok := s.masterRIB.NetworkInstanceRIB(ni); !ok {
2✔
777
                        // this is an unknown network instance, we should not return
1✔
778
                        // an error to the client since we do not want the connection
1✔
779
                        // to be torn down.
1✔
780
                        log.Errorf("rejected operation %s since it is an unknown network-instance, %s", o, ni)
1✔
781
                        resCh <- &spb.ModifyResponse{
1✔
782
                                Result: []*spb.AFTResult{{
1✔
783
                                        Id:     o.Id,
1✔
784
                                        Status: spb.AFTResult_FAILED,
1✔
785
                                        ErrorDetails: &spb.AFTErrorDetails{
1✔
786
                                                ErrorMessage: fmt.Sprintf(`unknown network instance "%s" specified`, ni),
1✔
787
                                        },
1✔
788
                                }},
1✔
789
                        }
1✔
790
                        return
1✔
791
                }
1✔
792

793
                // We do not try and modify entries within the operation in parallel
794
                // with each other since this may cause us to duplicate ACK on particular
795
                // operations - for example, if there are two next-hops that are within a
796
                // single next-hop-group, then both of them will cause the NHG to be
797
                // installed in the RIB. If we do this then we might end up ACKing
798
                // one twice if there was >1 different entry that mde a NHG resolvable.
799
                // For a SINGLE_PRIMARY client serialising into a single Modify channel
800
                // ensures that we do not end up with this occuring - but going forward
801
                // for ALL_PRIMARY this situation will need to handled likely by creating
802
                // some form of lock on each transaction as it is attempted, or building
803
                // a more intelligent RIB structure to track missing dependencies.
804
                res, err := modifyEntry(s.masterRIB, ni, o, cs.params.FIBAck, elec)
1✔
805
                switch {
1✔
806
                case err != nil:
×
807
                        errCh <- err
×
808
                default:
1✔
809
                        resCh <- res
1✔
810
                }
811
        }
812

813
}
814

815
// electionDetails provides a summary of a single election from the perspective of one client.
816
type electionDetails struct {
817
        // master is the clientID of the client that is master after the election.
818
        master string
819
        // electionID is the ID of the latest election.
820
        ID *spb.Uint128
821
        // client is the ID of the client that the query is being done on behalf of.
822
        client string
823
        // clientLatest is the latest electionID that the client provided us with.
824
        clientLatest *spb.Uint128
825
}
826

827
// modifyEntry performs the specified modify operation, op, on the RIB, r, within the network
828
// instance ni. The client's request ACK mode is specified by fibACK. The details of the
829
// current election on the server is described in election.
830
// The results are returned as a ModifyResponse and an error which must be a status.Status.
831
func modifyEntry(r *rib.RIB, ni string, op *spb.AFTOperation, fibACK bool, election *electionDetails) (*spb.ModifyResponse, error) {
1✔
832
        if op == nil {
2✔
833
                return nil, status.Newf(codes.Internal, "invalid nil operation received").Err()
1✔
834
        }
1✔
835

836
        res, ok, err := checkElectionForModify(op.Id, op.ElectionId, election)
1✔
837
        if err != nil {
2✔
838
                return nil, err
1✔
839
        }
1✔
840
        if !ok {
2✔
841
                return res, err
1✔
842
        }
1✔
843

844
        if r == nil {
1✔
845
                return nil, status.New(codes.Internal, "invalid RIB state").Err()
×
846
        }
×
847

848
        niR, ok := r.NetworkInstanceRIB(ni)
1✔
849
        if !ok || !niR.IsValid() {
1✔
850
                return nil, status.Newf(codes.Internal, "invalid RIB state for network instance name: '%s'", ni).Err()
×
851
        }
×
852

853
        results := []*spb.AFTResult{}
1✔
854
        okACK := spb.AFTResult_RIB_PROGRAMMED
1✔
855
        // TODO(robjs): today we just say anything that hit
1✔
856
        // the RIB hit the FIB. We need to add a feedback loop
1✔
857
        // that checks this.
1✔
858
        if fibACK {
2✔
859
                okACK = spb.AFTResult_FIB_PROGRAMMED
1✔
860
        }
1✔
861

862
        var (
1✔
863
                oks, faileds []*rib.OpResult
1✔
864
                ribFatalErr  error
1✔
865
        )
1✔
866

1✔
867
        switch op.Op {
1✔
868
        case spb.AFTOperation_ADD, spb.AFTOperation_REPLACE:
1✔
869
                // AddEntry handles replaces, since an ADD can be an explicit replace. It checks
1✔
870
                // whether the entry was an explicit replace from the op, and if so errors if the
1✔
871
                // entry does not already exist.
1✔
872
                log.V(2).Infof("calling AddEntry for operation ID %d", op.GetId())
1✔
873
                oks, faileds, ribFatalErr = r.AddEntry(ni, op)
1✔
874
        case spb.AFTOperation_DELETE:
1✔
875
                oks, faileds, ribFatalErr = r.DeleteEntry(ni, op)
1✔
876
        default:
1✔
877
                return &spb.ModifyResponse{
1✔
878
                        Result: []*spb.AFTResult{
1✔
879
                                {
1✔
880
                                        Id:     op.GetId(),
1✔
881
                                        Status: spb.AFTResult_FAILED,
1✔
882
                                        ErrorDetails: &spb.AFTErrorDetails{
1✔
883
                                                ErrorMessage: fmt.Sprintf("unsupported operation type supplied, %s", op.Op),
1✔
884
                                        },
1✔
885
                                },
1✔
886
                        },
1✔
887
                }, nil
1✔
888
        }
889

890
        if ribFatalErr != nil {
1✔
891
                // RIB action returned fatal error for the connection.
×
892
                return nil, addModifyErrDetailsOrReturn(
×
893
                        status.Newf(codes.Unimplemented, "fatal error processing operation %s, error: %v", op.Op, ribFatalErr),
×
894
                        &spb.ModifyRPCErrorDetails{
×
895
                                Reason: spb.ModifyRPCErrorDetails_UNKNOWN,
×
896
                        },
×
897
                )
×
898
        }
×
899

900
        for _, ok := range oks {
2✔
901
                log.V(2).Infof("received OK for %d in operation %s", ok.ID, prototext.Format(op))
1✔
902
                results = append(results, &spb.AFTResult{
1✔
903
                        Id:     ok.ID,
1✔
904
                        Status: okACK,
1✔
905
                })
1✔
906
        }
1✔
907

908
        for _, fail := range faileds {
2✔
909
                log.Errorf("returning failed to client because the RIB declared it failed, %v", fail)
1✔
910
                results = append(results, &spb.AFTResult{
1✔
911
                        Id:     fail.ID,
1✔
912
                        Status: spb.AFTResult_FAILED,
1✔
913
                        // TODO(robjs): add somewhere for the error that we provide to be
1✔
914
                        // returned.
1✔
915
                })
1✔
916
        }
1✔
917

918
        return &spb.ModifyResponse{
1✔
919
                Result: results,
1✔
920
        }, nil
1✔
921
}
922

923
// checkElectionForModify checks whether the operation with ID opID, and election ID opElecID
924
// with the server that has the election context described by election should proceed. It returns
925
//   - a ModifyResponse which is to be sent to the client
926
//   - a bool determining whether the modify should proceed
927
//   - an error that is returned to the client.
928
//
929
// The bool is set to true in the case that a non-fatal error (e.g., an error whereby the
930
// operation is just ignored) is encountered.
931
// Any returned error is considered fatal to the Modify RPC and can be sent directly back
932
// to the client.
933
func checkElectionForModify(opID uint64, opElecID *spb.Uint128, election *electionDetails) (*spb.ModifyResponse, bool, error) {
1✔
934
        // check whether the election ID is the current one.
1✔
935
        if opElecID == nil {
2✔
936
                // this is an error since we only support election IDs.
1✔
937
                return nil, false, addModifyErrDetailsOrReturn(
1✔
938
                        status.Newf(codes.FailedPrecondition, "no specified election ID when it was required"),
1✔
939
                        &spb.ModifyRPCErrorDetails{
1✔
940
                                // TODO(robjs): we probably need to define what happens here, no specified error code.
1✔
941
                        })
1✔
942
        }
1✔
943

944
        switch {
1✔
945
        case election == nil, election.master == "", election.ID == nil:
1✔
946
                return nil, false, status.Newf(codes.Internal, "invalid election state in server, details of election: %+v", election).Err()
1✔
947
        case election.clientLatest == nil:
1✔
948
                // This client might not have sent us an election ID yet, which means that they are in
1✔
949
                // the wrong.
1✔
950
                return nil, false, addModifyErrDetailsOrReturn(
1✔
951
                        status.Newf(codes.FailedPrecondition, "client has not yet specified an election ID"),
1✔
952
                        &spb.ModifyRPCErrorDetails{},
1✔
953
                )
1✔
954
        case election.client != election.master:
1✔
955
                // this client is not the elected master.
1✔
956
                log.Errorf("returning failed to client %s (id: %s), because they are not the elected master (%s is, id: %s)", election.client, election.clientLatest, election.master, election.ID)
1✔
957
                return &spb.ModifyResponse{
1✔
958
                        Result: []*spb.AFTResult{{
1✔
959
                                Id:     opID,
1✔
960
                                Status: spb.AFTResult_FAILED,
1✔
961
                        }},
1✔
962
                }, false, nil
1✔
963
        }
964

965
        thisID := uint128.New(opElecID.Low, opElecID.High)
1✔
966
        // check that this client sent us the same ID as we had before.
1✔
967
        currentClientID := uint128.New(election.clientLatest.Low, election.clientLatest.High)
1✔
968
        if thisID.Cmp(currentClientID) != 0 {
2✔
969
                log.Errorf("returning failed to client because operation election ID %s != their latest election ID %s (master is: %s with ID %s)", opElecID, election.clientLatest, election.master, election.ID)
1✔
970
                return &spb.ModifyResponse{
1✔
971
                        Result: []*spb.AFTResult{{
1✔
972
                                Id:     opID,
1✔
973
                                Status: spb.AFTResult_FAILED,
1✔
974
                        }},
1✔
975
                }, false, nil
1✔
976
        }
1✔
977

978
        // This is a belt and braces check -- it's not clear that we need to do it. Since we
979
        // checked that the master that is stored in the server is this client. However, we do
980
        // an additional check to ensure that the current ID that we are storing is definitely
981
        // the same as the one that we just received.
982
        currentID := uint128.New(election.ID.Low, election.ID.High)
1✔
983
        switch {
1✔
984
        case thisID.Cmp(currentID) > 0:
1✔
985
                // this value is greater than the known master ID. Return an error
1✔
986
                return nil, false, status.Newf(codes.FailedPrecondition, "specified election ID was greater than existing election, %s > %s", thisID, currentID).Err()
1✔
987
        case thisID.Cmp(currentID) < 0:
1✔
988
                // this value is less as the current master ID, ignore this transaction.
1✔
989
                // note that since we don't respond here, the client at the other side
1✔
990
                // is just going to have a pending transaction forever.
1✔
991
                //
1✔
992
                // TODO(robjs): should we add an error code here?
1✔
993
                log.Errorf("returning failed to client because operation election ID %s < the master election ID %s (master: %s)", opElecID, election.ID, election.master)
1✔
994
                return &spb.ModifyResponse{
1✔
995
                        Result: []*spb.AFTResult{{
1✔
996
                                Id:     opID,
1✔
997
                                Status: spb.AFTResult_FAILED,
1✔
998
                        }},
1✔
999
                }, false, nil
1✔
1000
        }
1001
        return nil, true, nil
1✔
1002
}
1003

1004
// doGet implements the Get RPC for the gRIBI server. It handles the input GetRequest, writing
1005
// the set of GetResponses to the specified msgCh. When the Get is done, the function writes to
1006
// doneCh such that the caller knows that the work that is being done is complete. If a message
1007
// is received on stopCh the function returns. Any errors that are experienced are written to
1008
// errCh.
1009
func (s *Server) doGet(req *spb.GetRequest, msgCh chan *spb.GetResponse, doneCh, stopCh chan struct{}, errCh chan error) {
1✔
1010
        // Any time we return we return we tell the done channel that we're complete.
1✔
1011
        defer func() {
2✔
1012
                doneCh <- struct{}{}
1✔
1013
        }()
1✔
1014

1015
        if req == nil {
1✔
1016
                errCh <- status.Errorf(codes.InvalidArgument, "invalid nil GetRequest received")
×
1017
                return
×
1018
        }
×
1019

1020
        netInstances := []string{}
1✔
1021
        switch nireq := req.NetworkInstance.(type) {
1✔
1022
        case *spb.GetRequest_Name:
1✔
1023
                if nireq.Name == "" {
2✔
1024
                        errCh <- status.Errorf(codes.InvalidArgument, `invalid string "" returned for NetworkInstance name in GetRequest`)
1✔
1025
                        return
1✔
1026
                }
1✔
1027
                netInstances = append(netInstances, nireq.Name)
1✔
1028
        case *spb.GetRequest_All:
1✔
1029
                netInstances = s.masterRIB.KnownNetworkInstances()
1✔
1030
        }
1031

1032
        filter := map[spb.AFTType]bool{}
1✔
1033
        switch v := req.Aft; v {
1✔
1034
        case spb.AFTType_ALL, spb.AFTType_IPV4, spb.AFTType_NEXTHOP, spb.AFTType_NEXTHOP_GROUP, spb.AFTType_MPLS, spb.AFTType_IPV6:
1✔
1035
                filter[v] = true
1✔
1036
        default:
1✔
1037
                errCh <- status.Errorf(codes.Unimplemented, "AFTs other than IPv4, MPLS, IPv6, NHG and NH are unimplemented, requested: %s", v)
1✔
1038
        }
1039

1040
        for _, ni := range netInstances {
2✔
1041
                netInst, ok := s.masterRIB.NetworkInstanceRIB(ni)
1✔
1042
                if !ok {
1✔
1043
                        errCh <- status.Errorf(codes.InvalidArgument, "invalid network instance %s specified", ni)
×
1044
                        return
×
1045
                }
×
1046

1047
                if err := netInst.GetRIB(filter, msgCh, stopCh); err != nil {
1✔
1048
                        errCh <- err
×
1049
                        return
×
1050
                }
×
1051
        }
1052
}
1053

1054
// addFlushErrDetailsOrReturn
1055
func addFlushErrDetailsOrReturn(s *status.Status, d *spb.FlushResponseError) error {
1✔
1056
        se, err := s.WithDetails(d)
1✔
1057
        if err != nil {
1✔
1058
                return s.Err()
×
1059
        }
×
1060
        return se.Err()
1✔
1061
}
1062

1063
// checkFlushRequest ensures that the FlushRequest that was supplied from a client is valid - particularly,
1064
// validating that the election parameters are consistent, and correct, and that the Flush should be
1065
// completed.
1066
func (s *Server) checkFlushRequest(req *spb.FlushRequest) error {
1✔
1067
        switch {
1✔
1068
        case req == nil:
1✔
1069
                return status.Newf(codes.Internal, "FlushRequest can not be nil").Err()
1✔
1070
        case req.GetNetworkInstance() == nil:
1✔
1071
                return addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "unspecified network instance"), &spb.FlushResponseError{
1✔
1072
                        Status: spb.FlushResponseError_UNSPECIFIED_NETWORK_INSTANCE,
1✔
1073
                })
1✔
1074
        case req.GetOverride() != nil:
1✔
1075
                // The election ID should not be compared, regardless of whether
1✔
1076
                // there are SINGLE_PRIMARY clients on the server.
1✔
1077
                return nil
1✔
1078
        }
1079

1080
        id := req.GetId()
1✔
1081
        switch {
1✔
1082
        case id == nil && s.curElecID == nil:
1✔
1083
                // We are in ALL_PRIMARY mode and not given an election ID, which is fine.
1✔
1084
                return nil
1✔
1085
        case id == nil && s.curElecID != nil:
1✔
1086
                // We are in SINGLE_PRIMARY mode but we were not given an election behaviour.
1✔
1087
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "unsupported election behaviour, client in SINGLE_PRIMARY mode"), &spb.FlushResponseError{
1✔
1088
                        Status: spb.FlushResponseError_UNSPECIFIED_ELECTION_BEHAVIOR,
1✔
1089
                })
1✔
1090
        case id != nil && s.curElecID == nil:
1✔
1091
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "received election ID in ALL_PRIMARY mode"), &spb.FlushResponseError{
1✔
1092
                        Status: spb.FlushResponseError_ELECTION_ID_IN_ALL_PRIMARY,
1✔
1093
                })
1✔
1094
        }
1095

1096
        // If the Flush specified an ID, then we need to check that it is valid according
1097
        // to the logic that is defined in the specification - it must be either equal to
1098
        // or higher than the value that is the current master,
1099
        candidate := uint128.New(id.Low, id.High)
1✔
1100

1✔
1101
        if uint128.New(0, 0).Equals(candidate) {
2✔
1102
                return addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "zero is an invalid election ID"), &spb.FlushResponseError{
1✔
1103
                        Status: spb.FlushResponseError_INVALID_ELECTION_ID,
1✔
1104
                })
1✔
1105
        }
1✔
1106

1107
        existing := uint128.New(s.curElecID.Low, s.curElecID.High)
1✔
1108
        if candidate.Cmp(existing) < 0 {
2✔
1109
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "election ID specified (%v) is not primary", candidate), &spb.FlushResponseError{
1✔
1110
                        Status: spb.FlushResponseError_NOT_PRIMARY,
1✔
1111
                })
1✔
1112
        }
1✔
1113

1114
        return nil
1✔
1115
}
1116

1117
// FakeServer is a wrapper around the server with functions to enable testing
1118
// to be performed more easily, for example, injecting specific state.
1119
type FakeServer struct {
1120
        *Server
1121
}
1122

1123
// NewFake returns a new version of the fake server. This implementation wraps
1124
// the Server implementation with functions to insert specific state into
1125
// the server without a need to use the public APIs.
1126
func NewFake(opt ...ServerOpt) (*FakeServer, error) {
1✔
1127
        s, err := New(opt...)
1✔
1128
        if err != nil {
1✔
1129
                return nil, err
×
1130
        }
×
1131
        return &FakeServer{Server: s}, nil
1✔
1132
}
1133

1134
// InjectRIB allows a client to inject a RIB into the server as though it was
1135
// received from the master client.
1136
func (f *FakeServer) InjectRIB(r *rib.RIB) {
1✔
1137
        f.Server.masterRIB = r
1✔
1138
}
1✔
1139

1140
// InjectElectionID allows a client to set an initial election ID on the server as
1141
// though it was received from the client.
1142
func (f *FakeServer) InjectElectionID(id *spb.Uint128) {
×
1143
        f.Server.curElecID = id
×
1144
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc