• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openconfig / gribigo / 8311396303

14 Mar 2024 12:29AM UTC coverage: 72.858% (-0.1%) from 73.002%
8311396303

push

github

web-flow
Bump google.golang.org/protobuf from 1.31.0 to 1.33.0 (#230)

Bumps google.golang.org/protobuf from 1.31.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

6080 of 8345 relevant lines covered (72.86%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.29
/client/gribiclient.go
1
// Copyright 2021 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
// Package client defines a gRIBI stub client implementation in Go.
16
package client
17

18
import (
19
        "bytes"
20
        "context"
21
        "crypto/tls"
22
        "errors"
23
        "fmt"
24
        "io"
25
        "sort"
26
        "sync"
27
        "time"
28

29
        log "github.com/golang/glog"
30
        "github.com/google/uuid"
31
        "github.com/openconfig/gnmi/errlist"
32
        "github.com/openconfig/gribigo/constants"
33
        "go.uber.org/atomic"
34
        "google.golang.org/grpc"
35
        "google.golang.org/grpc/credentials"
36
        "lukechampine.com/uint128"
37

38
        spb "github.com/openconfig/gribi/v1/proto/service"
39
)
40

41
// debug switches on detailed diagnostic reporting throughout the client when
// it is set to true.
var debug = false

// modifyBuffer is the capacity given to the buffered channel that carries
// ModifyRequests to the sending goroutine.
var modifyBuffer = 5
47

48
var (
	// unixTS is a function that returns the current time as a timestamp in
	// nanoseconds since the Unix epoch. It is a variable so that unit tests can
	// replace it to obtain deterministic output.
	//
	// The clock has to be read inside the function body: the method value
	// time.Now().UnixNano evaluates time.Now() exactly once, when the variable
	// is initialised, and would therefore return the same instant on every call.
	unixTS = func() int64 { return time.Now().UnixNano() }

	// BusyLoopDelay specifies a delay that should be used when looping around pending
	// transactions. By default, the client tries not to busy loop, but this can introduce
	// artificial delays when benchmarking server operations. It is NOT RECOMMENDED to
	// change the default value other than when measuring convergence time at the client.
	BusyLoopDelay = 100 * time.Millisecond

	// TreatRIBACKAsCompletedInFIBACKMode allows the caller to modify the client behaviour
	// to treat a RIB ACK from the target as the end of an operation even though the session
	// has requested FIB_ACK. This allows benchmarking of the time to receive a RIB_ACK from
	// a server in FIB_ACK mode. In production clients that require FIB_ACK it is ACTIVELY
	// HARMFUL to change this value, since the client will consider a server to have converged
	// on receipt of a RIB_ACK despite the fact that the session requested FIB_ACK.
	TreatRIBACKAsCompletedInFIBACKMode = false
)
67

68
// Client is a wrapper for the gRIBI client.
69
type Client struct {
70
        // state stores the configured state of the client, used to store
71
        // initial immutable parameters, as well as mutable parameters that
72
        // may change during the clients's lifetime.
73
        state *clientState
74

75
        // c is the current gRIBI client.
76
        c spb.GRIBIClient
77
        // conn is the current gRPC connection that the client uses.
78
        conn *grpc.ClientConn
79

80
        // qs is the set of queues that are associated with the current
81
        // client.
82
        qs *clientQs
83

84
        // shut indicates that RPCs should continue to run, when set
85
        // to true, all goroutines that are serving RPCs shut down.
86
        shut *atomic.Bool
87

88
        // sendErrMu protects the sendErr slice.
89
        sendErrMu sync.RWMutex
90
        // sErr stores sending errors that cause termination of the sending
91
        // GoRoutine.
92
        sendErr []error
93

94
        // readErrMu protects the readErr slice.
95
        readErrMu sync.RWMutex
96
        // rErr stores receiving errors that cause termination of the receiving
97
        // GoRoutine.
98
        readErr []error
99

100
        // awaiting is a read-write mutex that is used to ensure that we know
101
        // when any operation that might affect the convergence state of the
102
        // client is in-flight. Functions that are performing operations that may
103
        // result in a change take a read-lock, functions that are dependent upon
104
        // the client being in a consistent state take a write-lock on this mutex.
105
        awaiting sync.RWMutex
106

107
        // wg tells disconnect() when all the goroutines started by Connect() have exited.
108
        wg sync.WaitGroup
109

110
        // doneCh is a channel that is written to when the client disconnects, it can be
111
        // used by a caller to ensure that the client reconnects.
112
        doneCh chan struct{}
113

114
        // sendExitCh is a channel that is used to indicate that the sender for the
115
        // client is exited, such that other goroutines can clean up.
116
        sendExitCh chan struct{}
117
}
118

119
// clientState is used to store the configured (immutable) state of the client.
120
type clientState struct {
121
        // sessParams stores the session parameters that are associated with
122
        // the client, they will be resent as the first message if the client
123
        // reconnects to the server.
124
        SessParams *spb.SessionParameters
125
        // electionID stores the current election ID for this client in the
126
        // case that the client participates in a group of SINGLE_PRIMARY
127
        // clients.
128
        ElectionID *spb.Uint128
129
}
130

131
// Opt is implemented by every option that can be supplied when creating a new
// client. Options capture the parameters that are sent at the start of a gRIBI
// session.
type Opt interface {
	// isClientOpt marks the implementing type as a client option.
	isClientOpt()
}
137

138
// New creates a new gRIBI client with the specified set of options. The options
139
// provided control parameters that live for the lifetime of the client, such as those
140
// that are within the session parameters. A new client, or error, is returned.
141
func New(opts ...Opt) (*Client, error) {
1✔
142
        c := &Client{
1✔
143
                shut:   atomic.NewBool(false),
1✔
144
                doneCh: make(chan struct{}, 1),
1✔
145
        }
1✔
146

1✔
147
        s, err := handleParams(opts...)
1✔
148
        if err != nil {
1✔
149
                return nil, fmt.Errorf("cannot create client, error with parameters, %v", err)
×
150
        }
×
151
        c.state = s
1✔
152

1✔
153
        c.qs = &clientQs{
1✔
154
                sendq: []*spb.ModifyRequest{},
1✔
155
                pendq: &pendingQueue{
1✔
156
                        Ops: map[uint64]*PendingOp{},
1✔
157
                },
1✔
158

1✔
159
                // modifyCh is buffered to ensure that there are no races writing to it - we
1✔
160
                // expect that 5 messages is sufficient to ensure that there is time for the
1✔
161
                // sender to shutdown.
1✔
162
                modifyCh: make(chan *spb.ModifyRequest, modifyBuffer),
1✔
163
                resultq:  []*OpResult{},
1✔
164

1✔
165
                sending: atomic.NewBool(false),
1✔
166
        }
1✔
167

1✔
168
        return c, nil
1✔
169
}
170

171
// Done returns a channel which is written to when the client is disconnected from the
172
// server. It can be used to trigger reconnections.
173
func (c *Client) Done() <-chan struct{} {
1✔
174
        return c.doneCh
1✔
175
}
1✔
176

177
// handleParams takes the set of gRIBI client options that are provided and uses them
178
// to populate the session parameters that they are translated into. It returns a
179
// populate SessionParameters protobuf along with any errors when parsing the supplied
180
// set of options.
181
func handleParams(opts ...Opt) (*clientState, error) {
1✔
182
        if len(opts) == 0 {
2✔
183
                return &clientState{}, nil
1✔
184
        }
1✔
185
        s := &clientState{
1✔
186
                SessParams: &spb.SessionParameters{},
1✔
187
        }
1✔
188

1✔
189
        for _, o := range opts {
2✔
190
                switch v := o.(type) {
1✔
191
                case *allPrimaryClients:
1✔
192
                        if s.ElectionID != nil {
2✔
193
                                return nil, fmt.Errorf("cannot specify both ALL_PRIMARY and SINGLE_PRIMARY redundancy modes")
1✔
194
                        }
1✔
195
                        s.SessParams.Redundancy = spb.SessionParameters_ALL_PRIMARY
1✔
196
                case *electedPrimaryClient:
1✔
197
                        s.SessParams.Redundancy = spb.SessionParameters_SINGLE_PRIMARY
1✔
198
                        s.ElectionID = v.electionID
1✔
199
                case *persistEntries:
1✔
200
                        s.SessParams.Persistence = spb.SessionParameters_PRESERVE
1✔
201
                case *fibACK:
×
202
                        s.SessParams.AckType = spb.SessionParameters_RIB_AND_FIB_ACK
×
203
                }
204
        }
205

206
        return s, nil
1✔
207
}
208

209
// DialOpt is implemented by the options that can be supplied to Dial when
// connecting to the gRIBI server.
type DialOpt interface {
	// isDialOpt marks the implementing type as a dial option.
	isDialOpt()
}
214

215
// Dial dials the server specified in the addr string, using the specified
216
// set of dial options.
217
func (c *Client) Dial(ctx context.Context, addr string, opts ...DialOpt) error {
1✔
218
        // TODO(robjs): translate any options within the dial options here, we may
1✔
219
        // want to consider just accepting some gRPC dialoptions directly.
1✔
220

1✔
221
        dialOpts := []grpc.DialOption{grpc.WithBlock()}
1✔
222

1✔
223
        tlsc := credentials.NewTLS(&tls.Config{
1✔
224
                InsecureSkipVerify: true,
1✔
225
        })
1✔
226
        dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsc))
1✔
227

1✔
228
        conn, err := grpc.DialContext(ctx, addr, dialOpts...)
1✔
229
        if err != nil {
1✔
230
                return fmt.Errorf("cannot dial remote system, %v", err)
×
231
        }
×
232
        c.conn = conn
1✔
233
        c.c = spb.NewGRIBIClient(conn)
1✔
234
        return nil
1✔
235
}
236

237
// UseStub instructs the client to use the gRPC stub implementing the
238
// GRIBIClient service.
239
func (c *Client) UseStub(stub spb.GRIBIClient) error {
×
240
        if c.c != nil {
×
241
                return errors.New("client already has a stub")
×
242
        }
×
243
        c.c = stub
×
244
        return nil
×
245
}
246

247
// ReplaceStub replaces the gRPC stub used by the client only if the client
248
// is not in a sending state.
249
func (c *Client) ReplaceStub(stub spb.GRIBIClient) error {
1✔
250
        c.qs.sendMu.Lock()
1✔
251
        defer c.qs.sendMu.Unlock()
1✔
252
        if c.qs.sending.Load() {
2✔
253
                return errors.New("must be in stopped sending state to replace stub")
1✔
254
        }
1✔
255

256
        c.c = stub
1✔
257
        return nil
1✔
258
}
259

260
// Reset clears the client's transient state - is is recommended to call this method between
261
// reconnections at a server to clear pending queues and results which are no longer valid.
262
func (c *Client) Reset() {
1✔
263
        c.StopSending()
1✔
264
        c.disconnect()
1✔
265

1✔
266
        c.sendErrMu.Lock()
1✔
267
        defer c.sendErrMu.Unlock()
1✔
268
        c.sendErr = nil
1✔
269

1✔
270
        c.readErrMu.Lock()
1✔
271
        defer c.readErrMu.Unlock()
1✔
272
        c.readErr = nil
1✔
273

1✔
274
        c.qs.sendMu.Lock()
1✔
275
        defer c.qs.sendMu.Unlock()
1✔
276
        c.qs.sendq = nil
1✔
277

1✔
278
        c.qs.pendMu.Lock()
1✔
279
        defer c.qs.pendMu.Unlock()
1✔
280
        c.qs.pendq = &pendingQueue{
1✔
281
                Ops: map[uint64]*PendingOp{},
1✔
282
        }
1✔
283

1✔
284
        c.qs.resultMu.Lock()
1✔
285
        defer c.qs.resultMu.Unlock()
1✔
286
        c.qs.resultq = nil
1✔
287

1✔
288
        c.qs.modifyCh = make(chan *spb.ModifyRequest, 5)
1✔
289

1✔
290
        // Empty the done channel if a reader did not take the message from it.
1✔
291
        select {
1✔
292
        case <-c.doneCh:
1✔
293
        default:
1✔
294
        }
295
}
296

297
// disconnect shuts down the goroutines started by Connect().
298
func (c *Client) disconnect() {
1✔
299
        skipClose := false
1✔
300
        if c.sendExitCh == nil || chIsClosed(c.sendExitCh) {
2✔
301
                skipClose = true
1✔
302
        }
1✔
303
        if !skipClose {
2✔
304
                // Close the modifyCh to signal to the request handler to exit.
1✔
305
                close(c.qs.modifyCh)
1✔
306
        }
1✔
307
        c.wg.Wait()
1✔
308
}
309

310
// Close disconnects the underlying gRPC connection to the gRIBI server.
311
func (c *Client) Close() error {
1✔
312
        c.disconnect()
1✔
313
        if c.conn == nil {
1✔
314
                return nil
×
315
        }
×
316
        return c.conn.Close()
1✔
317
}
318

319
// AllPrimaryClients returns an option, for use when creating a new client,
// stating that the client is part of a group of clients that are all treated
// as primary sources of routing information.
//
// In this mode of operation, the server is expected to store all entries that
// are written to it over gRIBI regardless of their source client. If multiple
// clients write the same entry, selection amongst them is done based on the
// lowest client identifier (the remote address when expressed as a 128-bit
// number) as per the explanation in
// https://github.com/openconfig/gribi/blob/master/doc/motivation.md#tying-injected-entries-to-the-liveliness-of-grpc-client
func AllPrimaryClients() *allPrimaryClients {
	return new(allPrimaryClients)
}

// allPrimaryClients is the option type returned by AllPrimaryClients.
type allPrimaryClients struct{}

// isClientOpt marks allPrimaryClients as a client option.
func (allPrimaryClients) isClientOpt() {}
×
336

337
// ElectedPrimaryClient is an option used when creating a new client that
338
// specifies that this client will be part of a group of clients elect a
339
// master amongst them, such that exactly one client is considered the primary
340
// source of routing information. The server is not expected to know any of
341
// the details of this election process - but arbitrates based on a supplied
342
// election ID (expressed as a 128-bit number). The client with the highest
343
// election ID is considered the primary and hence has active entries within
344
// the device's RIB.
345
//
346
// The initial election ID to be used is stored
347
func ElectedPrimaryClient(initialID *spb.Uint128) *electedPrimaryClient {
1✔
348
        return &electedPrimaryClient{electionID: initialID}
1✔
349
}
1✔
350

351
type electedPrimaryClient struct {
352
        electionID *spb.Uint128
353
}
354

355
func (electedPrimaryClient) isClientOpt() {}
×
356

357
// PersistEntries returns an option asking the server to keep the client's
// entries after the client has disconnected. By default, a gRIBI server will
// purge all entries when the client disconnects. This persistence mode may
// only be used when the redundancy mode is AllPrimaryClients.
//
// NOTE(review): that redundancy-mode restriction is not enforced by
// handleParams - confirm it against the gRIBI specification.
func PersistEntries() *persistEntries {
	return new(persistEntries)
}

// persistEntries is the option type returned by PersistEntries.
type persistEntries struct{}

// isClientOpt marks persistEntries as a client option.
func (persistEntries) isClientOpt() {}
×
368

369
// FIBACK returns an option asking the server to acknowledge gRIBI transactions
// only after they have been programmed into the FIB. By default, a gRIBI
// server will send an acknowledgement when the entry has been installed into
// the RIB.
func FIBACK() *fibACK {
	return new(fibACK)
}

// fibACK is the option type returned by FIBACK.
type fibACK struct{}

// isClientOpt marks fibACK as a client option.
func (fibACK) isClientOpt() {}
×
380

381
func debugWatcher(ctx context.Context, role, id string) {
1✔
382
        for {
2✔
383
                select {
1✔
384
                case <-ctx.Done():
1✔
385
                        log.Errorf("goroutine %s:%s: exiting at %s", role, id, time.Now())
1✔
386
                        return
1✔
387
                default:
1✔
388
                }
389
                log.Errorf("goroutine %s:%s: still running at %s", role, id, time.Now())
1✔
390
                time.Sleep(1 * time.Second)
1✔
391
        }
392
}
393

394
// Connect establishes a Modify RPC to the client and sends the initial session
395
// parameters/election ID if required. The Modify RPC is stored within the client
396
// such that it can be used for subsequent calls - such that Connect must be called
397
// before subsequent operations that should send to the target.
398
//
399
// An error is returned if the Modify RPC cannot be established or there is an error
400
// response to the initial messages that are sent on the connection.
401
func (c *Client) Connect(ctx context.Context) error {
1✔
402
        // Store that we are no longer shut down, since Connect can be called multiple
1✔
403
        // times on the same client.
1✔
404
        c.shut.Store(false)
1✔
405
        c.sendExitCh = make(chan struct{}, 1)
1✔
406

1✔
407
        stream, err := c.c.Modify(ctx)
1✔
408
        if err != nil {
2✔
409
                return fmt.Errorf("cannot open Modify RPC, %v", err)
1✔
410
        }
1✔
411

412
        // TODO(robjs): if we made these functions not niladic, and
413
        // supplied to the client, then we could allow the user to
414
        // overload functions that read and write - and hence do
415
        // lower-layer operations to the gRIBI server directly.
416
        // Modify this code to do this (make these just be default
417
        // handler functions, they could still be inline).
418

419
        informDone := func(who string) {
2✔
420
                select {
1✔
421
                case c.doneCh <- struct{}{}:
1✔
422
                        log.Infof("writing to done channel, requested by %s", who)
1✔
423
                default:
1✔
424
                        log.Infof("dropped message informing caller the client is done.")
1✔
425
                }
426
        }
427

428
        // respHandler takes a received modify response and error, and returns
429
        // a bool indicating that the loop within which it is called should
430
        // exit.
431
        respHandler := func(in *spb.ModifyResponse, err error) bool {
2✔
432
                log.V(2).Infof("received message on Modify stream: %s", in)
1✔
433
                c.awaiting.RLock()
1✔
434
                defer c.awaiting.RUnlock()
1✔
435
                if err == io.EOF {
2✔
436
                        // reading is done, so write should shut down too.
1✔
437
                        c.shut.Store(true)
1✔
438
                        return true
1✔
439
                }
1✔
440
                if err != nil {
2✔
441
                        log.Errorf("got error receiving message, %v", err)
1✔
442
                        c.addReadErr(err)
1✔
443
                        return true
1✔
444
                }
1✔
445
                if err := c.handleModifyResponse(in); err != nil {
1✔
446
                        log.Errorf("got error processing message received from server, %v", err)
×
447
                        c.addReadErr(err)
×
448
                        return true
×
449
                }
×
450

451
                return false
1✔
452
        }
453

454
        id := uuid.New().String()
1✔
455
        c.wg.Add(1)
1✔
456
        go func() {
2✔
457
                defer c.wg.Done()
1✔
458
                defer informDone("receiver")
1✔
459
                if debug {
2✔
460
                        debugCtx, cancel := context.WithCancel(ctx)
1✔
461
                        defer cancel()
1✔
462
                        go debugWatcher(debugCtx, "recv", id)
1✔
463
                }
1✔
464
                for {
2✔
465
                        if c.shut.Load() {
1✔
466
                                log.V(2).Infof("shutting down recv goroutine, id: %s, cause: SHUTDOWN", id)
×
467
                                return
×
468
                        }
×
469
                        if done := respHandler(stream.Recv()); done {
2✔
470
                                log.V(2).Infof("shuttting down recv goroutine, id: %s, cause: HANDLER", id)
1✔
471
                                return
1✔
472
                        }
1✔
473
                }
474
        }()
475

476
        // reqHandler handles an input modify request and returns a bool when
477
        // the loop within which it is called should exit.
478
        reqHandler := func(m *spb.ModifyRequest, readOK bool) bool {
2✔
479
                if !readOK {
2✔
480
                        // client close requested by disconnect(), since the modifyCh is closed.
1✔
481
                        if err := stream.CloseSend(); err != nil {
1✔
482
                                log.Errorf("got error closing session: %v", err)
×
483
                        }
×
484
                        return true
1✔
485
                }
486

487
                c.awaiting.RLock()
1✔
488
                defer c.awaiting.RUnlock()
1✔
489
                if err := stream.Send(m); err != nil {
1✔
490
                        log.Errorf("got error sending message: %v", err)
×
491
                        c.addSendErr(err)
×
492
                        return true
×
493
                }
×
494
                log.V(2).Infof("sent Modify message %s", m)
1✔
495
                return false
1✔
496
        }
497

498
        c.wg.Add(1)
1✔
499
        go func() {
2✔
500
                defer c.wg.Done()
1✔
501
                defer informDone("sender")
1✔
502
                if debug {
2✔
503
                        debugCtx, cancel := context.WithCancel(ctx)
1✔
504
                        defer cancel()
1✔
505
                        go debugWatcher(debugCtx, "send", id)
1✔
506
                }
1✔
507
                defer func() {
2✔
508
                        // Signal that we are exiting, this allows us to avoid the case that
1✔
509
                        // a race causes the modifyCh to become blocking.
1✔
510
                        log.V(2).Infof("closing send channel in id: %s", id)
1✔
511
                        c.sendExitCh <- struct{}{}
1✔
512
                        close(c.sendExitCh)
1✔
513
                }()
1✔
514
                for {
2✔
515
                        if c.shut.Load() {
1✔
516
                                log.V(2).Infof("shutting down send goroutine, id: %s, cause: SHUTDOWN", id)
×
517
                                return
×
518
                        }
×
519

520
                        v, ok := <-c.qs.modifyCh
1✔
521
                        if done := reqHandler(v, ok); done {
2✔
522
                                log.V(2).Infof("shutting down send goroutine, id: %s, cause: HANDLER", id)
1✔
523
                                return
1✔
524
                        }
1✔
525
                }
526
        }()
527

528
        return nil
1✔
529
}
530

531
// clientQs defines the set of queues that are used for the Modify RPC.
532
type clientQs struct {
533
        // sendMu protects the send queue.
534
        sendMu sync.RWMutex
535
        // sendq contains any ModifyRequest messages that are queued to be
536
        // sent to the target. The send queue is emptied after the client is
537
        // instructed to start sending messages.
538
        sendq []*spb.ModifyRequest
539

540
        // pendMu protects the pending queue.
541
        pendMu sync.RWMutex
542
        // pendq stores the pending transactions to the target. Once a message
543
        // is taken from the sendq, and sent over the Modify stream, an entry
544
        // in the pendq is created for each AFTOperation within the ModifyRequest.
545
        // The key for the pendq is the ID of the operation to speed up lookups.
546
        // The value stored in the pendq is a bool - such that the library discards
547
        // the contents of a Modify request after it was sent.
548
        //
549
        // TODO(robjs): currently, the implementation we have here just handles the
550
        // positive case that we get an acknowledged transaction - consider whether
551
        // we want the ability to have the client handle timeouts and inject these
552
        // as failed per the controller implementation.
553
        pendq *pendingQueue
554

555
        // modifyCh is the channel that is used to write to the goroutine that is
556
        // the sole source of writes onto the modify stream.
557
        modifyCh chan *spb.ModifyRequest
558

559
        // resultMu protects the results queue.
560
        resultMu sync.RWMutex
561
        // resultq stores the responses that are received from the target. When a
562
        // ModifyResponse is received from the target, the entry is removed from the
563
        // pendq (based on the ID) for any included AFTResult, and the result
564
        // is written to the result queue.
565
        resultq []*OpResult
566

567
        // sending indicates whether the client will empty the sendq. By default,
568
        // messages are queued into the sendq and not sent to the target.
569
        sending *atomic.Bool
570
}
571

572
// pendingQueue provides a queue type that determines the set of pending
573
// operations for the client. Since pending operations are added and removed
574
// based on receiving a ModifyRequest or ModifyResponse in the client, there is
575
// no case in which we need to individually access these elements, so they are
576
// to be protected by a single mutex.
577
type pendingQueue struct {
578
        // ops is the set of AFT operations that are pending in the client.
579
        Ops map[uint64]*PendingOp
580
        // Election is the pending election that has been sent to the client, there can
581
        // only be one pending update - since each election replaces the previous one. A
582
        // single response is expected to clear all the election updates that have been sent
583
        // to the server.
584
        Election *ElectionReqDetails
585

586
        // SessionParams indicates that there is a pending SessionParameters
587
        // request sent to the server. There can only be one such request since
588
        // we only send the request at the start of the connection, and MUST NOT
589
        // send it again.
590
        SessionParams *SessionParamReqDetails
591
}
592

593
// ElectionReqDetails stores the details of an individual election update that was sent
594
// to the server.
595
type ElectionReqDetails struct {
596
        // Timestamp is the time at which the update was sent, specified in nanoseconds
597
        // since the epoch.
598
        Timestamp int64
599
        // ID is the election ID that was sent to the server. This can be used to determine
600
        // whether the response shows that this client actually became the master.
601
        ID *spb.Uint128
602
}
603

604
// isPendingRequest implements the PendingRequest interface for ElectionReqDetails.
605
func (*ElectionReqDetails) isPendingRequest() {}
×
606

607
// SessionParamReqDetails stores the details of an individual session parameters update
608
// that was sent to the server.
609
type SessionParamReqDetails struct {
610
        // Timestamp is the time at which the update was sent, specified in nanoseconds
611
        // since the epoch.
612
        Timestamp int64
613
        // Outgoing is the parameters that were sent to the server.
614
        Outgoing *spb.SessionParameters
615
}
616

617
// isPendingRequest implements the PendingRequest interface for SessionParamReqDetails.
618
func (*SessionParamReqDetails) isPendingRequest() {}
×
619

620
// Len returns the length of the pending queue.
621
func (p *pendingQueue) Len() int {
1✔
622
        if p == nil {
2✔
623
                return 0
1✔
624
        }
1✔
625
        var i int
1✔
626
        if p.SessionParams != nil {
2✔
627
                i++
1✔
628
        }
1✔
629
        if p.Election != nil {
2✔
630
                i++
1✔
631
        }
1✔
632

633
        return i + len(p.Ops)
1✔
634
}
635

636
// PendingRequest is implemented by every type that can be reported back to
// callers to describe a request that is pending in the client.
type PendingRequest interface {
	// isPendingRequest marks the implementing type as a pending request.
	isPendingRequest()
}
641

642
// PendingOp stores details pertaining to a pending request in the client.
643
type PendingOp struct {
644
        // Timestamp is the timestamp that the operation was queued at.
645
        Timestamp int64
646
        // Op is the operation that the pending request pertains to.
647
        Op *spb.AFTOperation
648
}
649

650
// isPendingRequest implements the PendingRequest interface for the PendingOp type.
651
func (*PendingOp) isPendingRequest() {}
×
652

653
// OpResult stores details pertaining to a result that was received from
654
// the server.
655
type OpResult struct {
656
        // Timestamp is the timestamp that the result was received.
657
        Timestamp int64
658
        // Latency is the latency of the request from the server. This is calculated
659
        // based on the time that the request was sent to the client (became pending)
660
        // and the time that the response was received from the server. It is expressed
661
        // in nanoseconds.
662
        Latency int64
663

664
        // CurrentServerElectionID indicates that the message that was received from the server
665
        // was an ModifyResponse sent in response to an updated election ID, its value is the
666
        // current master election ID (maximum election ID seen from any client) reported from
667
        // the server.
668
        CurrentServerElectionID *spb.Uint128
669

670
        // SessionParameters contains the parameters that were received from the server in
671
        // response to the parameters sent to the server.
672
        SessionParameters *spb.SessionParametersResult
673

674
        // OperationID indicates that the message that was received from the server was a
675
        // ModifyResponse sent in response to an AFTOperation, its value is the ID of the
676
        // operation to which it corresponds.
677
        OperationID uint64
678

679
        // ClientError describes an error that is internal to the client.
680
        ClientError string
681

682
        // ProgrammingResult stores the result of an AFT operation on the server.
683
        ProgrammingResult spb.AFTResult_Status
684

685
        // Details stores detailed information about the operation over the ID
686
        // and the result.
687
        Details *OpDetailsResults
688
}
689

690
// String returns a string for an OpResult for debugging purposes.
691
func (o *OpResult) String() string {
1✔
692
        if o == nil {
2✔
693
                return "<nil>"
1✔
694
        }
1✔
695

696
        buf := &bytes.Buffer{}
1✔
697
        buf.WriteString("<")
1✔
698
        buf.WriteString(fmt.Sprintf("%d (%d nsec):", o.Timestamp, o.Latency))
1✔
699

1✔
700
        if v := o.CurrentServerElectionID; v != nil {
1✔
701
                e := uint128.New(v.Low, v.High)
×
702
                buf.WriteString(fmt.Sprintf(" ElectionID: %s", e))
×
703
        }
×
704

705
        if opID, pr := o.OperationID, o.ProgrammingResult; opID != 0 || pr != spb.AFTResult_UNSET {
2✔
706
                buf.WriteString(fmt.Sprintf(" AFTOperation { ID: %d, Details: %s, Status: %s }", opID, o.Details, o.ProgrammingResult))
1✔
707
        }
1✔
708

709
        if v := o.SessionParameters; v != nil {
1✔
710
                buf.WriteString(fmt.Sprintf(" SessionParameterResult: OK (%s)", v.String()))
×
711
        }
×
712

713
        if v := o.ClientError; v != "" {
1✔
714
                buf.WriteString(fmt.Sprintf(" With Error: %s", v))
×
715
        }
×
716
        buf.WriteString(">")
1✔
717

1✔
718
        return buf.String()
1✔
719
}
720

721
// OpDetailsResults provides details of an operation for use in the results.
722
type OpDetailsResults struct {
723
        // Type is the type of the operation (i.e., ADD, MODIFY, DELETE)
724
        Type constants.OpType
725

726
        // NextHopIndex is the identifier for a next-hop modified by the operation.
727
        NextHopIndex uint64
728
        // NextHopGroupID is the identifier for a next-hop-group modified by the
729
        // operation.
730
        NextHopGroupID uint64
731
        // IPv4Prefix is the IPv4 prefix modified by the operation.
732
        IPv4Prefix string
733
        // IPv6Prefix is the IPv6 prefix modified by the operation.
734
        IPv6Prefix string
735
        // MPLSLabel is the MPLS label that was modified by the operation.
736
        MPLSLabel uint64
737
}
738

739
// String returns a human-readable form of the OpDetailsResults
740
func (o *OpDetailsResults) String() string {
1✔
741
        if o == nil {
2✔
742
                return "<nil>"
1✔
743
        }
1✔
744
        buf := &bytes.Buffer{}
×
745
        buf.WriteString(fmt.Sprintf("<Type: %s ", o.Type))
×
746
        switch {
×
747
        case o.NextHopIndex != 0:
×
748
                buf.WriteString(fmt.Sprintf("NH Index: %d", o.NextHopIndex))
×
749
        case o.NextHopGroupID != 0:
×
750
                buf.WriteString(fmt.Sprintf("NHG ID: %d", o.NextHopGroupID))
×
751
        case o.IPv4Prefix != "":
×
752
                buf.WriteString(fmt.Sprintf("IPv4: %s", o.IPv4Prefix))
×
753
        case o.IPv6Prefix != "":
×
754
                buf.WriteString(fmt.Sprintf("IPv6: %s", o.IPv6Prefix))
×
755
        case o.MPLSLabel != 0:
×
756
                buf.WriteString(fmt.Sprintf("MPLS: %d", o.MPLSLabel))
×
757
        }
758
        buf.WriteString(">")
×
759

×
760
        return buf.String()
×
761
}
762

763
// Q enqueues a ModifyRequest to be sent to the target.
764
func (c *Client) Q(m *spb.ModifyRequest) {
1✔
765
        if err := c.handleModifyRequest(m); err != nil {
1✔
766
                log.Errorf("got error processing message that was to be sent, %v", err)
×
767
                c.addSendErr(err)
×
768
        }
×
769

770
        if !c.qs.sending.Load() {
2✔
771
                c.qs.sendMu.Lock()
1✔
772
                defer c.qs.sendMu.Unlock()
1✔
773
                log.V(2).Infof("appended %s to sending queue", m)
1✔
774
                c.qs.sendq = append(c.qs.sendq, m)
1✔
775
                return
1✔
776
        }
1✔
777
        log.V(2).Infof("sending %s directly to queue", m)
1✔
778

1✔
779
        c.q(m)
1✔
780
}
781

782
// chIsClosed reports whether a receive from ch can proceed immediately, which
// is the case when ch has been closed or when a value has been written to it.
// It returns false if the receive would block (including for a nil channel),
// indicating that the channel is still open and idle. The check can be used to
// determine whether a goroutine that writes to (or closes) a channel on exit
// is still running.
//
// Note that the check is performed by receiving from ch: if a value is
// waiting on the channel it is consumed by this call.
func chIsClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		// Either a value was received or the channel is closed; both mean
		// that the writer has signalled.
		return true
	default:
		return false
	}
}
797

798
// q is the internal implementation of queue that writes the ModifyRequest to
799
// the channel to be sent.
800
func (c *Client) q(m *spb.ModifyRequest) {
1✔
801
        c.awaiting.RLock()
1✔
802
        defer c.awaiting.RUnlock()
1✔
803

1✔
804
        if !chIsClosed(c.sendExitCh) {
2✔
805
                c.qs.modifyCh <- m
1✔
806
        }
1✔
807
}
808

809
// StartSending toggles the client to begin sending messages that are in the send
810
// queue (enqued by Q) to the connection established by Connect.
811
func (c *Client) StartSending() {
1✔
812
        c.qs.sending.Store(true)
1✔
813

1✔
814
        if c.state.SessParams != nil {
2✔
815
                c.Q(&spb.ModifyRequest{
1✔
816
                        Params: c.state.SessParams,
1✔
817
                })
1✔
818
        }
1✔
819

820
        if c.state.ElectionID != nil {
2✔
821
                c.Q(&spb.ModifyRequest{
1✔
822
                        ElectionId: c.state.ElectionID,
1✔
823
                })
1✔
824
        }
1✔
825

826
        // take the initial set of messages that were enqueued and queue them.
827
        c.qs.sendMu.Lock()
1✔
828
        defer c.qs.sendMu.Unlock()
1✔
829
        for _, m := range c.qs.sendq {
2✔
830
                log.V(2).Infof("sending %s to modify channel", m)
1✔
831
                c.q(m)
1✔
832
        }
1✔
833
        c.qs.sendq = []*spb.ModifyRequest{}
1✔
834
}
835

836
// StopSending toggles the client to stop sending messages to the server, meaning
837
// that entries that are enqueued will be stored until StartSending is called.
838
func (c *Client) StopSending() {
1✔
839
        c.qs.sending.Store(false)
1✔
840
}
1✔
841

842
// handleModifyRequest performs any required post-processing after having sent a
843
// ModifyRequest onto the gRPC channel to the server. Particularly, this ensures
844
// that pending transactions are enqueued into the pending queue where they have
845
// a specified ID.
846
func (c *Client) handleModifyRequest(m *spb.ModifyRequest) error {
1✔
847
        // Add any pending operations to the pending queue.
1✔
848
        for _, o := range m.Operation {
2✔
849
                if err := c.addPendingOp(o); err != nil {
2✔
850
                        return err
1✔
851
                }
1✔
852
        }
853

854
        if m.ElectionId != nil {
2✔
855
                c.updatePendingElection(m.ElectionId)
1✔
856
        }
1✔
857

858
        if m.Params != nil {
2✔
859
                c.pendingSessionParams(m.Params)
1✔
860
        }
1✔
861

862
        return nil
1✔
863
}
864

865
// handleModifyResponse performs any required post-processing after having received
866
// a ModifyResponse from the gRPC channel from the server. Particularly, this
867
// ensures that pending transactions are dequeued into the results queue. An error
868
// is returned if the post processing is not possible.
869
func (c *Client) handleModifyResponse(m *spb.ModifyResponse) error {
1✔
870
        if m == nil {
2✔
871
                return errors.New("invalid nil modify response returned")
1✔
872
        }
1✔
873

874
        resPop := m.Result != nil
1✔
875
        elecPop := m.ElectionId != nil
1✔
876
        sessPop := m.SessionParamsResult != nil
1✔
877
        pop := 0
1✔
878
        for _, v := range []bool{resPop, elecPop, sessPop} {
2✔
879
                if v {
2✔
880
                        pop++
1✔
881
                }
1✔
882
        }
883
        if pop > 1 {
2✔
884
                return fmt.Errorf("invalid returned message, ElectionID, Result, and SessionParametersResult are mutually exclusive, got: %s", m)
1✔
885
        }
1✔
886

887
        // At this point we know we will have something to add to the results, so ensure that we
888
        // hold the results lock. This also stops the client from converging.
889
        c.qs.resultMu.Lock()
1✔
890
        defer c.qs.resultMu.Unlock()
1✔
891

1✔
892
        if m.ElectionId != nil {
2✔
893
                er := c.clearPendingElection()
1✔
894
                er.CurrentServerElectionID = m.ElectionId
1✔
895
                // This is an update from the server in response to an updated master election ID.
1✔
896
                c.qs.resultq = append(c.qs.resultq, er)
1✔
897
        }
1✔
898

899
        if m.SessionParamsResult != nil {
2✔
900
                sr := c.clearPendingSessionParams()
1✔
901
                sr.SessionParameters = m.SessionParamsResult
1✔
902
                c.qs.resultq = append(c.qs.resultq, sr)
1✔
903
        }
1✔
904

905
        for _, r := range m.Result {
2✔
906
                res, err := c.clearPendingOp(r)
1✔
907
                c.qs.resultq = append(c.qs.resultq, res)
1✔
908
                if err != nil {
1✔
909
                        return fmt.Errorf("cannot remove pending operation %d, %v", r.Id, err)
×
910
                }
×
911
        }
912

913
        return nil
1✔
914
}
915

916
// isConverged indicates whether the client is converged.
917
func (c *Client) isConverged() bool {
1✔
918
        c.qs.sendMu.RLock()
1✔
919
        defer c.qs.sendMu.RUnlock()
1✔
920
        c.qs.pendMu.RLock()
1✔
921
        defer c.qs.pendMu.RUnlock()
1✔
922

1✔
923
        return len(c.qs.sendq) == 0 && c.qs.pendq.Len() == 0
1✔
924
}
1✔
925

926
// addPendingOp adds the operation specified by op to the pending transaction queue
927
// on the client. This queue stores the operations that have been sent to the server
928
// but have not yet been reported on. It returns an error if the pending transaction
929
// cannot be added.
930
//
931
// TODO(robjs): if there is a pending operation for the same key (IPv4 prefix, NHG, NH)
932
// then we need to ensure that we correctly coalesce these values. This means that we need
933
// to keep some map of all the requests that corresponded to the same key.
934
func (c *Client) addPendingOp(op *spb.AFTOperation) error {
1✔
935
        c.qs.pendMu.Lock()
1✔
936
        defer c.qs.pendMu.Unlock()
1✔
937
        if v := c.qs.pendq.Ops[op.Id]; v != nil {
2✔
938
                return fmt.Errorf("could not enqueue operation %d, duplicate pending ID (pending: %v)", op.Id, v)
1✔
939
        }
1✔
940
        c.qs.pendq.Ops[op.Id] = &PendingOp{
1✔
941
                Timestamp: unixTS(),
1✔
942
                Op:        op,
1✔
943
        }
1✔
944
        return nil
1✔
945
}
946

947
// clearPendingOp removes the operation with the ID in the specified result from the
948
// pending queue and returns the result.
949
func (c *Client) clearPendingOp(op *spb.AFTResult) (*OpResult, error) {
1✔
950
        c.qs.pendMu.Lock()
1✔
951
        defer c.qs.pendMu.Unlock()
1✔
952

1✔
953
        if TreatRIBACKAsCompletedInFIBACKMode && c.state.SessParams.GetAckType() != spb.SessionParameters_RIB_AND_FIB_ACK {
1✔
954
                return nil, fmt.Errorf("logic error, TreatRIBACKAsCompletedInFIBACKMode set to true in %s mode", c.state.SessParams.GetAckType())
×
955
        }
×
956

957
        v, ok := c.qs.pendq.Ops[op.Id]
1✔
958
        if !ok {
2✔
959
                switch {
1✔
960
                case op.GetStatus() == spb.AFTResult_FIB_PROGRAMMED && TreatRIBACKAsCompletedInFIBACKMode:
1✔
961
                        // Expected condition, we hav already dequeued this operation because we are treating RIB_ACK as completed
1✔
962
                        // even though we are in FIB programmed mode.
1✔
963
                        return nil, nil
1✔
964
                case op.GetStatus() == spb.AFTResult_RIB_PROGRAMMED && c.state.SessParams.GetAckType() == spb.SessionParameters_RIB_AND_FIB_ACK:
1✔
965
                        // This condition occurs when the server sends up a FIB_ACK before a RIB_ACK and hence we have dequeued
1✔
966
                        // the operation. In this case, we don't return an error and simply log that this happened. This is based
1✔
967
                        // on being permissive, but is unexpected since gRPC should maintain the order, and there's no reason that
1✔
968
                        // we would expect that something can be programmed in the FIB before it has hit the RIB.
1✔
969
                        log.Warningf("operation %d, unexpectedly saw RIB_ACK after FIB_ACK", op.Id)
1✔
970

1✔
971
                        return &OpResult{
1✔
972
                                Timestamp:         unixTS(),
1✔
973
                                OperationID:       op.GetId(),
1✔
974
                                ProgrammingResult: op.GetStatus(),
1✔
975
                        }, nil
1✔
976
                }
977
                return nil, fmt.Errorf("could not dequeue operation %d, unknown operation", op.Id)
×
978
        }
979

980
        // We know that the operation exists - determine how to dequeue it.
981
        switch op.GetStatus() {
1✔
982
        case spb.AFTResult_FIB_FAILED, spb.AFTResult_FIB_PROGRAMMED:
1✔
983
                if TreatRIBACKAsCompletedInFIBACKMode {
1✔
984
                        log.Infof("RIB ACK not received for operation %d - dequeue is based on FIB ACK", op.Id)
×
985
                }
×
986
                delete(c.qs.pendq.Ops, op.Id)
1✔
987
        case spb.AFTResult_RIB_PROGRAMMED:
1✔
988
                switch {
1✔
989
                case TreatRIBACKAsCompletedInFIBACKMode:
1✔
990
                        // Dequeue, since we're specifically being asked to use RIB ACKs as the completion of the
1✔
991
                        // transaction.
1✔
992
                        delete(c.qs.pendq.Ops, op.Id)
1✔
993
                case c.state.SessParams.GetAckType() != spb.SessionParameters_RIB_AND_FIB_ACK:
1✔
994
                        // RIB ACK dequeues when we are not expecting FIB ACK.
1✔
995
                        delete(c.qs.pendq.Ops, op.Id)
1✔
996
                default:
1✔
997
                        // We're in FIB_ACK mode and this is a RIB_ACK, so the transaction is not complete.
998
                }
999
        case spb.AFTResult_FAILED:
1✔
1000
                delete(c.qs.pendq.Ops, op.Id)
1✔
1001
        }
1002

1003
        if v == nil {
1✔
1004
                return nil, fmt.Errorf("could not dequeue operation %d, unknown operation", op.Id)
×
1005
        }
×
1006

1007
        det := &OpDetailsResults{
1✔
1008
                Type: constants.OpFromAFTOp(v.Op.Op),
1✔
1009
        }
1✔
1010
        switch opEntry := v.Op.Entry.(type) {
1✔
1011
        case *spb.AFTOperation_Ipv4:
×
1012
                det.IPv4Prefix = opEntry.Ipv4.GetPrefix()
×
1013
        case *spb.AFTOperation_Ipv6:
×
1014
                det.IPv6Prefix = opEntry.Ipv6.GetPrefix()
×
1015
        case *spb.AFTOperation_Mpls:
×
1016
                det.MPLSLabel = opEntry.Mpls.GetLabelUint64()
×
1017
        case *spb.AFTOperation_NextHopGroup:
×
1018
                det.NextHopGroupID = opEntry.NextHopGroup.GetId()
×
1019
        case *spb.AFTOperation_NextHop:
1✔
1020
                det.NextHopIndex = opEntry.NextHop.GetIndex()
1✔
1021
        }
1022

1023
        n := unixTS()
1✔
1024
        return &OpResult{
1✔
1025
                Timestamp:         n,
1✔
1026
                Latency:           n - v.Timestamp,
1✔
1027
                OperationID:       op.GetId(),
1✔
1028
                ProgrammingResult: op.GetStatus(),
1✔
1029
                Details:           det,
1✔
1030
        }, nil
1✔
1031
}
1032

1033
// updatePendingElection adds the election ID specified by id to the pending transaction
1034
// queue on the client. There can only be a single pending election, and hence the
1035
// election ID is updated in place.
1036
func (c *Client) updatePendingElection(id *spb.Uint128) {
1✔
1037
        c.qs.pendMu.Lock()
1✔
1038
        defer c.qs.pendMu.Unlock()
1✔
1039
        c.qs.pendq.Election = &ElectionReqDetails{
1✔
1040
                Timestamp: unixTS(),
1✔
1041
                ID:        id,
1✔
1042
        }
1✔
1043
}
1✔
1044

1045
// clearPendingElection clears the pending election ID and returns a result determining
1046
// the latency and timestamp. If there is no pending election, a result describing an error
1047
// is returned.
1048
func (c *Client) clearPendingElection() *OpResult {
1✔
1049
        c.qs.pendMu.Lock()
1✔
1050
        defer c.qs.pendMu.Unlock()
1✔
1051
        e := c.qs.pendq.Election
1✔
1052
        if e == nil {
2✔
1053
                return &OpResult{
1✔
1054
                        Timestamp:   unixTS(),
1✔
1055
                        ClientError: "received a election update when there was none pending",
1✔
1056
                }
1✔
1057
        }
1✔
1058

1059
        n := unixTS()
1✔
1060
        c.qs.pendq.Election = nil
1✔
1061
        return &OpResult{
1✔
1062
                Timestamp: n,
1✔
1063
                Latency:   n - e.Timestamp,
1✔
1064
        }
1✔
1065
}
1066

1067
// pendingSessionParams updates the client's outgoing queue to indicate that there
1068
// is a pending session parameters request.
1069
func (c *Client) pendingSessionParams(out *spb.SessionParameters) {
1✔
1070
        c.qs.pendMu.Lock()
1✔
1071
        defer c.qs.pendMu.Unlock()
1✔
1072
        c.qs.pendq.SessionParams = &SessionParamReqDetails{
1✔
1073
                Timestamp: unixTS(),
1✔
1074
                Outgoing:  out,
1✔
1075
        }
1✔
1076
}
1✔
1077

1078
// clearPendingSessionParams clears the pending session parameters request in the client
1079
// and returns an OpResult describing the timing of the response.
1080
func (c *Client) clearPendingSessionParams() *OpResult {
1✔
1081
        c.qs.pendMu.Lock()
1✔
1082
        defer c.qs.pendMu.Unlock()
1✔
1083
        e := c.qs.pendq.SessionParams
1✔
1084

1✔
1085
        if e == nil {
2✔
1086
                return &OpResult{
1✔
1087
                        Timestamp:   unixTS(),
1✔
1088
                        ClientError: "received a session parameter result when there was none pending",
1✔
1089
                }
1✔
1090
        }
1✔
1091

1092
        c.qs.pendq.SessionParams = nil
1✔
1093
        n := unixTS()
1✔
1094
        return &OpResult{
1✔
1095
                Timestamp: n,
1✔
1096
                Latency:   n - e.Timestamp,
1✔
1097
        }
1✔
1098
}
1099

1100
// addReadErr adds an error experienced when reading from the server to the readErr
1101
// slice on the client.
1102
func (c *Client) addReadErr(err error) {
1✔
1103
        c.readErrMu.Lock()
1✔
1104
        defer c.readErrMu.Unlock()
1✔
1105
        c.readErr = append(c.readErr, err)
1✔
1106
}
1✔
1107

1108
// addSendErr adds an error experienced when writing to the server to the sendErr
1109
// slice on the client.
1110
func (c *Client) addSendErr(err error) {
×
1111
        c.sendErrMu.Lock()
×
1112
        defer c.sendErrMu.Unlock()
×
1113
        c.sendErr = append(c.sendErr, err)
×
1114
}
×
1115

1116
// Pending returns the set of AFTOperations that are pending response from the
1117
// target, it returns a slice of PendingRequest interfaces which describes the contents
1118
// of the pending queues.
1119
func (c *Client) Pending() ([]PendingRequest, error) {
1✔
1120
        if c.qs == nil {
2✔
1121
                return nil, errors.New("invalid (nil) queues in client")
1✔
1122
        }
1✔
1123
        c.qs.pendMu.RLock()
1✔
1124
        defer c.qs.pendMu.RUnlock()
1✔
1125
        ret := []PendingRequest{}
1✔
1126

1✔
1127
        ids := []uint64{}
1✔
1128
        for k, o := range c.qs.pendq.Ops {
2✔
1129
                if o == nil || o.Op == nil {
2✔
1130
                        return nil, errors.New("invalid (nil) operation in pending queue")
1✔
1131
                }
1✔
1132
                ids = append(ids, k)
1✔
1133
        }
1134

1135
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
2✔
1136
        for _, v := range ids {
2✔
1137
                ret = append(ret, c.qs.pendq.Ops[v])
1✔
1138
        }
1✔
1139
        if c.qs.pendq.Election != nil {
2✔
1140
                ret = append(ret, c.qs.pendq.Election)
1✔
1141
        }
1✔
1142
        if c.qs.pendq.SessionParams != nil {
2✔
1143
                ret = append(ret, c.qs.pendq.SessionParams)
1✔
1144
        }
1✔
1145
        return ret, nil
1✔
1146
}
1147

1148
// Results returns the set of ModifyResponses that have been received from the
1149
// target.
1150
func (c *Client) Results() ([]*OpResult, error) {
1✔
1151
        if c.qs == nil {
2✔
1152
                return nil, errors.New("invalid (nil) queues in client")
1✔
1153
        }
1✔
1154
        c.qs.resultMu.RLock()
1✔
1155
        defer c.qs.resultMu.RUnlock()
1✔
1156
        return append(make([]*OpResult, 0, len(c.qs.resultq)), c.qs.resultq...), nil
1✔
1157
}
1158

1159
// AckResult allows a caller to acknowledge a specific result in the client's
1160
// queue removing it from the queue stored in the client. If results are not
1161
// acknowledged, the client will store all results indefinitely.
1162
func (c *Client) AckResult(res ...*OpResult) error {
1✔
1163
        if c.qs == nil {
1✔
1164
                return errors.New("invalid (nil) queue in client")
×
1165
        }
×
1166

1167
        toACK := map[uint64]bool{}
1✔
1168
        for _, r := range res {
2✔
1169
                toACK[r.OperationID] = false
1✔
1170
        }
1✔
1171

1172
        c.qs.resultMu.RLock()
1✔
1173
        defer c.qs.resultMu.RUnlock()
1✔
1174
        nrq := []*OpResult{}
1✔
1175
        for _, r := range c.qs.resultq {
2✔
1176
                _, ok := toACK[r.OperationID]
1✔
1177
                if !ok {
2✔
1178
                        nrq = append(nrq, r)
1✔
1179
                }
1✔
1180
                if ok {
2✔
1181
                        toACK[r.OperationID] = true
1✔
1182
                }
1✔
1183
        }
1184
        c.qs.resultq = nrq
1✔
1185

1✔
1186
        var errs errlist.List
1✔
1187
        for k, v := range toACK {
2✔
1188
                if !v {
2✔
1189
                        errs.Add(fmt.Errorf("cannot find operation ID %d to acknowledge", k))
1✔
1190
                }
1✔
1191
        }
1192

1193
        return errs.Err()
1✔
1194
}
1195

1196
// ClientStatus is the overview status of the client, timestamped according to the
1197
// time at which the state was retrieved.
1198
type ClientStatus struct {
1199
        // Timestamp expressed in nanoseconds since the epoch that the status was retrieved.
1200
        Timestamp int64
1201
        // PendingTransactions is the slice of pending operations on the client.
1202
        PendingTransactions []PendingRequest
1203
        Results             []*OpResult
1204
        SendErrs            []error
1205
        ReadErrs            []error
1206
}
1207

1208
// Status returns a composite status of the client at the time that the caller specified.
1209
func (c *Client) Status() (*ClientStatus, error) {
1✔
1210
        pt, err := c.Pending()
1✔
1211
        if err != nil {
2✔
1212
                return nil, fmt.Errorf("invalid pending queue, %v", err)
1✔
1213
        }
1✔
1214
        r, err := c.Results()
1✔
1215
        if err != nil {
1✔
1216
                return nil, fmt.Errorf("invalid results queue, %v", err)
×
1217
        }
×
1218
        cs := &ClientStatus{
1✔
1219
                // we allow unixTS to be overloaded to allow us to unit test this function
1✔
1220
                // by overloading it.
1✔
1221
                Timestamp:           unixTS(),
1✔
1222
                PendingTransactions: pt,
1✔
1223
                Results:             r,
1✔
1224
        }
1✔
1225

1✔
1226
        c.sendErrMu.RLock()
1✔
1227
        defer c.sendErrMu.RUnlock()
1✔
1228
        cs.SendErrs = append(make([]error, 0, len(c.sendErr)), c.sendErr...)
1✔
1229

1✔
1230
        c.readErrMu.RLock()
1✔
1231
        defer c.readErrMu.RUnlock()
1✔
1232
        cs.ReadErrs = append(make([]error, 0, len(c.readErr)), c.readErr...)
1✔
1233

1✔
1234
        return cs, nil
1✔
1235
}
1236

1237
// hasErrors reports whether there are errors that have been encountered
1238
// in the client. It returns two slices of errors containing the send
1239
// and recv errors
1240
func (c *Client) hasErrors() ([]error, []error) {
1✔
1241
        c.readErrMu.RLock()
1✔
1242
        defer c.readErrMu.RUnlock()
1✔
1243
        c.sendErrMu.RLock()
1✔
1244
        defer c.sendErrMu.RUnlock()
1✔
1245
        if len(c.readErr) != 0 || len(c.sendErr) != 0 {
2✔
1246
                return c.sendErr, c.readErr
1✔
1247
        }
1✔
1248
        return nil, nil
1✔
1249
}
1250

1251
// ClientErr encapsulates the send and receive errors recorded by the client.
type ClientErr struct {
	// Send holds the errors encountered whilst sending.
	Send []error
	// Recv holds the errors encountered whilst receiving.
	Recv []error
}

// Error implements the error interface, rendering both sets of errors.
func (c *ClientErr) Error() string {
	return fmt.Sprintf("errors: send: %v, recv: %v", c.Send, c.Recv)
}
1258

1259
// AwaitConverged waits until the client is converged and writes to the supplied
1260
// channel. The function blocks until such time as the client returns or when the
1261
// context is done.
1262
func (c *Client) AwaitConverged(ctx context.Context) error {
1✔
1263
        for {
2✔
1264
                select {
1✔
1265
                case <-ctx.Done():
×
1266
                        return ctx.Err()
×
1267
                default:
1✔
1268
                }
1269

1270
                // we need to check here that no-one is doing an operation that we might care about
1271
                // impacting convergence. this is:
1272
                //  - sending a message
1273
                //        - post-processing a sent message
1274
                //        - reading a message
1275
                //        - post-procesing a read message
1276
                // we do this by holding the awaiting mutex.
1277
                done, err := func() (bool, error) {
2✔
1278
                        c.awaiting.Lock()
1✔
1279
                        defer c.awaiting.Unlock()
1✔
1280
                        if sendE, recvE := c.hasErrors(); len(sendE) != 0 || len(recvE) != 0 {
1✔
1281
                                return true, &ClientErr{Send: sendE, Recv: recvE}
×
1282
                        }
×
1283
                        if c.isConverged() {
2✔
1284
                                return true, nil
1✔
1285
                        }
1✔
1286
                        return false, nil
1✔
1287
                }()
1288
                if err != nil {
1✔
1289
                        return err
×
1290
                }
×
1291
                if done {
2✔
1292
                        return nil
1✔
1293
                }
1✔
1294
                time.Sleep(BusyLoopDelay) // avoid busy looping.
1✔
1295
        }
1296
}
1297

1298
// Get implements the Get RPC to the gRIBI server. It takes an input context and a
1299
// GetRequest and returns a single GetResponse with all contained results within
1300
// it.
1301
func (c *Client) Get(ctx context.Context, sreq *spb.GetRequest) (*spb.GetResponse, error) {
1✔
1302
        if sreq == nil {
1✔
1303
                return nil, errors.New("get request cannot be nil")
×
1304
        }
×
1305

1306
        ni := sreq.GetNetworkInstance()
1✔
1307
        if ni == nil {
2✔
1308
                return nil, errors.New("network instance cannot be nil")
1✔
1309
        }
1✔
1310

1311
        switch ni.(type) {
1✔
1312
        case *spb.GetRequest_All:
1✔
1313
                if sreq.GetAll() == nil {
1✔
1314
                        return nil, errors.New("network instance All cannot be nil")
×
1315
                }
×
1316
        case *spb.GetRequest_Name:
1✔
1317
                if sreq.GetName() == "" {
1✔
1318
                        return nil, errors.New("network instance name is required")
×
1319
                }
×
1320
        }
1321

1322
        if sreq.GetAft() == spb.AFTType_INVALID {
2✔
1323
                return nil, errors.New("AFT is required")
1✔
1324
        }
1✔
1325

1326
        result := &spb.GetResponse{}
1✔
1327

1✔
1328
        stream, err := c.c.Get(ctx, sreq)
1✔
1329
        if err != nil {
1✔
1330
                return nil, fmt.Errorf("cannot send Get RPC, %v", err)
×
1331
        }
×
1332

1333
        for {
2✔
1334
                getres, err := stream.Recv()
1✔
1335
                if err == io.EOF {
2✔
1336
                        break
1✔
1337
                }
1338
                if err != nil {
1✔
1339
                        return nil, fmt.Errorf("error in Get RPC, %v", err)
×
1340
                }
×
1341
                result.Entry = append(result.Entry, getres.Entry...)
1✔
1342

1343
        }
1344
        return result, nil
1✔
1345
}
1346

1347
// Flush implements the gRIBI Flush RPC.
1348
func (c *Client) Flush(ctx context.Context, req *spb.FlushRequest) (*spb.FlushResponse, error) {
1✔
1349
        if req == nil {
2✔
1350
                return nil, errors.New("flush request cannot be nil")
1✔
1351
        }
1✔
1352

1353
        switch t := req.GetNetworkInstance().(type) {
1✔
1354
        case *spb.FlushRequest_All:
1✔
1355
        case *spb.FlushRequest_Name:
1✔
1356
                if t.Name == "" {
2✔
1357
                        return nil, errors.New("cannot specify an empty network instance name")
1✔
1358
                }
1✔
1359
        }
1360

1361
        if req.GetElection() == nil && c.state.ElectionID != nil {
2✔
1362
                return nil, fmt.Errorf("must specify an election behaviour for a SINGLE_PRIMARY client")
1✔
1363
        }
1✔
1364

1365
        switch req.GetElection().(type) {
1✔
1366
        case *spb.FlushRequest_Id:
1✔
1367
                if c.state.SessParams == nil || c.state.SessParams.Redundancy != spb.SessionParameters_SINGLE_PRIMARY {
2✔
1368
                        return nil, fmt.Errorf("invalid to specify an election ID when the client is not in SINGLE_PRIMARY mode")
1✔
1369
                }
1✔
1370
        case *spb.FlushRequest_Override:
1✔
1371
                if c.state.SessParams == nil || c.state.SessParams.Redundancy != spb.SessionParameters_SINGLE_PRIMARY {
2✔
1372
                        return nil, fmt.Errorf("cannot override election ID when the client is in ALL_PRIMARY mode")
1✔
1373
                }
1✔
1374
        }
1375

1376
        res, err := c.c.Flush(ctx, req)
1✔
1377
        if err != nil {
2✔
1378
                // return err directly here so that the client can receive the type of error.
1✔
1379
                return nil, err
1✔
1380
        }
1✔
1381
        return res, nil
1✔
1382
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc