• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openconfig / gribigo / 12640464692

06 Jan 2025 09:15PM UTC coverage: 69.717% (-4.0%) from 73.705%
12640464692

Pull #249

github

robshakir
Update workflow again!
Pull Request #249: Update OC, accommodate gRPC changes.

65 of 550 new or added lines in 2 files covered. (11.82%)

12 existing lines in 2 files now uncovered.

6149 of 8820 relevant lines covered (69.72%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.56
/server/server.go
1
// Copyright 2021 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package server defines the basic elements of a gRIBI server that uses
16
// an in-memory data store for the AFT contents.
17
package server
18

19
import (
20
        "bytes"
21
        "context"
22
        "fmt"
23
        "io"
24
        "sync"
25
        "time"
26

27
        log "github.com/golang/glog"
28
        "github.com/google/uuid"
29
        "github.com/openconfig/gribigo/rib"
30
        "google.golang.org/grpc/codes"
31
        "google.golang.org/grpc/status"
32
        "google.golang.org/protobuf/encoding/prototext"
33
        "lukechampine.com/uint128"
34

35
        spb "github.com/openconfig/gribi/v1/proto/service"
36
)
37

38
const (
39
        // DefaultNetworkInstanceName specifies the name of the default network instance on the system.
40
        DefaultNetworkInstanceName = "DEFAULT"
41
)
42

43
// unixTS is used to determine the current unix timestamp in nanoseconds since the
44
// epoch. It is defined such that it can be overloaded by unit tests.
45
var unixTS = time.Now().UnixNano
46

47
// Server implements the gRIBI service.
48
type Server struct {
49
        *spb.UnimplementedGRIBIServer
50

51
        // csMu protects the cs map.
52
        csMu sync.RWMutex
53
        // cs stores the state for clients that are connected to the server
54
        // this allows the server perform operations such as ensuring consistency
55
        // across different connected clients. The key of the map is a unique string
56
        // identifying each client, which in this implementation is a UUID generated
57
        // at connection time. The client ID is scoped as such to ensure that we don't
58
        // have any state as to where the current connection came from (source address,
59
        // or TCP session handle for example).
60
        //
61
        // This design is acceptable because there is no case in which we have an
62
        // operation within gRIBI whereby a client references some state that is
63
        // associated with a prior connection that it had.
64
        cs map[string]*clientState
65

66
        // elecMu protects the curElecID and curMaster values.
67
        elecMu sync.RWMutex
68
        // curElecID stores the current electionID for cases where the server is
69
        // operating in SINGLE_PRIMARY mode.
70
        curElecID *spb.Uint128
71
        // curMaster stores the current master's UUID.
72
        curMaster string
73

74
        // masterRIB is the single gRIBI RIB that is used for a server that runs with
75
        // a single elected master, where a single RIB is written to by all clients.
76
        masterRIB *rib.RIB
77
}
78

79
// clientState stores information that relates to a specific client
80
// connected to the gRIBI server.
81
type clientState struct {
82
        // params stores parameters that are associated with a single
83
        // client of the server. These parameters are advertised as the
84
        // first message on a Modify stream. It is an error to send
85
        // parameters in any other context (i.e., after other ModifyRequest
86
        // messages have been sent, or to adjust these parameters).
87
        params *clientParams
88
        // setParams indicates whether the parameters have been explicitly
89
        // written.
90
        setParams bool
91
        // lastElecID stores the last election ID that the client
92
        // sent to the server. This is used to validate whether the election
93
        // ID in an operation matches the expected election ID.
94
        lastElecID *spb.Uint128
95
}
96

97
// DeepCopy returns a copy of the clientState struct.
98
func (cs *clientState) DeepCopy() *clientState {
1✔
99
        if cs.params == nil {
1✔
100
                return &clientState{}
×
101
        }
×
102
        return &clientState{
1✔
103
                params: cs.params.DeepCopy(),
1✔
104
        }
1✔
105
}
106

107
// clientParams stores parameters that are set as part of the Modify RPC
108
// initial handshake for a particular client.
109
type clientParams struct {
110
        // Persist indicates whether the client's AFT entries should be
111
        // persisted even after the client disconnects.
112
        Persist bool
113

114
        // ExpectElecID indicates whether the client expects to send
115
        // election IDs (i.e., the ClientRedundancy is SINGLE_PRIMARY).
116
        ExpectElecID bool
117

118
        // FIBAck indicates whether the client expects FIB-level
119
        // acknowledgements.
120
        FIBAck bool
121
}
122

123
// DeepCopy returns a copy of the clientParams struct.
124
func (cp *clientParams) DeepCopy() *clientParams {
1✔
125
        return &clientParams{
1✔
126
                Persist:      cp.Persist,
1✔
127
                ExpectElecID: cp.ExpectElecID,
1✔
128
                FIBAck:       cp.FIBAck,
1✔
129
        }
1✔
130
}
1✔
131

132
// Equal returns true if the candidate clientParams n is equal to the receiver cp.
133
func (cp *clientParams) Equal(n *clientParams) bool {
1✔
134
        return cp.Persist == n.Persist && cp.FIBAck == n.FIBAck && cp.ExpectElecID == n.ExpectElecID
1✔
135
}
1✔
136

137
// ServerOpt is an interface that is implemented by any options to the gRIBI server.
138
type ServerOpt interface {
139
        isServerOpt()
140
}
141

142
// WithPostChangeRIBHook specifies that the given function that is of type rib.RIBHook function
143
// should be executed after each change in the RIB.
144
func WithPostChangeRIBHook(fn rib.RIBHookFn) *postChangeRibHook {
×
145
        return &postChangeRibHook{fn: fn}
×
146
}
×
147

148
// ribHook is the internal implementation of the WithRIBHook option.
149
type postChangeRibHook struct {
150
        fn rib.RIBHookFn
151
}
152

153
// isServerOpt implements the ServerOpt interface.
154
func (*postChangeRibHook) isServerOpt() {}
×
155

156
// hasPostChangeRIBHook extracts the ribHook option from the supplied ServerOpt, returning nil
157
// if one is not found. It will return only the first argument if multiple are specified.
158
func hasPostChangeRIBHook(opt []ServerOpt) *postChangeRibHook {
1✔
159
        for _, o := range opt {
2✔
160
                if v, ok := o.(*postChangeRibHook); ok {
1✔
161
                        return v
×
162
                }
×
163
        }
164
        return nil
1✔
165
}
166

167
// WithResolvedEntryHook is a Server option that allows a function to be run for
168
// each entry that can be fully resolved within the RIB (e.g., IPv4Entry).
169
func WithRIBResolvedEntryHook(fn rib.ResolvedEntryFn) *resolvedEntryHook {
×
170
        return &resolvedEntryHook{fn: fn}
×
171
}
×
172

173
// resolvedEntryHook is the internal implementation of the WithRIBResolvedEntryHook
174
// option.
175
type resolvedEntryHook struct {
176
        fn rib.ResolvedEntryFn
177
}
178

179
// isServerOpt implements the ServerOpt interface.
180
func (r *resolvedEntryHook) isServerOpt() {}
×
181

182
// hasResolvedEntryHook returns the resolvedEntryHook from the specified options
183
// if one exists. It will return only the first argument if multiple are specified.
184
func hasResolvedEntryHook(opt []ServerOpt) *resolvedEntryHook {
1✔
185
        for _, o := range opt {
2✔
186
                if v, ok := o.(*resolvedEntryHook); ok {
1✔
187
                        return v
×
188
                }
×
189
        }
190
        return nil
1✔
191
}
192

193
// DisableRIBCheckFn specifies that the consistency checking functions should
194
// be disabled for the RIB. It is useful for a testing RIB that does not need
195
// to have working references.
196
func DisableRIBCheckFn() *disableCheckFn { return &disableCheckFn{} }
1✔
197

198
// disableCheckFn is the internal implementation of DisableRIBCheckFn.
199
type disableCheckFn struct{}
200

201
// isServerOpt implements the ServerOpt interface
202
func (*disableCheckFn) isServerOpt() {}
×
203

204
// hasDisableCheckFn checks whether the ServerOpt slice supplied contains the
205
// disableCheckFn option.
206
func hasDisableCheckFn(opt []ServerOpt) bool {
1✔
207
        for _, o := range opt {
2✔
208
                if _, ok := o.(*disableCheckFn); ok {
2✔
209
                        return true
1✔
210
                }
1✔
211
        }
212
        return false
1✔
213
}
214

215
// WithVRFs specifies that the server should be initialised with the L3VRF
216
// network instances specified in the names list. Each is created in the
217
// server's RIB such that it can be referenced.
218
func WithVRFs(names []string) *withVRFs { return &withVRFs{names: names} }
1✔
219

220
// withVRFs is the internal implementation of WithVRFs that can be read by the
221
// server.
222
type withVRFs struct {
223
        names []string
224
}
225

226
// isServerOpt implements the ServerOpt interface.
227
func (*withVRFs) isServerOpt() {}
×
228

229
// hasWithVRFs checks whether the ServerOpt slice supplied contains the withVRFs
230
// option and returns it if so.
231
func hasWithVRFs(opt []ServerOpt) []string {
1✔
232
        for _, o := range opt {
2✔
233
                if v, ok := o.(*withVRFs); ok {
2✔
234
                        return v.names
1✔
235
                }
1✔
236
        }
237
        return nil
1✔
238
}
239

240
// WithNoRIBForwardReferences specifies that the server should not accept
241
// forward references in the RIB. This means that next-hop-groups should not
242
// be able to reference next-hops that do not exist, and entries cannot reference
243
// missing next-hop-groups.
244
func WithNoRIBForwardReferences() *disableRIBForwardRefs { return &disableRIBForwardRefs{} }
1✔
245

246
// disableRIBForwardRefs is the internal implementation of WithNoRIBForwardReferences.
247
type disableRIBForwardRefs struct{}
248

249
// isServerOpt implements the ServerOpt interface.
250
func (*disableRIBForwardRefs) isServerOpt() {}
×
251

252
// hasWithNoRIBForwardReferences checks whether the ServerOpt slice supplied contains the
253
// disableRIBForwardRefs option and returns true if so.
254
func hasWithNoRIBForwardReferences(opt []ServerOpt) bool {
1✔
255
        for _, o := range opt {
2✔
256
                if _, ok := o.(*disableRIBForwardRefs); ok {
2✔
257
                        return true
1✔
258
                }
1✔
259
        }
260
        return false
1✔
261
}
262

263
// New creates a new gRIBI server.
264
func New(opt ...ServerOpt) (*Server, error) {
1✔
265
        ribOpt := []rib.RIBOpt{}
1✔
266
        if hasDisableCheckFn(opt) {
2✔
267
                ribOpt = append(ribOpt, rib.DisableRIBCheckFn())
1✔
268
        }
1✔
269

270
        if hasWithNoRIBForwardReferences(opt) {
2✔
271
                ribOpt = append(ribOpt, rib.DisableForwardReferences())
1✔
272
        }
1✔
273

274
        s := &Server{
1✔
275
                cs: map[string]*clientState{},
1✔
276
                // TODO(robjs): when we implement support for ALL_PRIMARY then we might not
1✔
277
                // want to create a new RIB by default.
1✔
278
                masterRIB: rib.New(DefaultNetworkInstanceName, ribOpt...),
1✔
279
        }
1✔
280

1✔
281
        if v := hasPostChangeRIBHook(opt); v != nil {
1✔
282
                s.masterRIB.SetPostChangeHook(v.fn)
×
283
        }
×
284

285
        if v := hasResolvedEntryHook(opt); v != nil {
1✔
286
                s.masterRIB.SetResolvedEntryHook(v.fn)
×
287
        }
×
288

289
        if vrfs := hasWithVRFs(opt); vrfs != nil {
2✔
290
                for _, n := range vrfs {
2✔
291
                        if err := s.masterRIB.AddNetworkInstance(n); err != nil {
1✔
292
                                return nil, fmt.Errorf("cannot create network instance %s, %v", n, err)
×
293
                        }
×
294
                }
295
        }
296

297
        return s, nil
1✔
298
}
299

300
// Modify implements the gRIBI Modify RPC.
301
func (s *Server) Modify(ms spb.GRIBI_ModifyServer) error {
×
302
        // Initiate the per client state for this client.
×
303
        cid := uuid.New().String()
×
304
        log.V(2).Infof("creating client with ID %s", cid)
×
305
        if err := s.newClient(cid); err != nil {
×
306
                return err
×
307
        }
×
308

309
        resultChan := make(chan *spb.ModifyResponse)
×
310
        errCh := make(chan error)
×
311
        go func() {
×
312
                // Store whether this is the first message on the Modify RPC, some options - like the session
×
313
                // parameters can only be set as the first message.
×
314
                var gotmsg bool
×
315
                for {
×
316
                        in, err := ms.Recv()
×
317
                        if err == io.EOF {
×
318
                                errCh <- nil
×
319
                                return
×
320
                        }
×
321
                        if err != nil {
×
322
                                errCh <- status.Errorf(codes.Unknown, "error reading message from client, %v", err)
×
323
                                return
×
324
                        }
×
325
                        log.V(2).Infof("received message %s on Modify channel", in)
×
326

×
327
                        var (
×
328
                                res       *spb.ModifyResponse
×
329
                                skipWrite bool
×
330
                        )
×
331

×
332
                        switch {
×
333
                        case in == nil:
×
334
                                log.Errorf("received nil message on Modify channel")
×
335
                                skipWrite = true
×
336
                        case (in.Params != nil && in.ElectionId != nil), (in.Params != nil && in.Operation != nil), (in.ElectionId != nil && in.Operation != nil):
×
337
                                errCh <- status.Errorf(codes.InvalidArgument, "invalid input message, cannot specify >1 of parameters, election ID and operation simultaenously, got: %v", in)
×
338
                                return
×
339
                        case in.Params != nil:
×
340
                                var err error
×
341
                                if res, err = s.checkParams(cid, in.Params, gotmsg); err != nil {
×
342
                                        // Invalid parameters is a fatal error, so we take down the Modify RPC.
×
343
                                        errCh <- err
×
344
                                        return
×
345
                                }
×
346
                                if err := s.updateParams(cid, in.Params); err != nil {
×
347
                                        // Not being able to update parameters is a fatal error, so we take down
×
348
                                        // the Modify RPC.
×
349
                                        errCh <- err
×
350
                                        return
×
351
                                }
×
352
                        case in.ElectionId != nil:
×
353
                                var err error
×
354
                                res, err = s.runElection(cid, in.ElectionId)
×
355
                                if err != nil {
×
356
                                        errCh <- err
×
357
                                        return
×
358
                                }
×
359
                        case in.Operation != nil:
×
360
                                s.doModify(cid, in.Operation, resultChan, errCh)
×
361
                                skipWrite = true
×
362
                        default:
×
363
                                errCh <- status.Errorf(codes.Unimplemented, "unimplemented handling of message %s", in)
×
364
                                return
×
365
                        }
366

367
                        // update that we have received at least one message, to detect that we cannot
368
                        // now re-set the parameters of the RPC.
369
                        gotmsg = true
×
370
                        // write the results to result channel.
×
371
                        if !skipWrite {
×
372
                                resultChan <- res
×
373
                        }
×
374
                }
375
        }()
376

377
        resultDone := make(chan struct{})
×
378
        go func() {
×
379
                for {
×
380
                        select {
×
381
                        case res := <-resultChan:
×
382
                                if err := ms.Send(res); err != nil {
×
383
                                        errCh <- status.Errorf(codes.Internal, "cannot write message to client channel, %s", res)
×
384
                                        return
×
385
                                }
×
386
                        case <-resultDone:
×
387
                                return
×
388
                        }
389
                }
390
        }()
391

392
        err := <-errCh
×
393
        close(resultDone)
×
394

×
395
        // when this client goes away, we need to clean up its state.
×
396
        s.deleteClient(cid)
×
397

×
398
        return err
×
399
}
400

401
// Get implements the gRIBI Get RPC.
402
func (s *Server) Get(req *spb.GetRequest, stream spb.GRIBI_GetServer) error {
×
403
        msgCh := make(chan *spb.GetResponse)
×
404
        errCh := make(chan error)
×
405
        doneCh := make(chan struct{})
×
406
        stopCh := make(chan struct{})
×
407

×
408
        // defer a function to stop the goroutine and close all channels, since this will be called
×
409
        // when we exit, then it will stop the goroutine that we started to do
×
410
        // the get in the case that we exit due to some error.
×
411
        defer func() {
×
412
                // Non-blocking write to the stopCh, since if the goroutine has
×
413
                // already returned then it won't be listening and we'll deadlock.
×
414
                select {
×
415
                case stopCh <- struct{}{}:
×
416
                default:
×
417
                }
418
        }()
419

420
        go s.doGet(req, msgCh, doneCh, stopCh, errCh)
×
421

×
422
        var done bool
×
423

×
424
        for !done {
×
425
                select {
×
426
                case <-doneCh:
×
427
                        done = true
×
428
                case err := <-errCh:
×
429
                        return status.Errorf(codes.Internal, "cannot generate GetResponse, %v", err)
×
430
                case r := <-msgCh:
×
431
                        if err := stream.Send(r); err != nil {
×
432
                                return status.Errorf(codes.Internal, "cannot write message to client channel, %v", err)
×
433
                        }
×
434
                }
435
        }
436
        return nil
×
437
}
438

439
// Flush implements the gRIBI Flush RPC - used for removing entries from the server.
440
func (s *Server) Flush(ctx context.Context, req *spb.FlushRequest) (*spb.FlushResponse, error) {
1✔
441
        if err := s.checkFlushRequest(req); err != nil {
2✔
442
                return nil, err
1✔
443
        }
1✔
444

445
        nis := []string{}
1✔
446
        switch t := req.GetNetworkInstance().(type) {
1✔
447
        case *spb.FlushRequest_All:
1✔
448
                nis = s.masterRIB.KnownNetworkInstances()
1✔
449
        case *spb.FlushRequest_Name:
1✔
450
                if _, ok := s.masterRIB.NetworkInstanceRIB(t.Name); !ok {
2✔
451
                        return nil, addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "could not find network instance %s", t.Name), &spb.FlushResponseError{
1✔
452
                                Status: spb.FlushResponseError_INVALID_NETWORK_INSTANCE,
1✔
453
                        })
1✔
454
                }
1✔
455
                nis = []string{t.Name}
1✔
456
        }
457

458
        if err := s.masterRIB.Flush(nis); err != nil {
1✔
459
                fErr, ok := err.(*rib.FlushErr)
×
460
                det := &bytes.Buffer{}
×
461
                switch ok {
×
462
                case true:
×
463
                        for _, msg := range fErr.Errs {
×
464
                                det.WriteString(fmt.Sprintf("%s\n", msg))
×
465
                        }
×
466
                default:
×
467
                        det.WriteString(fmt.Sprintf("%v", err))
×
468
                }
469
                // Always use codes.Internal here since any error (not being able to flush, or
470
                // not finding an NI that we already checked existed is some internal logic
471
                // error).
NEW
472
                return nil, status.Errorf(codes.Internal, "%s", det.String())
×
473
        }
474

475
        return &spb.FlushResponse{
1✔
476
                Timestamp: unixTS(),
1✔
477
                Result:    spb.FlushResponse_OK,
1✔
478
        }, nil
1✔
479
}
480

481
// newClient creates a new client context within the server using the specified string
482
// ID.
483
func (s *Server) newClient(id string) error {
1✔
484
        s.csMu.Lock()
1✔
485
        defer s.csMu.Unlock()
1✔
486
        if s.cs[id] != nil {
2✔
487
                return status.Errorf(codes.Internal, "cannot create new client with duplicate ID, %s", id)
1✔
488
        }
1✔
489
        s.cs[id] = &clientState{
1✔
490
                // Set to the default set of parameters.
1✔
491
                params: &clientParams{},
1✔
492
        }
1✔
493

1✔
494
        return nil
1✔
495
}
496

497
// deleteClient removes the client with the specified id from the server. It does not return
498
// an error if the client cannot be deleted, since this action is performed when the other
499
// side has already gone away, so there is no error we can return to them.
500
func (s *Server) deleteClient(id string) {
×
501
        s.csMu.Lock()
×
502
        defer s.csMu.Unlock()
×
503
        delete(s.cs, id)
×
504
}
×
505

506
// updateParams writes the parameters for the client specified by id to the server state
507
// based on the received session parameters supplied in params. It returns errors if
508
// the client is undefined, or the parameters have been set previously. It does not
509
// ensure that the parameters are consistent with other clients on the server - but
510
// rather solely updates for a particular client.
511
func (s *Server) updateParams(id string, params *spb.SessionParameters) error {
1✔
512
        cparam := &clientParams{}
1✔
513

1✔
514
        cparam.ExpectElecID = (params.Redundancy == spb.SessionParameters_SINGLE_PRIMARY)
1✔
515
        cparam.Persist = (params.Persistence == spb.SessionParameters_PRESERVE)
1✔
516
        cparam.FIBAck = (params.AckType == spb.SessionParameters_RIB_AND_FIB_ACK)
1✔
517

1✔
518
        s.csMu.Lock()
1✔
519
        defer s.csMu.Unlock()
1✔
520
        p := s.cs[id]
1✔
521
        if p == nil || p.params == nil {
2✔
522
                return status.Errorf(codes.Internal, "cannot update parameters for a client with no state, %s", id)
1✔
523
        }
1✔
524
        if p.setParams {
2✔
525
                return addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot modify SessionParameters"), &spb.ModifyRPCErrorDetails{
1✔
526
                        Reason: spb.ModifyRPCErrorDetails_MODIFY_NOT_ALLOWED,
1✔
527
                })
1✔
528
        }
1✔
529
        s.cs[id].setParams = true
1✔
530
        s.cs[id].params = cparam
1✔
531
        return nil
1✔
532
}
533

534
// addModifyErrDetailsOrReturn takes an input status (s), and ModifyRPCErrorDetails proto (d) and appends
535
// d to s. If an error is encountered, s is returned, otherwise the appended version is returned. The return
536
// type is a Go error which can be returned directly.
537
func addModifyErrDetailsOrReturn(s *status.Status, d *spb.ModifyRPCErrorDetails) error {
1✔
538
        ns, err := s.WithDetails(d)
1✔
539
        if err != nil {
1✔
540
                return s.Err()
×
541
        }
×
542
        return ns.Err()
1✔
543
}
544

545
// checkParams validates that the parameters that were supplied by the client with the specified
546
// ID are valid within the overall server context. It returns the ModifyResponse that should be sent
547
// to the client, or the populated error.
548
func (s *Server) checkParams(id string, p *spb.SessionParameters, gotMsg bool) (*spb.ModifyResponse, error) {
1✔
549
        if p == nil {
2✔
550
                return nil, status.Newf(codes.Internal, "invalid nil parameters when checking for client %s, got: %v", id, p).Err()
1✔
551
        }
1✔
552

553
        if gotMsg {
2✔
554
                // TODO(robjs): the spec should spell out that this can't be sent after the first message is
1✔
555
                // received, whatever the type.
1✔
556
                return nil, addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot send session parameters after another message"), &spb.ModifyRPCErrorDetails{
1✔
557
                        Reason: spb.ModifyRPCErrorDetails_MODIFY_NOT_ALLOWED,
1✔
558
                })
1✔
559
        }
1✔
560

561
        // TODO(robjs): confirm with folks what their thoughts are on whether we support persistence
562
        // other than DELETE in ALL_PRIMARY. I think that we should not support this since it means
563
        // that we need to externalise the client ID (so that the client can delete its old entries, but
564
        // not others).
565
        if p.Redundancy == spb.SessionParameters_ALL_PRIMARY && p.Persistence == spb.SessionParameters_PRESERVE {
2✔
566
                return nil, addModifyErrDetailsOrReturn(status.New(codes.FailedPrecondition, "cannot have ALL_PRIMARY client with persistence PRESERVE"), &spb.ModifyRPCErrorDetails{
1✔
567
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
568
                })
1✔
569
        }
1✔
570

571
        // The fake server supports both RIB and FIB ACKing (given that we do not have any real
572
        // FIB, they are basically the same :-)). It does not (currently) support ALL_PRIMARY
573
        // mode of operations.
574
        if p.Redundancy == spb.SessionParameters_ALL_PRIMARY {
2✔
575
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.Unimplemented, "ALL_PRIMARY redundancy are not supported"), &spb.ModifyRPCErrorDetails{
1✔
576
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
577
                })
1✔
578
        }
1✔
579

580
        // The fake server does not (currently) support delete, so we just return an error
581
        // if the client is asking for anything other than persisting the entries.
582
        if p.Persistence == spb.SessionParameters_DELETE {
2✔
583
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.Unimplemented, "persistence modes other than PRESERVE are not supported"), &spb.ModifyRPCErrorDetails{
1✔
584
                        Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
585
                })
1✔
586
        }
1✔
587

588
        cp := &clientParams{
1✔
589
                FIBAck:       p.GetAckType() == spb.SessionParameters_RIB_AND_FIB_ACK,
1✔
590
                ExpectElecID: p.GetRedundancy() == spb.SessionParameters_SINGLE_PRIMARY,
1✔
591
                Persist:      p.GetPersistence() == spb.SessionParameters_PRESERVE,
1✔
592
        }
1✔
593

1✔
594
        consistent, err := s.checkClientsConsistent(id, cp)
1✔
595
        if err != nil {
1✔
596
                return nil, status.Newf(codes.Internal, "got unexpected error checking for consistency, %v", err).Err()
×
597
        }
×
598
        if !consistent {
2✔
599
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "client %s is not consistent with other clients", id), &spb.ModifyRPCErrorDetails{
1✔
600
                        Reason: spb.ModifyRPCErrorDetails_PARAMS_DIFFER_FROM_OTHER_CLIENTS,
1✔
601
                })
1✔
602
        }
1✔
603

604
        if err := s.setClientParams(id, cp); err != nil {
1✔
605
                return nil, status.Errorf(codes.Internal, "internal error setting parameters, %v", err)
×
606
        }
×
607

608
        return &spb.ModifyResponse{
1✔
609
                SessionParamsResult: &spb.SessionParametersResult{
1✔
610
                        Status: spb.SessionParametersResult_OK,
1✔
611
                },
1✔
612
        }, nil
1✔
613
}
614

615
// checkClientsConsistent ensures that the client described by id and the parameters p is consistent
616
// with other clients in the server.
617
func (s *Server) checkClientsConsistent(id string, p *clientParams) (bool, error) {
1✔
618
        if p == nil {
2✔
619
                return false, fmt.Errorf("unexpected nil parameters for client %s", id)
1✔
620
        }
1✔
621

622
        s.csMu.RLock()
1✔
623
        defer s.csMu.RUnlock()
1✔
624
        for cid, state := range s.cs {
2✔
625
                if id == cid {
2✔
626
                        continue
1✔
627
                }
628
                if state == nil || state.params == nil {
2✔
629
                        return false, fmt.Errorf("client %s has invalid nil parameter state", cid)
1✔
630
                }
1✔
631

632
                if !state.params.Equal(p) {
2✔
633
                        return false, nil
1✔
634
                }
1✔
635
        }
636
        return true, nil
1✔
637
}
638

639
// setClientParams sets the parameters for client with id to have the specified parameters.
640
func (s *Server) setClientParams(id string, p *clientParams) error {
1✔
641
        s.csMu.Lock()
1✔
642
        defer s.csMu.Unlock()
1✔
643
        if s.cs[id] == nil {
1✔
644
                return fmt.Errorf("cannot find client %s, known clients: %v", id, s.cs)
×
645
        }
×
646
        s.cs[id].params = p
1✔
647
        return nil
1✔
648
}
649

650
// isNewMaster takes two election IDs and determines whether the candidate (cand) is a new
651
// master given the existing master (exist). It returns:
652
//   - a bool which indicates whether the candidate is the new master
653
//   - a bool which indicates whether this is an election ID with the same election ID
654
//   - an error indicating whether this was an invalid update
655
func isNewMaster(cand, exist *spb.Uint128) (bool, bool, error) {
1✔
656
        if exist == nil {
2✔
657
                return true, false, nil
1✔
658
        }
1✔
659
        if cand.High > exist.High {
2✔
660
                return true, false, nil
1✔
661
        }
1✔
662
        if cand.Low > exist.Low {
2✔
663
                return true, false, nil
1✔
664
        }
1✔
665

666
        // Per comments in gribi.proto - if the two values are equal, then we accept the new
667
        // candidate as the master, this allows for reconnections.
668
        if cand.High == exist.High && cand.Low == exist.Low {
2✔
669
                return true, true, nil
1✔
670
        }
1✔
671
        // TODO(robjs): currently this is not specified in the spec, since this is the
672
        // election ID going backwards, but it seems like we could not return an error here
673
        // since it might just be a stale client telling us that they think that they're the
674
        // master. However, we could return an error just to say "hey, you're somehow out of sync
675
        // with the master election system".
676
        return false, false, nil
1✔
677
}
678

679
// getClientState returns the client state for the client with the specified id, along with
680
// whether the client was found.
681
func (s *Server) getClientState(id string) (*clientState, bool) {
1✔
682
        s.csMu.RLock()
1✔
683
        defer s.csMu.RUnlock()
1✔
684
        cs, ok := s.cs[id]
1✔
685
        return cs, ok
1✔
686
}
1✔
687

688
// storeClientElectionID stores the latest election ID for a client into the
689
// server specified. It returns true if the election ID was stored.
690
func (s *Server) storeClientElectionID(id string, elecID *spb.Uint128) bool {
1✔
691
        s.csMu.Lock()
1✔
692
        defer s.csMu.Unlock()
1✔
693
        cs, ok := s.cs[id]
1✔
694
        if !ok {
1✔
695
                return false
×
696
        }
×
697
        cs.lastElecID = elecID
1✔
698
        return true
1✔
699
}
700

701
// getClientStateCopy returns a copy of the state for the client with the specified ID, since
702
// the state of a client is immutable after the initial creation, we never allow a client to
703
// get a copy of the pointer so that they could change this. It returns a copy of the clientState
704
// an error.
705
func (s *Server) getClientStateCopy(id string) (*clientState, error) {
1✔
706
        s.csMu.RLock()
1✔
707
        defer s.csMu.RUnlock()
1✔
708
        if s.cs[id] == nil {
1✔
709
                return nil, fmt.Errorf("unknown client %s", id)
×
710
        }
×
711
        return s.cs[id].DeepCopy(), nil
1✔
712
}
713

714
// runElection runs an election on the server and checks whether the client with the specified id is
715
// the new master.
716
func (s *Server) runElection(id string, elecID *spb.Uint128) (*spb.ModifyResponse, error) {
1✔
717
        cs, err := s.getClientStateCopy(id)
1✔
718
        if err != nil {
1✔
719
                return nil, err
×
720
        }
×
721
        if !cs.params.ExpectElecID {
2✔
722
                return nil, addModifyErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "client ID %s does not expect elections", id), &spb.ModifyRPCErrorDetails{
1✔
723
                        Reason: spb.ModifyRPCErrorDetails_ELECTION_ID_IN_ALL_PRIMARY,
1✔
724
                })
1✔
725
        }
1✔
726

727
        // If the election ID that we received is 0, then this is an invalid value
728
        // in the input message, return an error to the client.
729
        if inputID, zero := uint128.New(elecID.Low, elecID.High), uint128.New(0, 0); zero.Cmp(inputID) == 0 {
2✔
730
                return nil, status.Newf(codes.InvalidArgument, "client ID %s, zero is an invalid election ID", id).Err()
1✔
731
        }
1✔
732

733
        // At this point, we store the latest election ID that we've seen from this
734
        // client, even if it does not win the election. This allows us to check
735
        // that the client has the same election ID as it has reported to us in
736
        // subsequent transactions.
737
        if !s.storeClientElectionID(id, elecID) {
1✔
738
                return nil, status.Newf(codes.Internal, "cannot store election ID %s for client %s", elecID, id).Err()
×
739
        }
×
740

741
        s.elecMu.RLock()
1✔
742
        defer s.elecMu.RUnlock()
1✔
743
        nm, _, err := isNewMaster(elecID, s.curElecID)
1✔
744
        if err != nil {
1✔
745
                return nil, err
×
746
        }
×
747

748
        if nm {
2✔
749
                s.curElecID = elecID
1✔
750
                s.curMaster = id
1✔
751
        }
1✔
752

753
        return &spb.ModifyResponse{
1✔
754
                ElectionId: s.curElecID,
1✔
755
        }, nil
1✔
756
}
757

758
// getElection returns the details of the current election on the server.
759
func (s *Server) getElection() *electionDetails {
1✔
760
        s.elecMu.RLock()
1✔
761
        defer s.elecMu.RUnlock()
1✔
762
        return &electionDetails{
1✔
763
                master: s.curMaster,
1✔
764
                ID:     s.curElecID,
1✔
765
        }
1✔
766
}
1✔
767

768
// doModify implements a modify operation for a specific input set of AFTOperation
769
// messages for the client with the specified cid. It writes the result to the supplied
770
// ModifyResponse channel when successful, or writes the error to the supplied errCh.
771
func (s *Server) doModify(cid string, ops []*spb.AFTOperation, resCh chan *spb.ModifyResponse, errCh chan error) {
1✔
772
        cs, ok := s.getClientState(cid)
1✔
773
        switch {
1✔
774
        case !ok:
1✔
775
                errCh <- status.Newf(codes.Internal, "operation received for unknown client, %s", cid).Err()
1✔
776
                return
1✔
777
        case cs.params == nil || !cs.params.ExpectElecID || !cs.params.Persist:
1✔
778
                // these are parameters that we do not support.
1✔
779
                errCh <- addModifyErrDetailsOrReturn(
1✔
780
                        status.New(codes.Unimplemented, "unsupported parameters for client"),
1✔
781
                        &spb.ModifyRPCErrorDetails{
1✔
782
                                Reason: spb.ModifyRPCErrorDetails_UNSUPPORTED_PARAMS,
1✔
783
                        })
1✔
784
                return
1✔
785
        }
786

787
        elec := s.getElection()
1✔
788
        elec.clientLatest = cs.lastElecID
1✔
789
        elec.client = cid
1✔
790

1✔
791
        for _, o := range ops {
2✔
792
                ni := o.GetNetworkInstance()
1✔
793
                if ni == "" {
1✔
794
                        resCh <- &spb.ModifyResponse{
×
795
                                Result: []*spb.AFTResult{{
×
796
                                        Id:     o.Id,
×
797
                                        Status: spb.AFTResult_FAILED,
×
798
                                        ErrorDetails: &spb.AFTErrorDetails{
×
799
                                                ErrorMessage: `invalid network instance name "" specified`,
×
800
                                        },
×
801
                                }},
×
802
                        }
×
803
                }
×
804
                if _, ok := s.masterRIB.NetworkInstanceRIB(ni); !ok {
2✔
805
                        // this is an unknown network instance, we should not return
1✔
806
                        // an error to the client since we do not want the connection
1✔
807
                        // to be torn down.
1✔
808
                        log.Errorf("rejected operation %s since it is an unknown network-instance, %s", o, ni)
1✔
809
                        resCh <- &spb.ModifyResponse{
1✔
810
                                Result: []*spb.AFTResult{{
1✔
811
                                        Id:     o.Id,
1✔
812
                                        Status: spb.AFTResult_FAILED,
1✔
813
                                        ErrorDetails: &spb.AFTErrorDetails{
1✔
814
                                                ErrorMessage: fmt.Sprintf(`unknown network instance "%s" specified`, ni),
1✔
815
                                        },
1✔
816
                                }},
1✔
817
                        }
1✔
818
                        return
1✔
819
                }
1✔
820

821
                // We do not try and modify entries within the operation in parallel
822
                // with each other since this may cause us to duplicate ACK on particular
823
                // operations - for example, if there are two next-hops that are within a
824
                // single next-hop-group, then both of them will cause the NHG to be
825
                // installed in the RIB. If we do this then we might end up ACKing
826
                // one twice if there was >1 different entry that mde a NHG resolvable.
827
                // For a SINGLE_PRIMARY client serialising into a single Modify channel
828
                // ensures that we do not end up with this occuring - but going forward
829
                // for ALL_PRIMARY this situation will need to handled likely by creating
830
                // some form of lock on each transaction as it is attempted, or building
831
                // a more intelligent RIB structure to track missing dependencies.
832
                res, err := modifyEntry(s.masterRIB, ni, o, cs.params.FIBAck, elec)
1✔
833
                switch {
1✔
834
                case err != nil:
×
835
                        errCh <- err
×
836
                default:
1✔
837
                        resCh <- res
1✔
838
                }
839
        }
840
}
841

842
// electionDetails provides a summary of a single election from the perspective of one client.
843
type electionDetails struct {
844
        // master is the clientID of the client that is master after the election.
845
        master string
846
        // electionID is the ID of the latest election.
847
        ID *spb.Uint128
848
        // client is the ID of the client that the query is being done on behalf of.
849
        client string
850
        // clientLatest is the latest electionID that the client provided us with.
851
        clientLatest *spb.Uint128
852
}
853

854
// modifyEntry performs the specified modify operation, op, on the RIB, r, within the network
855
// instance ni. The client's request ACK mode is specified by fibACK. The details of the
856
// current election on the server is described in election.
857
// The results are returned as a ModifyResponse and an error which must be a status.Status.
858
func modifyEntry(r *rib.RIB, ni string, op *spb.AFTOperation, fibACK bool, election *electionDetails) (*spb.ModifyResponse, error) {
1✔
859
        if op == nil {
2✔
860
                return nil, status.Newf(codes.Internal, "invalid nil operation received").Err()
1✔
861
        }
1✔
862

863
        res, ok, err := checkElectionForModify(op.Id, op.ElectionId, election)
1✔
864
        if err != nil {
2✔
865
                return nil, err
1✔
866
        }
1✔
867
        if !ok {
2✔
868
                return res, err
1✔
869
        }
1✔
870

871
        if r == nil {
1✔
872
                return nil, status.New(codes.Internal, "invalid RIB state").Err()
×
873
        }
×
874

875
        niR, ok := r.NetworkInstanceRIB(ni)
1✔
876
        if !ok || !niR.IsValid() {
1✔
877
                return nil, status.Newf(codes.Internal, "invalid RIB state for network instance name: '%s'", ni).Err()
×
878
        }
×
879

880
        results := []*spb.AFTResult{}
1✔
881

1✔
882
        var (
1✔
883
                oks, faileds []*rib.OpResult
1✔
884
                ribFatalErr  error
1✔
885
        )
1✔
886

1✔
887
        switch op.Op {
1✔
888
        case spb.AFTOperation_ADD, spb.AFTOperation_REPLACE:
1✔
889
                // AddEntry handles replaces, since an ADD can be an explicit replace. It checks
1✔
890
                // whether the entry was an explicit replace from the op, and if so errors if the
1✔
891
                // entry does not already exist.
1✔
892
                log.V(2).Infof("calling AddEntry for operation ID %d", op.GetId())
1✔
893
                oks, faileds, ribFatalErr = r.AddEntry(ni, op)
1✔
894
        case spb.AFTOperation_DELETE:
1✔
895
                oks, faileds, ribFatalErr = r.DeleteEntry(ni, op)
1✔
896
        default:
1✔
897
                return &spb.ModifyResponse{
1✔
898
                        Result: []*spb.AFTResult{
1✔
899
                                {
1✔
900
                                        Id:     op.GetId(),
1✔
901
                                        Status: spb.AFTResult_FAILED,
1✔
902
                                        ErrorDetails: &spb.AFTErrorDetails{
1✔
903
                                                ErrorMessage: fmt.Sprintf("unsupported operation type supplied, %s", op.Op),
1✔
904
                                        },
1✔
905
                                },
1✔
906
                        },
1✔
907
                }, nil
1✔
908
        }
909

910
        if ribFatalErr != nil {
1✔
911
                // RIB action returned fatal error for the connection.
×
912
                return nil, addModifyErrDetailsOrReturn(
×
913
                        status.Newf(codes.Unimplemented, "fatal error processing operation %s, error: %v", op.Op, ribFatalErr),
×
914
                        &spb.ModifyRPCErrorDetails{
×
915
                                Reason: spb.ModifyRPCErrorDetails_UNKNOWN,
×
916
                        },
×
917
                )
×
918
        }
×
919

920
        for _, ok := range oks {
2✔
921
                log.V(2).Infof("received OK for %d in operation %s", ok.ID, prototext.Format(op))
1✔
922
                results = append(results, &spb.AFTResult{
1✔
923
                        Id:     ok.ID,
1✔
924
                        Status: spb.AFTResult_RIB_PROGRAMMED,
1✔
925
                })
1✔
926
                // For RIB_AND_FIB_ACK we sent both the RIB programmed and FIB programmed
1✔
927
                // signals back.
1✔
928
                //
1✔
929
                // TODO(robjs): Currently, we just say everything that was RIB programmed was
1✔
930
                // FIB programmed. Add a feedback loop for this.
1✔
931
                if fibACK {
2✔
932
                        results = append(results, &spb.AFTResult{
1✔
933
                                Id:     ok.ID,
1✔
934
                                Status: spb.AFTResult_FIB_PROGRAMMED,
1✔
935
                        })
1✔
936
                }
1✔
937
        }
938

939
        for _, fail := range faileds {
2✔
940
                log.Errorf("returning failed to client because the RIB declared it failed, %v", fail)
1✔
941
                results = append(results, &spb.AFTResult{
1✔
942
                        Id:     fail.ID,
1✔
943
                        Status: spb.AFTResult_FAILED,
1✔
944
                        ErrorDetails: &spb.AFTErrorDetails{
1✔
945
                                ErrorMessage: fail.Error,
1✔
946
                        },
1✔
947
                })
1✔
948
        }
1✔
949

950
        return &spb.ModifyResponse{
1✔
951
                Result: results,
1✔
952
        }, nil
1✔
953
}
954

955
// checkElectionForModify checks whether the operation with ID opID, and election ID opElecID
956
// with the server that has the election context described by election should proceed. It returns
957
//   - a ModifyResponse which is to be sent to the client
958
//   - a bool determining whether the modify should proceed
959
//   - an error that is returned to the client.
960
//
961
// The bool is set to true in the case that a non-fatal error (e.g., an error whereby the
962
// operation is just ignored) is encountered.
963
// Any returned error is considered fatal to the Modify RPC and can be sent directly back
964
// to the client.
965
func checkElectionForModify(opID uint64, opElecID *spb.Uint128, election *electionDetails) (*spb.ModifyResponse, bool, error) {
1✔
966
        // check whether the election ID is the current one.
1✔
967
        if opElecID == nil {
2✔
968
                // this is an error since we only support election IDs.
1✔
969
                return nil, false, addModifyErrDetailsOrReturn(
1✔
970
                        status.Newf(codes.FailedPrecondition, "no specified election ID when it was required"),
1✔
971
                        &spb.ModifyRPCErrorDetails{
1✔
972
                                // TODO(robjs): we probably need to define what happens here, no specified error code.
1✔
973
                        })
1✔
974
        }
1✔
975

976
        switch {
1✔
977
        case election == nil, election.master == "", election.ID == nil:
1✔
978
                return nil, false, status.Newf(codes.Internal, "invalid election state in server, details of election: %+v", election).Err()
1✔
979
        case election.clientLatest == nil:
1✔
980
                // This client might not have sent us an election ID yet, which means that they are in
1✔
981
                // the wrong.
1✔
982
                return nil, false, addModifyErrDetailsOrReturn(
1✔
983
                        status.Newf(codes.FailedPrecondition, "client has not yet specified an election ID"),
1✔
984
                        &spb.ModifyRPCErrorDetails{},
1✔
985
                )
1✔
986
        case election.client != election.master:
1✔
987
                // this client is not the elected master.
1✔
988
                log.Errorf("returning failed to client %s (id: %s), because they are not the elected master (%s is, id: %s)", election.client, election.clientLatest, election.master, election.ID)
1✔
989
                return &spb.ModifyResponse{
1✔
990
                        Result: []*spb.AFTResult{{
1✔
991
                                Id:     opID,
1✔
992
                                Status: spb.AFTResult_FAILED,
1✔
993
                        }},
1✔
994
                }, false, nil
1✔
995
        }
996

997
        thisID := uint128.New(opElecID.Low, opElecID.High)
1✔
998
        // check that this client sent us the same ID as we had before.
1✔
999
        currentClientID := uint128.New(election.clientLatest.Low, election.clientLatest.High)
1✔
1000
        if thisID.Cmp(currentClientID) != 0 {
2✔
1001
                log.Errorf("returning failed to client because operation election ID %s != their latest election ID %s (master is: %s with ID %s)", opElecID, election.clientLatest, election.master, election.ID)
1✔
1002
                return &spb.ModifyResponse{
1✔
1003
                        Result: []*spb.AFTResult{{
1✔
1004
                                Id:     opID,
1✔
1005
                                Status: spb.AFTResult_FAILED,
1✔
1006
                        }},
1✔
1007
                }, false, nil
1✔
1008
        }
1✔
1009

1010
        // This is a belt and braces check -- it's not clear that we need to do it. Since we
1011
        // checked that the master that is stored in the server is this client. However, we do
1012
        // an additional check to ensure that the current ID that we are storing is definitely
1013
        // the same as the one that we just received.
1014
        currentID := uint128.New(election.ID.Low, election.ID.High)
1✔
1015
        switch {
1✔
1016
        case thisID.Cmp(currentID) > 0:
1✔
1017
                // this value is greater than the known master ID. Return an error
1✔
1018
                return nil, false, status.Newf(codes.FailedPrecondition, "specified election ID was greater than existing election, %s > %s", thisID, currentID).Err()
1✔
1019
        case thisID.Cmp(currentID) < 0:
1✔
1020
                // this value is less as the current master ID, ignore this transaction.
1✔
1021
                // note that since we don't respond here, the client at the other side
1✔
1022
                // is just going to have a pending transaction forever.
1✔
1023
                //
1✔
1024
                // TODO(robjs): should we add an error code here?
1✔
1025
                log.Errorf("returning failed to client because operation election ID %s < the master election ID %s (master: %s)", opElecID, election.ID, election.master)
1✔
1026
                return &spb.ModifyResponse{
1✔
1027
                        Result: []*spb.AFTResult{{
1✔
1028
                                Id:     opID,
1✔
1029
                                Status: spb.AFTResult_FAILED,
1✔
1030
                        }},
1✔
1031
                }, false, nil
1✔
1032
        }
1033
        return nil, true, nil
1✔
1034
}
1035

1036
// doGet implements the Get RPC for the gRIBI server. It handles the input GetRequest, writing
1037
// the set of GetResponses to the specified msgCh. When the Get is done, the function writes to
1038
// doneCh such that the caller knows that the work that is being done is complete. If a message
1039
// is received on stopCh the function returns. Any errors that are experienced are written to
1040
// errCh.
1041
func (s *Server) doGet(req *spb.GetRequest, msgCh chan *spb.GetResponse, doneCh, stopCh chan struct{}, errCh chan error) {
1✔
1042
        // Any time we return we return we tell the done channel that we're complete.
1✔
1043
        defer func() {
2✔
1044
                doneCh <- struct{}{}
1✔
1045
        }()
1✔
1046

1047
        if req == nil {
1✔
1048
                errCh <- status.Errorf(codes.InvalidArgument, "invalid nil GetRequest received")
×
1049
                return
×
1050
        }
×
1051

1052
        netInstances := []string{}
1✔
1053
        switch nireq := req.NetworkInstance.(type) {
1✔
1054
        case *spb.GetRequest_Name:
1✔
1055
                if nireq.Name == "" {
2✔
1056
                        errCh <- status.Errorf(codes.InvalidArgument, `invalid string "" returned for NetworkInstance name in GetRequest`)
1✔
1057
                        return
1✔
1058
                }
1✔
1059
                netInstances = append(netInstances, nireq.Name)
1✔
1060
        case *spb.GetRequest_All:
1✔
1061
                netInstances = s.masterRIB.KnownNetworkInstances()
1✔
1062
        }
1063

1064
        filter := map[spb.AFTType]bool{}
1✔
1065
        switch v := req.Aft; v {
1✔
1066
        case spb.AFTType_ALL, spb.AFTType_IPV4, spb.AFTType_NEXTHOP, spb.AFTType_NEXTHOP_GROUP, spb.AFTType_MPLS, spb.AFTType_IPV6:
1✔
1067
                filter[v] = true
1✔
1068
        default:
1✔
1069
                errCh <- status.Errorf(codes.Unimplemented, "AFTs other than IPv4, MPLS, IPv6, NHG and NH are unimplemented, requested: %s", v)
1✔
1070
        }
1071

1072
        for _, ni := range netInstances {
2✔
1073
                netInst, ok := s.masterRIB.NetworkInstanceRIB(ni)
1✔
1074
                if !ok {
1✔
1075
                        errCh <- status.Errorf(codes.InvalidArgument, "invalid network instance %s specified", ni)
×
1076
                        return
×
1077
                }
×
1078

1079
                if err := netInst.GetRIB(filter, msgCh, stopCh); err != nil {
1✔
1080
                        errCh <- err
×
1081
                        return
×
1082
                }
×
1083
        }
1084
}
1085

1086
// addFlushErrDetailsOrReturn
1087
func addFlushErrDetailsOrReturn(s *status.Status, d *spb.FlushResponseError) error {
1✔
1088
        se, err := s.WithDetails(d)
1✔
1089
        if err != nil {
1✔
1090
                return s.Err()
×
1091
        }
×
1092
        return se.Err()
1✔
1093
}
1094

1095
// checkFlushRequest ensures that the FlushRequest that was supplied from a client is valid - particularly,
1096
// validating that the election parameters are consistent, and correct, and that the Flush should be
1097
// completed.
1098
func (s *Server) checkFlushRequest(req *spb.FlushRequest) error {
1✔
1099
        switch {
1✔
1100
        case req == nil:
1✔
1101
                return status.Newf(codes.Internal, "FlushRequest can not be nil").Err()
1✔
1102
        case req.GetNetworkInstance() == nil:
1✔
1103
                return addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "unspecified network instance"), &spb.FlushResponseError{
1✔
1104
                        Status: spb.FlushResponseError_UNSPECIFIED_NETWORK_INSTANCE,
1✔
1105
                })
1✔
1106
        case req.GetOverride() != nil:
1✔
1107
                // The election ID should not be compared, regardless of whether
1✔
1108
                // there are SINGLE_PRIMARY clients on the server.
1✔
1109
                return nil
1✔
1110
        }
1111

1112
        id := req.GetId()
1✔
1113
        switch {
1✔
1114
        case id == nil && s.curElecID == nil:
1✔
1115
                // We are in ALL_PRIMARY mode and not given an election ID, which is fine.
1✔
1116
                return nil
1✔
1117
        case id == nil && s.curElecID != nil:
1✔
1118
                // We are in SINGLE_PRIMARY mode but we were not given an election behaviour.
1✔
1119
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "unsupported election behaviour, client in SINGLE_PRIMARY mode"), &spb.FlushResponseError{
1✔
1120
                        Status: spb.FlushResponseError_UNSPECIFIED_ELECTION_BEHAVIOR,
1✔
1121
                })
1✔
1122
        case id != nil && s.curElecID == nil:
1✔
1123
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "received election ID in ALL_PRIMARY mode"), &spb.FlushResponseError{
1✔
1124
                        Status: spb.FlushResponseError_ELECTION_ID_IN_ALL_PRIMARY,
1✔
1125
                })
1✔
1126
        }
1127

1128
        // If the Flush specified an ID, then we need to check that it is valid according
1129
        // to the logic that is defined in the specification - it must be either equal to
1130
        // or higher than the value that is the current master,
1131
        candidate := uint128.New(id.Low, id.High)
1✔
1132

1✔
1133
        if uint128.New(0, 0).Equals(candidate) {
2✔
1134
                return addFlushErrDetailsOrReturn(status.Newf(codes.InvalidArgument, "zero is an invalid election ID"), &spb.FlushResponseError{
1✔
1135
                        Status: spb.FlushResponseError_INVALID_ELECTION_ID,
1✔
1136
                })
1✔
1137
        }
1✔
1138

1139
        existing := uint128.New(s.curElecID.Low, s.curElecID.High)
1✔
1140
        if candidate.Cmp(existing) < 0 {
2✔
1141
                return addFlushErrDetailsOrReturn(status.Newf(codes.FailedPrecondition, "election ID specified (%v) is not primary", candidate), &spb.FlushResponseError{
1✔
1142
                        Status: spb.FlushResponseError_NOT_PRIMARY,
1✔
1143
                })
1✔
1144
        }
1✔
1145

1146
        return nil
1✔
1147
}
1148

1149
// FakeServer is a wrapper around the server with functions to enable testing
1150
// to be performed more easily, for example, injecting specific state.
1151
type FakeServer struct {
1152
        *Server
1153
}
1154

1155
// NewFake returns a new version of the fake server. This implementation wraps
1156
// the Server implementation with functions to insert specific state into
1157
// the server without a need to use the public APIs.
1158
func NewFake(opt ...ServerOpt) (*FakeServer, error) {
1✔
1159
        s, err := New(opt...)
1✔
1160
        // Because we embed the unimplemented gRIBI server in the Server, and
1✔
1161
        // then further embed this, then we need to initialise it to make sure that
1✔
1162
        // it is not nil. This keeps checks on unimplemented methods happy. gRPC
1✔
1163
        // checks this itself so will cause panics even if we do implement all methods.
1✔
1164
        s.UnimplementedGRIBIServer = &spb.UnimplementedGRIBIServer{}
1✔
1165
        if err != nil {
1✔
1166
                return nil, err
×
1167
        }
×
1168
        return &FakeServer{Server: s}, nil
1✔
1169
}
1170

1171
// InjectRIB allows a client to inject a RIB into the server as though it was
1172
// received from the master client.
1173
func (f *FakeServer) InjectRIB(r *rib.RIB) {
1✔
1174
        f.Server.masterRIB = r
1✔
1175
}
1✔
1176

1177
// InjectElectionID allows a client to set an initial election ID on the server as
1178
// though it was received from the client.
1179
func (f *FakeServer) InjectElectionID(id *spb.Uint128) {
×
1180
        f.Server.curElecID = id
×
1181
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc