• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openconfig / gribigo / 10624416804

29 Aug 2024 11:16PM UTC coverage: 73.752% (+0.06%) from 73.697%
10624416804

push

github

web-flow
Re-anchor comments in server/server.go. (#243)

* Re-anchor comments in server/server.go.

* Fix staticcheck errors regarding Print vs. Printf.

* Add `RequiresIPv6` to tests per #244.

1 of 1 new or added line in 1 file covered. (100.0%)

5 existing lines in 2 files now uncovered.

6395 of 8671 relevant lines covered (73.75%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.28
/client/gribiclient.go
1
// Copyright 2021 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package client defines a gRIBI stub client implementation in Go.
16
package client
17

18
import (
19
        "bytes"
20
        "context"
21
        "crypto/tls"
22
        "errors"
23
        "fmt"
24
        "io"
25
        "sort"
26
        "sync"
27
        "time"
28

29
        log "github.com/golang/glog"
30
        "github.com/google/uuid"
31
        "github.com/openconfig/gnmi/errlist"
32
        "github.com/openconfig/gribigo/constants"
33
        "go.uber.org/atomic"
34
        "google.golang.org/grpc"
35
        "google.golang.org/grpc/credentials"
36
        "lukechampine.com/uint128"
37

38
        spb "github.com/openconfig/gribi/v1/proto/service"
39
)
40

41
var (
	// debug enables detailed debug reporting throughout the client. When set,
	// Connect starts a debugWatcher goroutine alongside each of the send and
	// receive goroutines that it launches.
	debug = false
	// modifyBuffer specifies the depth of the modify channel's buffer.
	modifyBuffer = 5
)

var (
	// unixTS is a function that returns a timestamp in nanoseconds for the current time.
	// It can be overloaded in unit tests to ensure that deterministic output is received.
	//
	// This must be a function literal rather than the method value
	// time.Now().UnixNano: a method value binds its receiver at the point it is
	// evaluated, so that form would capture the time at package initialisation
	// and return the same timestamp on every call.
	unixTS = func() int64 { return time.Now().UnixNano() }

	// BusyLoopDelay specifies a delay that should be used when looping around pending
	// transactions. By default, the client tries not to busy loop, but this can introduce
	// artificial delays when benchmarking server operations. It is NOT RECOMMENDED to
	// change the default value other than when measuring convergence time at the client.
	BusyLoopDelay = 100 * time.Millisecond

	// TreatRIBACKAsCompletedInFIBACKMode allows the caller to modify the client behaviour
	// to treat a RIB ACK from the target as the end of an operation even though the session
	// has requested FIB_ACK. This allows benchmarking of the time to receive a RIB_ACK from
	// a server in FIB_ACK mode. In production clients that require FIB_ACK it is ACTIVELY
	// HARMFUL to change this value, since the client will consider a server to have converged
	// on receipt of a RIB_ACK despite the fact that the session requested FIB_ACK.
	TreatRIBACKAsCompletedInFIBACKMode = false
)
67

68
// Client is a wrapper for the gRIBI client.
69
type Client struct {
70
        // state stores the configured state of the client, used to store
71
        // initial immutable parameters, as well as mutable parameters that
72
        // may change during the clients's lifetime.
73
        state *clientState
74

75
        // c is the current gRIBI client.
76
        c spb.GRIBIClient
77
        // conn is the current gRPC connection that the client uses.
78
        conn *grpc.ClientConn
79

80
        // qs is the set of queues that are associated with the current
81
        // client.
82
        qs *clientQs
83

84
        // shut indicates that RPCs should continue to run, when set
85
        // to true, all goroutines that are serving RPCs shut down.
86
        shut *atomic.Bool
87

88
        // sendErrMu protects the sendErr slice.
89
        sendErrMu sync.RWMutex
90
        // sErr stores sending errors that cause termination of the sending
91
        // GoRoutine.
92
        sendErr []error
93

94
        // readErrMu protects the readErr slice.
95
        readErrMu sync.RWMutex
96
        // rErr stores receiving errors that cause termination of the receiving
97
        // GoRoutine.
98
        readErr []error
99

100
        // awaiting is a read-write mutex that is used to ensure that we know
101
        // when any operation that might affect the convergence state of the
102
        // client is in-flight. Functions that are performing operations that may
103
        // result in a change take a read-lock, functions that are dependent upon
104
        // the client being in a consistent state take a write-lock on this mutex.
105
        awaiting sync.RWMutex
106

107
        // wg tells disconnect() when all the goroutines started by Connect() have exited.
108
        wg sync.WaitGroup
109

110
        // doneCh is a channel that is written to when the client disconnects, it can be
111
        // used by a caller to ensure that the client reconnects.
112
        doneCh chan struct{}
113

114
        // sendExitCh is a channel that is used to indicate that the sender for the
115
        // client is exited, such that other goroutines can clean up.
116
        sendExitCh chan struct{}
117
}
118

119
// clientState is used to store the configured (immutable) state of the client.
120
type clientState struct {
121
        // sessParams stores the session parameters that are associated with
122
        // the client, they will be resent as the first message if the client
123
        // reconnects to the server.
124
        SessParams *spb.SessionParameters
125
        // electionID stores the current election ID for this client in the
126
        // case that the client participates in a group of SINGLE_PRIMARY
127
        // clients.
128
        ElectionID *spb.Uint128
129
}
130

131
// Opt is an interface that is implemented for all options that
// can be supplied when creating a new client with New. This captures
// parameters that are sent at the start of a gRIBI session, such as the
// redundancy, persistence and acknowledgement modes.
type Opt interface {
	isClientOpt()
}
137

138
// New creates a new gRIBI client with the specified set of options. The options
139
// provided control parameters that live for the lifetime of the client, such as those
140
// that are within the session parameters. A new client, or error, is returned.
141
func New(opts ...Opt) (*Client, error) {
1✔
142
        c := &Client{
1✔
143
                shut:   atomic.NewBool(false),
1✔
144
                doneCh: make(chan struct{}, 1),
1✔
145
        }
1✔
146

1✔
147
        s, err := handleParams(opts...)
1✔
148
        if err != nil {
1✔
149
                return nil, fmt.Errorf("cannot create client, error with parameters, %v", err)
×
150
        }
×
151
        c.state = s
1✔
152

1✔
153
        c.qs = &clientQs{
1✔
154
                sendq: []*spb.ModifyRequest{},
1✔
155
                pendq: &pendingQueue{
1✔
156
                        Ops: map[uint64]*PendingOp{},
1✔
157
                },
1✔
158

1✔
159
                // modifyCh is buffered to ensure that there are no races writing to it - we
1✔
160
                // expect that 5 messages is sufficient to ensure that there is time for the
1✔
161
                // sender to shutdown.
1✔
162
                modifyCh: make(chan *spb.ModifyRequest, modifyBuffer),
1✔
163
                resultq:  []*OpResult{},
1✔
164

1✔
165
                sending: atomic.NewBool(false),
1✔
166
        }
1✔
167

1✔
168
        return c, nil
1✔
169
}
170

171
// Done returns a channel which is written to when the client is disconnected from the
172
// server. It can be used to trigger reconnections.
173
func (c *Client) Done() <-chan struct{} {
1✔
174
        return c.doneCh
1✔
175
}
1✔
176

177
// handleParams takes the set of gRIBI client options that are provided and uses them
178
// to populate the session parameters that they are translated into. It returns a
179
// populate SessionParameters protobuf along with any errors when parsing the supplied
180
// set of options.
181
func handleParams(opts ...Opt) (*clientState, error) {
1✔
182
        if len(opts) == 0 {
2✔
183
                return &clientState{}, nil
1✔
184
        }
1✔
185
        s := &clientState{
1✔
186
                SessParams: &spb.SessionParameters{},
1✔
187
        }
1✔
188

1✔
189
        for _, o := range opts {
2✔
190
                switch v := o.(type) {
1✔
191
                case *allPrimaryClients:
1✔
192
                        if s.ElectionID != nil {
2✔
193
                                return nil, fmt.Errorf("cannot specify both ALL_PRIMARY and SINGLE_PRIMARY redundancy modes")
1✔
194
                        }
1✔
195
                        s.SessParams.Redundancy = spb.SessionParameters_ALL_PRIMARY
1✔
196
                case *electedPrimaryClient:
1✔
197
                        s.SessParams.Redundancy = spb.SessionParameters_SINGLE_PRIMARY
1✔
198
                        s.ElectionID = v.electionID
1✔
199
                case *persistEntries:
1✔
200
                        s.SessParams.Persistence = spb.SessionParameters_PRESERVE
1✔
201
                case *fibACK:
×
202
                        s.SessParams.AckType = spb.SessionParameters_RIB_AND_FIB_ACK
×
203
                }
204
        }
205

206
        return s, nil
1✔
207
}
208

209
// DialOpt specifies options that can be used when dialing the gRIBI server
// specified by the client.
//
// NOTE: Dial currently accepts but does not act on any DialOpt (see the TODO
// in Dial).
type DialOpt interface {
	isDialOpt()
}
214

215
// Dial dials the server specified in the addr string, using the specified
216
// set of dial options.
217
func (c *Client) Dial(ctx context.Context, addr string, opts ...DialOpt) error {
1✔
218
        // TODO(robjs): translate any options within the dial options here, we may
1✔
219
        // want to consider just accepting some gRPC dialoptions directly.
1✔
220

1✔
221
        dialOpts := []grpc.DialOption{}
1✔
222
        tlsc := credentials.NewTLS(&tls.Config{
1✔
223
                InsecureSkipVerify: true,
1✔
224
        })
1✔
225
        dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsc))
1✔
226

1✔
227
        conn, err := grpc.NewClient(addr, dialOpts...)
1✔
228
        if err != nil {
1✔
229
                return fmt.Errorf("cannot dial remote system, %v", err)
×
230
        }
×
231
        c.conn = conn
1✔
232
        c.c = spb.NewGRIBIClient(conn)
1✔
233
        return nil
1✔
234
}
235

236
// UseStub instructs the client to use the gRPC stub implementing the
237
// GRIBIClient service.
238
func (c *Client) UseStub(stub spb.GRIBIClient) error {
×
239
        if c.c != nil {
×
240
                return errors.New("client already has a stub")
×
241
        }
×
242
        c.c = stub
×
243
        return nil
×
244
}
245

246
// ReplaceStub replaces the gRPC stub used by the client only if the client
247
// is not in a sending state.
248
func (c *Client) ReplaceStub(stub spb.GRIBIClient) error {
1✔
249
        c.qs.sendMu.Lock()
1✔
250
        defer c.qs.sendMu.Unlock()
1✔
251
        if c.qs.sending.Load() {
2✔
252
                return errors.New("must be in stopped sending state to replace stub")
1✔
253
        }
1✔
254

255
        c.c = stub
1✔
256
        return nil
1✔
257
}
258

259
// Reset clears the client's transient state - is is recommended to call this method between
260
// reconnections at a server to clear pending queues and results which are no longer valid.
261
func (c *Client) Reset() {
1✔
262
        c.StopSending()
1✔
263
        c.disconnect()
1✔
264

1✔
265
        c.sendErrMu.Lock()
1✔
266
        defer c.sendErrMu.Unlock()
1✔
267
        c.sendErr = nil
1✔
268

1✔
269
        c.readErrMu.Lock()
1✔
270
        defer c.readErrMu.Unlock()
1✔
271
        c.readErr = nil
1✔
272

1✔
273
        c.qs.sendMu.Lock()
1✔
274
        defer c.qs.sendMu.Unlock()
1✔
275
        c.qs.sendq = nil
1✔
276

1✔
277
        c.qs.pendMu.Lock()
1✔
278
        defer c.qs.pendMu.Unlock()
1✔
279
        c.qs.pendq = &pendingQueue{
1✔
280
                Ops: map[uint64]*PendingOp{},
1✔
281
        }
1✔
282

1✔
283
        c.qs.resultMu.Lock()
1✔
284
        defer c.qs.resultMu.Unlock()
1✔
285
        c.qs.resultq = nil
1✔
286

1✔
287
        c.qs.modifyCh = make(chan *spb.ModifyRequest, 5)
1✔
288

1✔
289
        // Empty the done channel if a reader did not take the message from it.
1✔
290
        select {
1✔
291
        case <-c.doneCh:
1✔
292
        default:
1✔
293
        }
294
}
295

296
// disconnect shuts down the goroutines started by Connect().
297
func (c *Client) disconnect() {
1✔
298
        skipClose := false
1✔
299
        if c.sendExitCh == nil || chIsClosed(c.sendExitCh) {
2✔
300
                skipClose = true
1✔
301
        }
1✔
302
        if !skipClose {
2✔
303
                // Close the modifyCh to signal to the request handler to exit.
1✔
304
                close(c.qs.modifyCh)
1✔
305
        }
1✔
306
        c.wg.Wait()
1✔
307
}
308

309
// Close disconnects the underlying gRPC connection to the gRIBI server.
310
func (c *Client) Close() error {
1✔
311
        c.disconnect()
1✔
312
        if c.conn == nil {
1✔
313
                return nil
×
314
        }
×
315
        return c.conn.Close()
1✔
316
}
317

318
// AllPrimaryClients returns a client option declaring that this client belongs
// to a group of clients that are all considered primary sources of routing
// information.
//
// In this mode of operation, the server is expected to store every entry that
// is written to it over gRIBI, regardless of which client wrote it. When
// several clients write the same entry, selection amongst them is based on the
// lowest client identifier (the remote address expressed as a 128-bit number),
// as explained in
// https://github.com/openconfig/gribi/blob/master/doc/motivation.md#tying-injected-entries-to-the-liveliness-of-grpc-client
func AllPrimaryClients() *allPrimaryClients {
	return new(allPrimaryClients)
}

// allPrimaryClients is the option value produced by AllPrimaryClients.
type allPrimaryClients struct{}

// isClientOpt marks allPrimaryClients as a client option.
func (allPrimaryClients) isClientOpt() {}
335

336
// ElectedPrimaryClient is an option used when creating a new client that
337
// specifies that this client will be part of a group of clients elect a
338
// master amongst them, such that exactly one client is considered the primary
339
// source of routing information. The server is not expected to know any of
340
// the details of this election process - but arbitrates based on a supplied
341
// election ID (expressed as a 128-bit number). The client with the highest
342
// election ID is considered the primary and hence has active entries within
343
// the device's RIB.
344
//
345
// The initial election ID to be used is stored
346
func ElectedPrimaryClient(initialID *spb.Uint128) *electedPrimaryClient {
1✔
347
        return &electedPrimaryClient{electionID: initialID}
1✔
348
}
1✔
349

350
type electedPrimaryClient struct {
351
        electionID *spb.Uint128
352
}
353

354
func (electedPrimaryClient) isClientOpt() {}
×
355

356
// PersistEntries returns a client option requesting that the server keeps the
// client's entries after the client disconnects; by default, a gRIBI server
// purges all entries when the client disconnects. This persistence mode may
// only be used when the redundancy mode is AllPrimaryClients (handleParams does
// not itself enforce this).
func PersistEntries() *persistEntries {
	return new(persistEntries)
}

// persistEntries is the option value produced by PersistEntries.
type persistEntries struct{}

// isClientOpt marks persistEntries as a client option.
func (persistEntries) isClientOpt() {}
367

368
// FIBACK returns a client option requesting that the server acknowledges gRIBI
// transactions only once they have been programmed into the FIB. By default, a
// gRIBI server sends an acknowledgement when the entry has been installed into
// the RIB.
func FIBACK() *fibACK {
	return new(fibACK)
}

// fibACK is the option value produced by FIBACK.
type fibACK struct{}

// isClientOpt marks fibACK as a client option.
func (fibACK) isClientOpt() {}
379

380
func debugWatcher(ctx context.Context, role, id string) {
1✔
381
        for {
2✔
382
                select {
1✔
383
                case <-ctx.Done():
1✔
384
                        log.Errorf("goroutine %s:%s: exiting at %s", role, id, time.Now())
1✔
385
                        return
1✔
386
                default:
1✔
387
                }
388
                log.Errorf("goroutine %s:%s: still running at %s", role, id, time.Now())
1✔
389
                time.Sleep(1 * time.Second)
1✔
390
        }
391
}
392

393
// Connect establishes a Modify RPC to the client and sends the initial session
394
// parameters/election ID if required. The Modify RPC is stored within the client
395
// such that it can be used for subsequent calls - such that Connect must be called
396
// before subsequent operations that should send to the target.
397
//
398
// An error is returned if the Modify RPC cannot be established or there is an error
399
// response to the initial messages that are sent on the connection.
400
func (c *Client) Connect(ctx context.Context) error {
1✔
401
        // Store that we are no longer shut down, since Connect can be called multiple
1✔
402
        // times on the same client.
1✔
403
        c.shut.Store(false)
1✔
404
        c.sendExitCh = make(chan struct{}, 1)
1✔
405

1✔
406
        stream, err := c.c.Modify(ctx)
1✔
407
        if err != nil {
2✔
408
                return fmt.Errorf("cannot open Modify RPC, %v", err)
1✔
409
        }
1✔
410

411
        // TODO(robjs): if we made these functions not niladic, and
412
        // supplied to the client, then we could allow the user to
413
        // overload functions that read and write - and hence do
414
        // lower-layer operations to the gRIBI server directly.
415
        // Modify this code to do this (make these just be default
416
        // handler functions, they could still be inline).
417

418
        informDone := func(who string) {
2✔
419
                select {
1✔
420
                case c.doneCh <- struct{}{}:
1✔
421
                        log.Infof("writing to done channel, requested by %s", who)
1✔
422
                default:
1✔
423
                        log.Infof("dropped message informing caller the client is done.")
1✔
424
                }
425
        }
426

427
        // respHandler takes a received modify response and error, and returns
428
        // a bool indicating that the loop within which it is called should
429
        // exit.
430
        respHandler := func(in *spb.ModifyResponse, err error) bool {
2✔
431
                log.V(2).Infof("received message on Modify stream: %s", in)
1✔
432
                c.awaiting.RLock()
1✔
433
                defer c.awaiting.RUnlock()
1✔
434
                if err == io.EOF {
2✔
435
                        // reading is done, so write should shut down too.
1✔
436
                        c.shut.Store(true)
1✔
437
                        return true
1✔
438
                }
1✔
439
                if err != nil {
2✔
440
                        log.Errorf("got error receiving message, %v", err)
1✔
441
                        c.addReadErr(err)
1✔
442
                        return true
1✔
443
                }
1✔
444
                if err := c.handleModifyResponse(in); err != nil {
1✔
445
                        log.Errorf("got error processing message received from server, %v", err)
×
446
                        c.addReadErr(err)
×
447
                        return true
×
448
                }
×
449

450
                return false
1✔
451
        }
452

453
        id := uuid.New().String()
1✔
454
        c.wg.Add(1)
1✔
455
        go func() {
2✔
456
                defer c.wg.Done()
1✔
457
                defer informDone("receiver")
1✔
458
                if debug {
2✔
459
                        debugCtx, cancel := context.WithCancel(ctx)
1✔
460
                        defer cancel()
1✔
461
                        go debugWatcher(debugCtx, "recv", id)
1✔
462
                }
1✔
463
                for {
2✔
464
                        if c.shut.Load() {
1✔
465
                                log.V(2).Infof("shutting down recv goroutine, id: %s, cause: SHUTDOWN", id)
×
466
                                return
×
467
                        }
×
468
                        if done := respHandler(stream.Recv()); done {
2✔
469
                                log.V(2).Infof("shuttting down recv goroutine, id: %s, cause: HANDLER", id)
1✔
470
                                return
1✔
471
                        }
1✔
472
                }
473
        }()
474

475
        // reqHandler handles an input modify request and returns a bool when
476
        // the loop within which it is called should exit.
477
        reqHandler := func(m *spb.ModifyRequest, readOK bool) bool {
2✔
478
                if !readOK {
2✔
479
                        // client close requested by disconnect(), since the modifyCh is closed.
1✔
480
                        if err := stream.CloseSend(); err != nil {
1✔
481
                                log.Errorf("got error closing session: %v", err)
×
482
                        }
×
483
                        return true
1✔
484
                }
485

486
                c.awaiting.RLock()
1✔
487
                defer c.awaiting.RUnlock()
1✔
488
                if err := stream.Send(m); err != nil {
2✔
489
                        log.Errorf("got error sending message: %v", err)
1✔
490
                        c.addSendErr(err)
1✔
491
                        return true
1✔
492
                }
1✔
493
                log.V(2).Infof("sent Modify message %s", m)
1✔
494
                return false
1✔
495
        }
496

497
        c.wg.Add(1)
1✔
498
        go func() {
2✔
499
                defer c.wg.Done()
1✔
500
                defer informDone("sender")
1✔
501
                if debug {
2✔
502
                        debugCtx, cancel := context.WithCancel(ctx)
1✔
503
                        defer cancel()
1✔
504
                        go debugWatcher(debugCtx, "send", id)
1✔
505
                }
1✔
506
                defer func() {
2✔
507
                        // Signal that we are exiting, this allows us to avoid the case that
1✔
508
                        // a race causes the modifyCh to become blocking.
1✔
509
                        log.V(2).Infof("closing send channel in id: %s", id)
1✔
510
                        c.sendExitCh <- struct{}{}
1✔
511
                        close(c.sendExitCh)
1✔
512
                }()
1✔
513
                for {
2✔
514
                        if c.shut.Load() {
1✔
UNCOV
515
                                log.V(2).Infof("shutting down send goroutine, id: %s, cause: SHUTDOWN", id)
×
UNCOV
516
                                return
×
UNCOV
517
                        }
×
518

519
                        v, ok := <-c.qs.modifyCh
1✔
520
                        if done := reqHandler(v, ok); done {
2✔
521
                                log.V(2).Infof("shutting down send goroutine, id: %s, cause: HANDLER", id)
1✔
522
                                return
1✔
523
                        }
1✔
524
                }
525
        }()
526

527
        return nil
1✔
528
}
529

530
// clientQs defines the set of queues that are used for the Modify RPC.
531
type clientQs struct {
532
        // sendMu protects the send queue.
533
        sendMu sync.RWMutex
534
        // sendq contains any ModifyRequest messages that are queued to be
535
        // sent to the target. The send queue is emptied after the client is
536
        // instructed to start sending messages.
537
        sendq []*spb.ModifyRequest
538

539
        // pendMu protects the pending queue.
540
        pendMu sync.RWMutex
541
        // pendq stores the pending transactions to the target. Once a message
542
        // is taken from the sendq, and sent over the Modify stream, an entry
543
        // in the pendq is created for each AFTOperation within the ModifyRequest.
544
        // The key for the pendq is the ID of the operation to speed up lookups.
545
        // The value stored in the pendq is a bool - such that the library discards
546
        // the contents of a Modify request after it was sent.
547
        //
548
        // TODO(robjs): currently, the implementation we have here just handles the
549
        // positive case that we get an acknowledged transaction - consider whether
550
        // we want the ability to have the client handle timeouts and inject these
551
        // as failed per the controller implementation.
552
        pendq *pendingQueue
553

554
        // modifyCh is the channel that is used to write to the goroutine that is
555
        // the sole source of writes onto the modify stream.
556
        modifyCh chan *spb.ModifyRequest
557

558
        // resultMu protects the results queue.
559
        resultMu sync.RWMutex
560
        // resultq stores the responses that are received from the target. When a
561
        // ModifyResponse is received from the target, the entry is removed from the
562
        // pendq (based on the ID) for any included AFTResult, and the result
563
        // is written to the result queue.
564
        resultq []*OpResult
565

566
        // sending indicates whether the client will empty the sendq. By default,
567
        // messages are queued into the sendq and not sent to the target.
568
        sending *atomic.Bool
569
}
570

571
// pendingQueue provides a queue type that determines the set of pending
572
// operations for the client. Since pending operations are added and removed
573
// based on receiving a ModifyRequest or ModifyResponse in the client, there is
574
// no case in which we need to individually access these elements, so they are
575
// to be protected by a single mutex.
576
type pendingQueue struct {
577
        // ops is the set of AFT operations that are pending in the client.
578
        Ops map[uint64]*PendingOp
579
        // Election is the pending election that has been sent to the client, there can
580
        // only be one pending update - since each election replaces the previous one. A
581
        // single response is expected to clear all the election updates that have been sent
582
        // to the server.
583
        Election *ElectionReqDetails
584

585
        // SessionParams indicates that there is a pending SessionParameters
586
        // request sent to the server. There can only be one such request since
587
        // we only send the request at the start of the connection, and MUST NOT
588
        // send it again.
589
        SessionParams *SessionParamReqDetails
590
}
591

592
// ElectionReqDetails stores the details of an individual election update that was sent
593
// to the server.
594
type ElectionReqDetails struct {
595
        // Timestamp is the time at which the update was sent, specified in nanoseconds
596
        // since the epoch.
597
        Timestamp int64
598
        // ID is the election ID that was sent to the server. This can be used to determine
599
        // whether the response shows that this client actually became the master.
600
        ID *spb.Uint128
601
}
602

603
// isPendingRequest implements the PendingRequest interface for ElectionReqDetails.
604
func (*ElectionReqDetails) isPendingRequest() {}
×
605

606
// SessionParamReqDetails stores the details of an individual session parameters update
607
// that was sent to the server.
608
type SessionParamReqDetails struct {
609
        // Timestamp is the time at which the update was sent, specified in nanoseconds
610
        // since the epoch.
611
        Timestamp int64
612
        // Outgoing is the parameters that were sent to the server.
613
        Outgoing *spb.SessionParameters
614
}
615

616
// isPendingRequest implements the PendingRequest interface for SessionParamReqDetails.
617
func (*SessionParamReqDetails) isPendingRequest() {}
×
618

619
// Len returns the length of the pending queue.
620
func (p *pendingQueue) Len() int {
1✔
621
        if p == nil {
2✔
622
                return 0
1✔
623
        }
1✔
624
        var i int
1✔
625
        if p.SessionParams != nil {
2✔
626
                i++
1✔
627
        }
1✔
628
        if p.Election != nil {
2✔
629
                i++
1✔
630
        }
1✔
631

632
        return i + len(p.Ops)
1✔
633
}
634

635
// PendingRequest is implemented by every type that can be handed back to
// callers to describe a request that is still pending in the client.
type PendingRequest interface {
	isPendingRequest()
}
640

641
// PendingOp stores details pertaining to a pending request in the client.
642
type PendingOp struct {
643
        // Timestamp is the timestamp that the operation was queued at.
644
        Timestamp int64
645
        // Op is the operation that the pending request pertains to.
646
        Op *spb.AFTOperation
647
}
648

649
// isPendingRequest implements the PendingRequest interface for the PendingOp type.
650
func (*PendingOp) isPendingRequest() {}
×
651

652
// OpResult stores details pertaining to a result that was received from
653
// the server.
654
type OpResult struct {
655
        // Timestamp is the timestamp that the result was received.
656
        Timestamp int64
657
        // Latency is the latency of the request from the server. This is calculated
658
        // based on the time that the request was sent to the client (became pending)
659
        // and the time that the response was received from the server. It is expressed
660
        // in nanoseconds.
661
        Latency int64
662

663
        // CurrentServerElectionID indicates that the message that was received from the server
664
        // was an ModifyResponse sent in response to an updated election ID, its value is the
665
        // current master election ID (maximum election ID seen from any client) reported from
666
        // the server.
667
        CurrentServerElectionID *spb.Uint128
668

669
        // SessionParameters contains the parameters that were received from the server in
670
        // response to the parameters sent to the server.
671
        SessionParameters *spb.SessionParametersResult
672

673
        // OperationID indicates that the message that was received from the server was a
674
        // ModifyResponse sent in response to an AFTOperation, its value is the ID of the
675
        // operation to which it corresponds.
676
        OperationID uint64
677

678
        // ClientError describes an error that is internal to the client.
679
        ClientError string
680

681
        // ServerError describes an error provided from the gRIBI server.
682
        ServerError string
683

684
        // ProgrammingResult stores the result of an AFT operation on the server.
685
        ProgrammingResult spb.AFTResult_Status
686

687
        // Details stores detailed information about the operation over the ID
688
        // and the result.
689
        Details *OpDetailsResults
690
}
691

692
// String returns a string for an OpResult for debugging purposes.
693
func (o *OpResult) String() string {
1✔
694
        if o == nil {
2✔
695
                return "<nil>"
1✔
696
        }
1✔
697

698
        buf := &bytes.Buffer{}
1✔
699
        buf.WriteString("<")
1✔
700
        buf.WriteString(fmt.Sprintf("%d (%d nsec):", o.Timestamp, o.Latency))
1✔
701

1✔
702
        if v := o.CurrentServerElectionID; v != nil {
1✔
703
                e := uint128.New(v.Low, v.High)
×
704
                buf.WriteString(fmt.Sprintf(" ElectionID: %s", e))
×
705
        }
×
706

707
        if opID, pr := o.OperationID, o.ProgrammingResult; opID != 0 || pr != spb.AFTResult_UNSET {
2✔
708
                buf.WriteString(fmt.Sprintf(" AFTOperation { ID: %d, Details: %s, Status: %s }", opID, o.Details, o.ProgrammingResult))
1✔
709
        }
1✔
710

711
        if v := o.SessionParameters; v != nil {
1✔
712
                buf.WriteString(fmt.Sprintf(" SessionParameterResult: OK (%s)", v.String()))
×
713
        }
×
714

715
        if v := o.ClientError; v != "" {
1✔
716
                buf.WriteString(fmt.Sprintf(" With Client Error: %s", v))
×
717
        }
×
718

719
        if v := o.ServerError; v != "" {
1✔
720
                buf.WriteString(fmt.Sprintf(" With Server Error: %s", v))
×
721
        }
×
722
        buf.WriteString(">")
1✔
723

1✔
724
        return buf.String()
1✔
725
}
726

727
// OpDetailsResults provides details of an operation for use in the results.
728
type OpDetailsResults struct {
729
        // Type is the type of the operation (i.e., ADD, MODIFY, DELETE)
730
        Type constants.OpType
731

732
        // NextHopIndex is the identifier for a next-hop modified by the operation.
733
        NextHopIndex uint64
734
        // NextHopGroupID is the identifier for a next-hop-group modified by the
735
        // operation.
736
        NextHopGroupID uint64
737
        // IPv4Prefix is the IPv4 prefix modified by the operation.
738
        IPv4Prefix string
739
        // IPv6Prefix is the IPv6 prefix modified by the operation.
740
        IPv6Prefix string
741
        // MPLSLabel is the MPLS label that was modified by the operation.
742
        MPLSLabel uint64
743
}
744

745
// String returns a human-readable form of the OpDetailsResults
746
func (o *OpDetailsResults) String() string {
1✔
747
        if o == nil {
2✔
748
                return "<nil>"
1✔
749
        }
1✔
750
        buf := &bytes.Buffer{}
×
751
        buf.WriteString(fmt.Sprintf("<Type: %s ", o.Type))
×
752
        switch {
×
753
        case o.NextHopIndex != 0:
×
754
                buf.WriteString(fmt.Sprintf("NH Index: %d", o.NextHopIndex))
×
755
        case o.NextHopGroupID != 0:
×
756
                buf.WriteString(fmt.Sprintf("NHG ID: %d", o.NextHopGroupID))
×
757
        case o.IPv4Prefix != "":
×
758
                buf.WriteString(fmt.Sprintf("IPv4: %s", o.IPv4Prefix))
×
759
        case o.IPv6Prefix != "":
×
760
                buf.WriteString(fmt.Sprintf("IPv6: %s", o.IPv6Prefix))
×
761
        case o.MPLSLabel != 0:
×
762
                buf.WriteString(fmt.Sprintf("MPLS: %d", o.MPLSLabel))
×
763
        }
764
        buf.WriteString(">")
×
765

×
766
        return buf.String()
×
767
}
768

769
// Q enqueues a ModifyRequest to be sent to the target.
770
func (c *Client) Q(m *spb.ModifyRequest) {
1✔
771
        if err := c.handleModifyRequest(m); err != nil {
1✔
772
                log.Errorf("got error processing message that was to be sent, %v", err)
×
773
                c.addSendErr(err)
×
774
        }
×
775

776
        if !c.qs.sending.Load() {
2✔
777
                c.qs.sendMu.Lock()
1✔
778
                defer c.qs.sendMu.Unlock()
1✔
779
                log.V(2).Infof("appended %s to sending queue", m)
1✔
780
                c.qs.sendq = append(c.qs.sendq, m)
1✔
781
                return
1✔
782
        }
1✔
783
        log.V(2).Infof("sending %s directly to queue", m)
1✔
784

1✔
785
        c.q(m)
1✔
786
}
787

788
// chIsClosed reports whether a value has been written to ch or ch has been
// closed. It never blocks: if neither has happened (or ch is nil), it returns
// false. This check can be used to determine whether a goroutine that writes
// to, or closes, a channel on exit is still running.
//
// When a value (rather than a close) is available, it is received and thus
// consumed. A single write is therefore observed by exactly one call, and
// later calls return false until another value is written or ch is closed.
func chIsClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		// Either a value was received or the channel is closed. Every struct{}
		// value is equal to struct{}{}, so the value itself carries no further
		// information.
		return true
	default:
		return false
	}
}
803

804
// q is the internal implementation of queue that writes the ModifyRequest to
805
// the channel to be sent.
806
func (c *Client) q(m *spb.ModifyRequest) {
1✔
807
        c.awaiting.RLock()
1✔
808
        defer c.awaiting.RUnlock()
1✔
809

1✔
810
        if !chIsClosed(c.sendExitCh) {
2✔
811
                c.qs.modifyCh <- m
1✔
812
        }
1✔
813
}
814

815
// StartSending toggles the client to begin sending messages that are in the send
816
// queue (enqued by Q) to the connection established by Connect.
817
func (c *Client) StartSending() {
1✔
818
        c.qs.sending.Store(true)
1✔
819

1✔
820
        if c.state.SessParams != nil {
2✔
821
                c.Q(&spb.ModifyRequest{
1✔
822
                        Params: c.state.SessParams,
1✔
823
                })
1✔
824
        }
1✔
825

826
        if c.state.ElectionID != nil {
2✔
827
                c.Q(&spb.ModifyRequest{
1✔
828
                        ElectionId: c.state.ElectionID,
1✔
829
                })
1✔
830
        }
1✔
831

832
        // take the initial set of messages that were enqueued and queue them.
833
        c.qs.sendMu.Lock()
1✔
834
        defer c.qs.sendMu.Unlock()
1✔
835
        for _, m := range c.qs.sendq {
2✔
836
                log.V(2).Infof("sending %s to modify channel", m)
1✔
837
                c.q(m)
1✔
838
        }
1✔
839
        c.qs.sendq = []*spb.ModifyRequest{}
1✔
840
}
841

842
// StopSending toggles the client to stop sending messages to the server, meaning
843
// that entries that are enqueued will be stored until StartSending is called.
844
func (c *Client) StopSending() {
1✔
845
        c.qs.sending.Store(false)
1✔
846
}
1✔
847

848
// handleModifyRequest performs any required post-processing after having sent a
849
// ModifyRequest onto the gRPC channel to the server. Particularly, this ensures
850
// that pending transactions are enqueued into the pending queue where they have
851
// a specified ID.
852
func (c *Client) handleModifyRequest(m *spb.ModifyRequest) error {
1✔
853
        // Add any pending operations to the pending queue.
1✔
854
        for _, o := range m.Operation {
2✔
855
                if err := c.addPendingOp(o); err != nil {
2✔
856
                        return err
1✔
857
                }
1✔
858
        }
859

860
        if m.ElectionId != nil {
2✔
861
                c.updatePendingElection(m.ElectionId)
1✔
862
        }
1✔
863

864
        if m.Params != nil {
2✔
865
                c.pendingSessionParams(m.Params)
1✔
866
        }
1✔
867

868
        return nil
1✔
869
}
870

871
// handleModifyResponse performs any required post-processing after having received
872
// a ModifyResponse from the gRPC channel from the server. Particularly, this
873
// ensures that pending transactions are dequeued into the results queue. An error
874
// is returned if the post processing is not possible.
875
func (c *Client) handleModifyResponse(m *spb.ModifyResponse) error {
1✔
876
        if m == nil {
2✔
877
                return errors.New("invalid nil modify response returned")
1✔
878
        }
1✔
879

880
        resPop := m.Result != nil
1✔
881
        elecPop := m.ElectionId != nil
1✔
882
        sessPop := m.SessionParamsResult != nil
1✔
883
        pop := 0
1✔
884
        for _, v := range []bool{resPop, elecPop, sessPop} {
2✔
885
                if v {
2✔
886
                        pop++
1✔
887
                }
1✔
888
        }
889
        if pop > 1 {
2✔
890
                return fmt.Errorf("invalid returned message, ElectionID, Result, and SessionParametersResult are mutually exclusive, got: %s", m)
1✔
891
        }
1✔
892

893
        // At this point we know we will have something to add to the results, so ensure that we
894
        // hold the results lock. This also stops the client from converging.
895
        c.qs.resultMu.Lock()
1✔
896
        defer c.qs.resultMu.Unlock()
1✔
897

1✔
898
        if m.ElectionId != nil {
2✔
899
                er := c.clearPendingElection()
1✔
900
                er.CurrentServerElectionID = m.ElectionId
1✔
901
                // This is an update from the server in response to an updated master election ID.
1✔
902
                c.qs.resultq = append(c.qs.resultq, er)
1✔
903
        }
1✔
904

905
        if m.SessionParamsResult != nil {
2✔
906
                sr := c.clearPendingSessionParams()
1✔
907
                sr.SessionParameters = m.SessionParamsResult
1✔
908
                c.qs.resultq = append(c.qs.resultq, sr)
1✔
909
        }
1✔
910

911
        for _, r := range m.Result {
2✔
912
                res, err := c.clearPendingOp(r)
1✔
913
                c.qs.resultq = append(c.qs.resultq, res)
1✔
914
                if err != nil {
1✔
915
                        return fmt.Errorf("cannot remove pending operation %d, %v", r.Id, err)
×
916
                }
×
917
        }
918

919
        return nil
1✔
920
}
921

922
// isConverged indicates whether the client is converged.
923
func (c *Client) isConverged() bool {
1✔
924
        c.qs.sendMu.RLock()
1✔
925
        defer c.qs.sendMu.RUnlock()
1✔
926
        c.qs.pendMu.RLock()
1✔
927
        defer c.qs.pendMu.RUnlock()
1✔
928

1✔
929
        return len(c.qs.sendq) == 0 && c.qs.pendq.Len() == 0
1✔
930
}
1✔
931

932
// addPendingOp adds the operation specified by op to the pending transaction queue
933
// on the client. This queue stores the operations that have been sent to the server
934
// but have not yet been reported on. It returns an error if the pending transaction
935
// cannot be added.
936
//
937
// TODO(robjs): if there is a pending operation for the same key (IPv4 prefix, NHG, NH)
938
// then we need to ensure that we correctly coalesce these values. This means that we need
939
// to keep some map of all the requests that corresponded to the same key.
940
func (c *Client) addPendingOp(op *spb.AFTOperation) error {
1✔
941
        c.qs.pendMu.Lock()
1✔
942
        defer c.qs.pendMu.Unlock()
1✔
943
        if v := c.qs.pendq.Ops[op.Id]; v != nil {
2✔
944
                return fmt.Errorf("could not enqueue operation %d, duplicate pending ID (pending: %v)", op.Id, v)
1✔
945
        }
1✔
946
        c.qs.pendq.Ops[op.Id] = &PendingOp{
1✔
947
                Timestamp: unixTS(),
1✔
948
                Op:        op,
1✔
949
        }
1✔
950
        return nil
1✔
951
}
952

953
// clearPendingOp removes the operation with the ID in the specified result from the
954
// pending queue and returns the result.
955
func (c *Client) clearPendingOp(op *spb.AFTResult) (*OpResult, error) {
1✔
956
        c.qs.pendMu.Lock()
1✔
957
        defer c.qs.pendMu.Unlock()
1✔
958

1✔
959
        if TreatRIBACKAsCompletedInFIBACKMode && c.state.SessParams.GetAckType() != spb.SessionParameters_RIB_AND_FIB_ACK {
1✔
960
                return nil, fmt.Errorf("logic error, TreatRIBACKAsCompletedInFIBACKMode set to true in %s mode", c.state.SessParams.GetAckType())
×
961
        }
×
962

963
        v, ok := c.qs.pendq.Ops[op.Id]
1✔
964
        if !ok {
2✔
965
                switch {
1✔
966
                case op.GetStatus() == spb.AFTResult_FIB_PROGRAMMED && TreatRIBACKAsCompletedInFIBACKMode:
1✔
967
                        // Expected condition, we hav already dequeued this operation because we are treating RIB_ACK as completed
1✔
968
                        // even though we are in FIB programmed mode.
1✔
969
                        return nil, nil
1✔
970
                case op.GetStatus() == spb.AFTResult_RIB_PROGRAMMED && c.state.SessParams.GetAckType() == spb.SessionParameters_RIB_AND_FIB_ACK:
1✔
971
                        // This condition occurs when the server sends up a FIB_ACK before a RIB_ACK and hence we have dequeued
1✔
972
                        // the operation. In this case, we don't return an error and simply log that this happened. This is based
1✔
973
                        // on being permissive, but is unexpected since gRPC should maintain the order, and there's no reason that
1✔
974
                        // we would expect that something can be programmed in the FIB before it has hit the RIB.
1✔
975
                        log.Warningf("operation %d, unexpectedly saw RIB_ACK after FIB_ACK", op.Id)
1✔
976

1✔
977
                        return &OpResult{
1✔
978
                                Timestamp:         unixTS(),
1✔
979
                                OperationID:       op.GetId(),
1✔
980
                                ProgrammingResult: op.GetStatus(),
1✔
981
                        }, nil
1✔
982
                }
983
                return nil, fmt.Errorf("could not dequeue operation %d, unknown operation", op.Id)
×
984
        }
985

986
        // We know that the operation exists - determine how to dequeue it.
987
        switch op.GetStatus() {
1✔
988
        case spb.AFTResult_FIB_FAILED, spb.AFTResult_FIB_PROGRAMMED:
1✔
989
                if TreatRIBACKAsCompletedInFIBACKMode {
1✔
990
                        log.Infof("RIB ACK not received for operation %d - dequeue is based on FIB ACK", op.Id)
×
991
                }
×
992
                delete(c.qs.pendq.Ops, op.Id)
1✔
993
        case spb.AFTResult_RIB_PROGRAMMED:
1✔
994
                switch {
1✔
995
                case TreatRIBACKAsCompletedInFIBACKMode:
1✔
996
                        // Dequeue, since we're specifically being asked to use RIB ACKs as the completion of the
1✔
997
                        // transaction.
1✔
998
                        delete(c.qs.pendq.Ops, op.Id)
1✔
999
                case c.state.SessParams.GetAckType() != spb.SessionParameters_RIB_AND_FIB_ACK:
1✔
1000
                        // RIB ACK dequeues when we are not expecting FIB ACK.
1✔
1001
                        delete(c.qs.pendq.Ops, op.Id)
1✔
1002
                default:
1✔
1003
                        // We're in FIB_ACK mode and this is a RIB_ACK, so the transaction is not complete.
1004
                }
1005
        case spb.AFTResult_FAILED:
1✔
1006
                delete(c.qs.pendq.Ops, op.Id)
1✔
1007
        }
1008

1009
        if v == nil {
1✔
1010
                return nil, fmt.Errorf("could not dequeue operation %d, unknown operation", op.Id)
×
1011
        }
×
1012

1013
        det := &OpDetailsResults{
1✔
1014
                Type: constants.OpFromAFTOp(v.Op.Op),
1✔
1015
        }
1✔
1016
        switch opEntry := v.Op.Entry.(type) {
1✔
1017
        case *spb.AFTOperation_Ipv4:
×
1018
                det.IPv4Prefix = opEntry.Ipv4.GetPrefix()
×
1019
        case *spb.AFTOperation_Ipv6:
×
1020
                det.IPv6Prefix = opEntry.Ipv6.GetPrefix()
×
1021
        case *spb.AFTOperation_Mpls:
×
1022
                det.MPLSLabel = opEntry.Mpls.GetLabelUint64()
×
1023
        case *spb.AFTOperation_NextHopGroup:
×
1024
                det.NextHopGroupID = opEntry.NextHopGroup.GetId()
×
1025
        case *spb.AFTOperation_NextHop:
1✔
1026
                det.NextHopIndex = opEntry.NextHop.GetIndex()
1✔
1027
        }
1028

1029
        n := unixTS()
1✔
1030
        return &OpResult{
1✔
1031
                Timestamp:         n,
1✔
1032
                Latency:           n - v.Timestamp,
1✔
1033
                OperationID:       op.GetId(),
1✔
1034
                ProgrammingResult: op.GetStatus(),
1✔
1035
                ServerError:       op.GetErrorDetails().GetErrorMessage(),
1✔
1036
                Details:           det,
1✔
1037
        }, nil
1✔
1038
}
1039

1040
// updatePendingElection adds the election ID specified by id to the pending transaction
1041
// queue on the client. There can only be a single pending election, and hence the
1042
// election ID is updated in place.
1043
func (c *Client) updatePendingElection(id *spb.Uint128) {
1✔
1044
        c.qs.pendMu.Lock()
1✔
1045
        defer c.qs.pendMu.Unlock()
1✔
1046
        c.qs.pendq.Election = &ElectionReqDetails{
1✔
1047
                Timestamp: unixTS(),
1✔
1048
                ID:        id,
1✔
1049
        }
1✔
1050
}
1✔
1051

1052
// clearPendingElection clears the pending election ID and returns a result determining
1053
// the latency and timestamp. If there is no pending election, a result describing an error
1054
// is returned.
1055
func (c *Client) clearPendingElection() *OpResult {
1✔
1056
        c.qs.pendMu.Lock()
1✔
1057
        defer c.qs.pendMu.Unlock()
1✔
1058
        e := c.qs.pendq.Election
1✔
1059
        if e == nil {
2✔
1060
                return &OpResult{
1✔
1061
                        Timestamp:   unixTS(),
1✔
1062
                        ClientError: "received a election update when there was none pending",
1✔
1063
                }
1✔
1064
        }
1✔
1065

1066
        n := unixTS()
1✔
1067
        c.qs.pendq.Election = nil
1✔
1068
        return &OpResult{
1✔
1069
                Timestamp: n,
1✔
1070
                Latency:   n - e.Timestamp,
1✔
1071
        }
1✔
1072
}
1073

1074
// pendingSessionParams updates the client's outgoing queue to indicate that there
1075
// is a pending session parameters request.
1076
func (c *Client) pendingSessionParams(out *spb.SessionParameters) {
1✔
1077
        c.qs.pendMu.Lock()
1✔
1078
        defer c.qs.pendMu.Unlock()
1✔
1079
        c.qs.pendq.SessionParams = &SessionParamReqDetails{
1✔
1080
                Timestamp: unixTS(),
1✔
1081
                Outgoing:  out,
1✔
1082
        }
1✔
1083
}
1✔
1084

1085
// clearPendingSessionParams clears the pending session parameters request in the client
1086
// and returns an OpResult describing the timing of the response.
1087
func (c *Client) clearPendingSessionParams() *OpResult {
1✔
1088
        c.qs.pendMu.Lock()
1✔
1089
        defer c.qs.pendMu.Unlock()
1✔
1090
        e := c.qs.pendq.SessionParams
1✔
1091

1✔
1092
        if e == nil {
2✔
1093
                return &OpResult{
1✔
1094
                        Timestamp:   unixTS(),
1✔
1095
                        ClientError: "received a session parameter result when there was none pending",
1✔
1096
                }
1✔
1097
        }
1✔
1098

1099
        c.qs.pendq.SessionParams = nil
1✔
1100
        n := unixTS()
1✔
1101
        return &OpResult{
1✔
1102
                Timestamp: n,
1✔
1103
                Latency:   n - e.Timestamp,
1✔
1104
        }
1✔
1105
}
1106

1107
// addReadErr adds an error experienced when reading from the server to the readErr
1108
// slice on the client.
1109
func (c *Client) addReadErr(err error) {
1✔
1110
        c.readErrMu.Lock()
1✔
1111
        defer c.readErrMu.Unlock()
1✔
1112
        c.readErr = append(c.readErr, err)
1✔
1113
}
1✔
1114

1115
// addSendErr adds an error experienced when writing to the server to the sendErr
1116
// slice on the client.
1117
func (c *Client) addSendErr(err error) {
1✔
1118
        c.sendErrMu.Lock()
1✔
1119
        defer c.sendErrMu.Unlock()
1✔
1120
        c.sendErr = append(c.sendErr, err)
1✔
1121
}
1✔
1122

1123
// Pending returns the set of AFTOperations that are pending response from the
1124
// target, it returns a slice of PendingRequest interfaces which describes the contents
1125
// of the pending queues.
1126
func (c *Client) Pending() ([]PendingRequest, error) {
1✔
1127
        if c.qs == nil {
2✔
1128
                return nil, errors.New("invalid (nil) queues in client")
1✔
1129
        }
1✔
1130
        c.qs.pendMu.RLock()
1✔
1131
        defer c.qs.pendMu.RUnlock()
1✔
1132
        ret := []PendingRequest{}
1✔
1133

1✔
1134
        ids := []uint64{}
1✔
1135
        for k, o := range c.qs.pendq.Ops {
2✔
1136
                if o == nil || o.Op == nil {
2✔
1137
                        return nil, errors.New("invalid (nil) operation in pending queue")
1✔
1138
                }
1✔
1139
                ids = append(ids, k)
1✔
1140
        }
1141

1142
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
2✔
1143
        for _, v := range ids {
2✔
1144
                ret = append(ret, c.qs.pendq.Ops[v])
1✔
1145
        }
1✔
1146
        if c.qs.pendq.Election != nil {
2✔
1147
                ret = append(ret, c.qs.pendq.Election)
1✔
1148
        }
1✔
1149
        if c.qs.pendq.SessionParams != nil {
2✔
1150
                ret = append(ret, c.qs.pendq.SessionParams)
1✔
1151
        }
1✔
1152
        return ret, nil
1✔
1153
}
1154

1155
// Results returns the set of ModifyResponses that have been received from the
1156
// target.
1157
func (c *Client) Results() ([]*OpResult, error) {
1✔
1158
        if c.qs == nil {
2✔
1159
                return nil, errors.New("invalid (nil) queues in client")
1✔
1160
        }
1✔
1161
        c.qs.resultMu.RLock()
1✔
1162
        defer c.qs.resultMu.RUnlock()
1✔
1163
        return append(make([]*OpResult, 0, len(c.qs.resultq)), c.qs.resultq...), nil
1✔
1164
}
1165

1166
// AckResult allows a caller to acknowledge a specific result in the client's
1167
// queue removing it from the queue stored in the client. If results are not
1168
// acknowledged, the client will store all results indefinitely.
1169
func (c *Client) AckResult(res ...*OpResult) error {
1✔
1170
        if c.qs == nil {
1✔
1171
                return errors.New("invalid (nil) queue in client")
×
1172
        }
×
1173

1174
        toACK := map[uint64]bool{}
1✔
1175
        for _, r := range res {
2✔
1176
                toACK[r.OperationID] = false
1✔
1177
        }
1✔
1178

1179
        c.qs.resultMu.RLock()
1✔
1180
        defer c.qs.resultMu.RUnlock()
1✔
1181
        nrq := []*OpResult{}
1✔
1182
        for _, r := range c.qs.resultq {
2✔
1183
                _, ok := toACK[r.OperationID]
1✔
1184
                if !ok {
2✔
1185
                        nrq = append(nrq, r)
1✔
1186
                }
1✔
1187
                if ok {
2✔
1188
                        toACK[r.OperationID] = true
1✔
1189
                }
1✔
1190
        }
1191
        c.qs.resultq = nrq
1✔
1192

1✔
1193
        var errs errlist.List
1✔
1194
        for k, v := range toACK {
2✔
1195
                if !v {
2✔
1196
                        errs.Add(fmt.Errorf("cannot find operation ID %d to acknowledge", k))
1✔
1197
                }
1✔
1198
        }
1199

1200
        return errs.Err()
1✔
1201
}
1202

1203
// ClientStatus is the overview status of the client, timestamped according to the
1204
// time at which the state was retrieved.
1205
type ClientStatus struct {
1206
        // Timestamp expressed in nanoseconds since the epoch that the status was retrieved.
1207
        Timestamp int64
1208
        // PendingTransactions is the slice of pending operations on the client.
1209
        PendingTransactions []PendingRequest
1210
        Results             []*OpResult
1211
        SendErrs            []error
1212
        ReadErrs            []error
1213
}
1214

1215
// Status returns a composite status of the client at the time that the caller specified.
1216
func (c *Client) Status() (*ClientStatus, error) {
1✔
1217
        pt, err := c.Pending()
1✔
1218
        if err != nil {
2✔
1219
                return nil, fmt.Errorf("invalid pending queue, %v", err)
1✔
1220
        }
1✔
1221
        r, err := c.Results()
1✔
1222
        if err != nil {
1✔
1223
                return nil, fmt.Errorf("invalid results queue, %v", err)
×
1224
        }
×
1225
        cs := &ClientStatus{
1✔
1226
                // we allow unixTS to be overloaded to allow us to unit test this function
1✔
1227
                // by overloading it.
1✔
1228
                Timestamp:           unixTS(),
1✔
1229
                PendingTransactions: pt,
1✔
1230
                Results:             r,
1✔
1231
        }
1✔
1232

1✔
1233
        c.sendErrMu.RLock()
1✔
1234
        defer c.sendErrMu.RUnlock()
1✔
1235
        cs.SendErrs = append(make([]error, 0, len(c.sendErr)), c.sendErr...)
1✔
1236

1✔
1237
        c.readErrMu.RLock()
1✔
1238
        defer c.readErrMu.RUnlock()
1✔
1239
        cs.ReadErrs = append(make([]error, 0, len(c.readErr)), c.readErr...)
1✔
1240

1✔
1241
        return cs, nil
1✔
1242
}
1243

1244
// hasErrors reports whether there are errors that have been encountered
1245
// in the client. It returns two slices of errors containing the send
1246
// and recv errors
1247
func (c *Client) hasErrors() ([]error, []error) {
1✔
1248
        c.readErrMu.RLock()
1✔
1249
        defer c.readErrMu.RUnlock()
1✔
1250
        c.sendErrMu.RLock()
1✔
1251
        defer c.sendErrMu.RUnlock()
1✔
1252
        if len(c.readErr) != 0 || len(c.sendErr) != 0 {
2✔
1253
                return c.sendErr, c.readErr
1✔
1254
        }
1✔
1255
        return nil, nil
1✔
1256
}
1257

1258
// ClientErr encapsulates send and receive errors for the client.
type ClientErr struct {
	// Send holds the errors encountered when sending to the server.
	Send []error
	// Recv holds the errors encountered when receiving from the server.
	Recv []error
}

// Error implements the error interface.
func (c *ClientErr) Error() string {
	return fmt.Sprintf("errors: send: %v, recv: %v", c.Send, c.Recv)
}
×
1265

1266
// AwaitConverged waits until the client is converged and writes to the supplied
1267
// channel. The function blocks until such time as the client returns or when the
1268
// context is done.
1269
func (c *Client) AwaitConverged(ctx context.Context) error {
1✔
1270
        for {
2✔
1271
                select {
1✔
1272
                case <-ctx.Done():
×
1273
                        return ctx.Err()
×
1274
                default:
1✔
1275
                }
1276

1277
                // we need to check here that no-one is doing an operation that we might care about
1278
                // impacting convergence. this is:
1279
                //  - sending a message
1280
                //        - post-processing a sent message
1281
                //        - reading a message
1282
                //        - post-procesing a read message
1283
                // we do this by holding the awaiting mutex.
1284
                done, err := func() (bool, error) {
2✔
1285
                        c.awaiting.Lock()
1✔
1286
                        defer c.awaiting.Unlock()
1✔
1287
                        if sendE, recvE := c.hasErrors(); len(sendE) != 0 || len(recvE) != 0 {
1✔
1288
                                return true, &ClientErr{Send: sendE, Recv: recvE}
×
1289
                        }
×
1290
                        if c.isConverged() {
2✔
1291
                                return true, nil
1✔
1292
                        }
1✔
1293
                        return false, nil
1✔
1294
                }()
1295
                if err != nil {
1✔
1296
                        return err
×
1297
                }
×
1298
                if done {
2✔
1299
                        return nil
1✔
1300
                }
1✔
1301
                time.Sleep(BusyLoopDelay) // avoid busy looping.
1✔
1302
        }
1303
}
1304

1305
// Get implements the Get RPC to the gRIBI server. It takes an input context and a
1306
// GetRequest and returns a single GetResponse with all contained results within
1307
// it.
1308
func (c *Client) Get(ctx context.Context, sreq *spb.GetRequest) (*spb.GetResponse, error) {
1✔
1309
        if sreq == nil {
1✔
1310
                return nil, errors.New("get request cannot be nil")
×
1311
        }
×
1312

1313
        ni := sreq.GetNetworkInstance()
1✔
1314
        if ni == nil {
2✔
1315
                return nil, errors.New("network instance cannot be nil")
1✔
1316
        }
1✔
1317

1318
        switch ni.(type) {
1✔
1319
        case *spb.GetRequest_All:
1✔
1320
                if sreq.GetAll() == nil {
1✔
1321
                        return nil, errors.New("network instance All cannot be nil")
×
1322
                }
×
1323
        case *spb.GetRequest_Name:
1✔
1324
                if sreq.GetName() == "" {
1✔
1325
                        return nil, errors.New("network instance name is required")
×
1326
                }
×
1327
        }
1328

1329
        if sreq.GetAft() == spb.AFTType_INVALID {
2✔
1330
                return nil, errors.New("AFT is required")
1✔
1331
        }
1✔
1332

1333
        result := &spb.GetResponse{}
1✔
1334

1✔
1335
        stream, err := c.c.Get(ctx, sreq)
1✔
1336
        if err != nil {
1✔
1337
                return nil, fmt.Errorf("cannot send Get RPC, %v", err)
×
1338
        }
×
1339

1340
        for {
2✔
1341
                getres, err := stream.Recv()
1✔
1342
                if err == io.EOF {
2✔
1343
                        break
1✔
1344
                }
1345
                if err != nil {
1✔
1346
                        return nil, fmt.Errorf("error in Get RPC, %v", err)
×
1347
                }
×
1348
                result.Entry = append(result.Entry, getres.Entry...)
1✔
1349

1350
        }
1351
        return result, nil
1✔
1352
}
1353

1354
// Flush implements the gRIBI Flush RPC.
1355
func (c *Client) Flush(ctx context.Context, req *spb.FlushRequest) (*spb.FlushResponse, error) {
1✔
1356
        if req == nil {
2✔
1357
                return nil, errors.New("flush request cannot be nil")
1✔
1358
        }
1✔
1359

1360
        switch t := req.GetNetworkInstance().(type) {
1✔
1361
        case *spb.FlushRequest_All:
1✔
1362
        case *spb.FlushRequest_Name:
1✔
1363
                if t.Name == "" {
2✔
1364
                        return nil, errors.New("cannot specify an empty network instance name")
1✔
1365
                }
1✔
1366
        }
1367

1368
        if req.GetElection() == nil && c.state.ElectionID != nil {
2✔
1369
                return nil, fmt.Errorf("must specify an election behaviour for a SINGLE_PRIMARY client")
1✔
1370
        }
1✔
1371

1372
        switch req.GetElection().(type) {
1✔
1373
        case *spb.FlushRequest_Id:
1✔
1374
                if c.state.SessParams == nil || c.state.SessParams.Redundancy != spb.SessionParameters_SINGLE_PRIMARY {
2✔
1375
                        return nil, fmt.Errorf("invalid to specify an election ID when the client is not in SINGLE_PRIMARY mode")
1✔
1376
                }
1✔
1377
        case *spb.FlushRequest_Override:
1✔
1378
                if c.state.SessParams == nil || c.state.SessParams.Redundancy != spb.SessionParameters_SINGLE_PRIMARY {
2✔
1379
                        return nil, fmt.Errorf("cannot override election ID when the client is in ALL_PRIMARY mode")
1✔
1380
                }
1✔
1381
        }
1382

1383
        res, err := c.c.Flush(ctx, req)
1✔
1384
        if err != nil {
2✔
1385
                // return err directly here so that the client can receive the type of error.
1✔
1386
                return nil, err
1✔
1387
        }
1✔
1388
        return res, nil
1✔
1389
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc