Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

lightningnetwork / lnd / 11827

11 Oct 2019 - 20:47 coverage increased (+0.02%) to 63.216%
11827

Pull #3595

travis-ci

9181eb84f9c35729a3bad740fb7f9d93?size=18&default=identiconweb-flow
htlcswitch: raise max cltv limit to 2016 blocks

The previous limit of 1008 proved to be low, given that almost 50% of
the network still advertises CLTV deltas of 144 blocks, possibly
resulting in routes with many hops failing.
Pull Request #3595: routing+routerrpc: take max cltv limit into account within path finding

3 of 9 new or added lines in 3 files covered. (33.33%)

16 existing lines in 8 files now uncovered.

45935 of 72663 relevant lines covered (63.22%)

68288.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.53
/invoices/invoiceregistry.go
1
package invoices
2

3
import (
4
        "errors"
5
        "sync"
6
        "sync/atomic"
7

8
        "github.com/davecgh/go-spew/spew"
9
        "github.com/lightningnetwork/lnd/channeldb"
10
        "github.com/lightningnetwork/lnd/lntypes"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
        "github.com/lightningnetwork/lnd/queue"
13
)
14

15
var (
16
        // ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to be
17
        // accepted or settled with not enough blocks remaining.
18
        ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")
19

20
        // ErrInvoiceAmountTooLow is returned when an invoice is attempted to be
21
        // accepted or settled with an amount that is too low.
22
        ErrInvoiceAmountTooLow = errors.New("paid amount less than invoice amount")
23

24
        // ErrShuttingDown is returned when an operation failed because the
25
        // invoice registry is shutting down.
26
        ErrShuttingDown = errors.New("invoice registry shutting down")
27

28
        // errNoUpdate is returned when no invoice update is required.
29
        errNoUpdate = errors.New("no update needed")
30
)
31

32
// HodlEvent describes how an htlc should be resolved. If HodlEvent.Preimage is
33
// set, the event indicates a settle event. If Preimage is nil, it is a cancel
34
// event.
35
type HodlEvent struct {
36
        // Preimage is the htlc preimage. Its value is nil in case of a cancel.
37
        Preimage *lntypes.Preimage
38

39
        // CircuitKey is the key of the htlc for which we have a resolution
40
        // decision.
41
        CircuitKey channeldb.CircuitKey
42

43
        // AcceptHeight is the original height at which the htlc was accepted.
44
        AcceptHeight int32
45
}
46

47
// InvoiceRegistry is a central registry of all the outstanding invoices
48
// created by the daemon. The registry is a thin wrapper around a map in order
49
// to ensure that all updates/reads are thread safe.
50
type InvoiceRegistry struct {
51
        sync.RWMutex
52

53
        cdb *channeldb.DB
54

55
        clientMtx                 sync.Mutex
56
        nextClientID              uint32
57
        notificationClients       map[uint32]*InvoiceSubscription
58
        singleNotificationClients map[uint32]*SingleInvoiceSubscription
59

60
        newSubscriptions    chan *InvoiceSubscription
61
        subscriptionCancels chan uint32
62

63
        // invoiceEvents is a single channel over which both invoice updates and
64
        // new single invoice subscriptions are carried.
65
        invoiceEvents chan interface{}
66

67
        // hodlSubscriptions is a map from a circuit key to a list of subscribers.
68
        // It is used for efficient notification of links.
69
        hodlSubscriptions map[channeldb.CircuitKey]map[chan<- interface{}]struct{}
70

71
        // hodlReverseSubscriptions tracks circuit keys subscribed to per
72
        // subscriber. This is used to unsubscribe from all hashes efficiently.
73
        hodlReverseSubscriptions map[chan<- interface{}]map[channeldb.CircuitKey]struct{}
74

75
        // finalCltvRejectDelta defines the number of blocks before the expiry
76
        // of the htlc where we no longer settle it as an exit hop and instead
77
        // cancel it back. Normally this value should be lower than the cltv
78
        // expiry of any invoice we create and the code effectuating this should
79
        // not be hit.
80
        finalCltvRejectDelta int32
81

82
        wg   sync.WaitGroup
83
        quit chan struct{}
84
}
85

86
// NewRegistry creates a new invoice registry. The invoice registry
87
// wraps the persistent on-disk invoice storage with an additional in-memory
88
// layer. The in-memory layer is in place such that debug invoices can be added
89
// which are volatile yet available system wide within the daemon.
90
func NewRegistry(cdb *channeldb.DB, finalCltvRejectDelta int32) *InvoiceRegistry {
149×
91

149×
92
        return &InvoiceRegistry{
149×
93
                cdb:                       cdb,
149×
94
                notificationClients:       make(map[uint32]*InvoiceSubscription),
149×
95
                singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
149×
96
                newSubscriptions:          make(chan *InvoiceSubscription),
149×
97
                subscriptionCancels:       make(chan uint32),
149×
98
                invoiceEvents:             make(chan interface{}, 100),
149×
99
                hodlSubscriptions:         make(map[channeldb.CircuitKey]map[chan<- interface{}]struct{}),
149×
100
                hodlReverseSubscriptions:  make(map[chan<- interface{}]map[channeldb.CircuitKey]struct{}),
149×
101
                finalCltvRejectDelta:      finalCltvRejectDelta,
149×
102
                quit:                      make(chan struct{}),
149×
103
        }
149×
104
}
149×
105

106
// Start starts the registry and all goroutines it needs to carry out its task.
107
func (i *InvoiceRegistry) Start() error {
149×
108
        i.wg.Add(1)
149×
109

149×
110
        go i.invoiceEventNotifier()
149×
111

149×
112
        return nil
149×
113
}
149×
114

115
// Stop signals the registry for a graceful shutdown.
116
func (i *InvoiceRegistry) Stop() {
5×
117
        close(i.quit)
5×
118

5×
119
        i.wg.Wait()
5×
120
}
5×
121

122
// invoiceEvent represents a new event that has modified an invoice on disk.
123
// Only two event types are currently supported: newly created invoices, and
124
// instances where invoices are settled.
125
type invoiceEvent struct {
126
        hash    lntypes.Hash
127
        invoice *channeldb.Invoice
128
}
129

130
// invoiceEventNotifier is the dedicated goroutine responsible for accepting
131
// new notification subscriptions, cancelling old subscriptions, and
132
// dispatching new invoice events.
133
func (i *InvoiceRegistry) invoiceEventNotifier() {
149×
134
        defer i.wg.Done()
149×
135

149×
136
        for {
4,293×
137
                select {
4,144×
138
                // A new invoice subscription for all invoices has just arrived!
139
                // We'll query for any backlog notifications, then add it to the
140
                // set of clients.
141
                case newClient := <-i.newSubscriptions:
3×
142
                        // Before we add the client to our set of active
3×
143
                        // clients, we'll first attempt to deliver any backlog
3×
144
                        // invoice events.
3×
145
                        err := i.deliverBacklogEvents(newClient)
3×
146
                        if err != nil {
3×
147
                                log.Errorf("unable to deliver backlog invoice "+
!
148
                                        "notifications: %v", err)
!
149
                        }
!
150

151
                        log.Infof("New invoice subscription "+
3×
152
                                "client: id=%v", newClient.id)
3×
153

3×
154
                        // With the backlog notifications delivered (if any),
3×
155
                        // we'll add this to our active subscriptions and
3×
156
                        // continue.
3×
157
                        i.notificationClients[newClient.id] = newClient
3×
158

159
                // A client no longer wishes to receive invoice notifications.
160
                // So we'll remove them from the set of active clients.
161
                case clientID := <-i.subscriptionCancels:
6×
162
                        log.Infof("Cancelling invoice subscription for "+
6×
163
                                "client=%v", clientID)
6×
164

6×
165
                        delete(i.notificationClients, clientID)
6×
166
                        delete(i.singleNotificationClients, clientID)
6×
167

168
                // An invoice event has come in. This can either be an update to
169
                // an invoice or a new single invoice subscriber. Both types of
170
                // events are passed in via the same channel, to make sure that
171
                // subscribers get a consistent view of the event sequence.
172
                case event := <-i.invoiceEvents:
3,986×
173
                        switch e := event.(type) {
3,986×
174

175
                        // A sub-system has just modified the invoice state, so
176
                        // we'll dispatch notifications to all registered
177
                        // clients.
178
                        case *invoiceEvent:
3,983×
179
                                // For backwards compatibility, do not notify
3,983×
180
                                // all invoice subscribers of cancel and accept
3,983×
181
                                // events.
3,983×
182
                                state := e.invoice.Terms.State
3,983×
183
                                if state != channeldb.ContractCanceled &&
3,983×
184
                                        state != channeldb.ContractAccepted {
7,955×
185

3,972×
186
                                        i.dispatchToClients(e)
3,972×
187
                                }
3,972×
188
                                i.dispatchToSingleClients(e)
3,983×
189

190
                        // A new single invoice subscription has arrived. Add it
191
                        // to the set of clients. It is important to do this in
192
                        // sequence with any other invoice events, because an
193
                        // initial invoice update has already been sent out to
194
                        // the subscriber.
195
                        case *SingleInvoiceSubscription:
3×
196
                                log.Infof("New single invoice subscription "+
3×
197
                                        "client: id=%v, hash=%v", e.id, e.hash)
3×
198

3×
199
                                i.singleNotificationClients[e.id] = e
3×
200
                        }
201

202
                case <-i.quit:
5×
203
                        return
5×
204
                }
205
        }
206
}
207

208
// dispatchToSingleClients passes the supplied event to all notification clients
209
// that subscribed to the invoice this event applies to.
210
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
3,983×
211
        // Dispatch to single invoice subscribers.
3,983×
212
        for _, client := range i.singleNotificationClients {
3,990×
213
                if client.hash != event.hash {
7×
214
                        continue
!
215
                }
216

217
                client.notify(event)
7×
218
        }
219
}
220

221
// dispatchToClients passes the supplied event to all notification clients that
222
// subscribed to all invoices. Add and settle indices are used to make sure that
223
// clients don't receive duplicate or unwanted events.
224
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
3,972×
225
        invoice := event.invoice
3,972×
226

3,972×
227
        for clientID, client := range i.notificationClients {
3,977×
228
                // Before we dispatch this event, we'll check
5×
229
                // to ensure that this client hasn't already
5×
230
                // received this notification in order to
5×
231
                // ensure we don't duplicate any events.
5×
232

5×
233
                // TODO(joostjager): Refactor switches.
5×
234
                state := event.invoice.Terms.State
5×
235
                switch {
5×
236
                // If we've already sent this settle event to
237
                // the client, then we can skip this.
238
                case state == channeldb.ContractSettled &&
239
                        client.settleIndex >= invoice.SettleIndex:
!
240
                        continue
!
241

242
                // Similarly, if we've already sent this add to
243
                // the client then we can skip this one.
244
                case state == channeldb.ContractOpen &&
245
                        client.addIndex >= invoice.AddIndex:
!
246
                        continue
!
247

248
                // These two states should never happen, but we
249
                // log them just in case so we can detect this
250
                // instance.
251
                case state == channeldb.ContractOpen &&
252
                        client.addIndex+1 != invoice.AddIndex:
!
253
                        log.Warnf("client=%v for invoice "+
!
254
                                "notifications missed an update, "+
!
255
                                "add_index=%v, new add event index=%v",
!
256
                                clientID, client.addIndex,
!
257
                                invoice.AddIndex)
!
258

259
                case state == channeldb.ContractSettled &&
260
                        client.settleIndex+1 != invoice.SettleIndex:
!
261
                        log.Warnf("client=%v for invoice "+
!
262
                                "notifications missed an update, "+
!
263
                                "settle_index=%v, new settle event index=%v",
!
264
                                clientID, client.settleIndex,
!
265
                                invoice.SettleIndex)
!
266
                }
267

268
                select {
5×
269
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
270
                        invoice: invoice,
271
                }:
5×
272
                case <-i.quit:
!
273
                        return
!
274
                }
275

276
                // Each time we send a notification to a client, we'll record
277
                // the latest add/settle index it has. We'll use this to ensure
278
                // we don't send a notification twice, which can happen if a new
279
                // event is added while we're catching up a new client.
280
                switch event.invoice.Terms.State {
5×
281
                case channeldb.ContractSettled:
2×
282
                        client.settleIndex = invoice.SettleIndex
2×
283
                case channeldb.ContractOpen:
3×
284
                        client.addIndex = invoice.AddIndex
3×
285
                default:
!
286
                        log.Errorf("unexpected invoice state: %v",
!
287
                                event.invoice.Terms.State)
!
288
                }
289
        }
290
}
291

292
// deliverBacklogEvents will attempt to query the invoice database for any
293
// notifications that the client has missed since it reconnected last.
294
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
3×
295
        // First, we'll query the database to see if based on the provided
3×
296
        // addIndex and settledIndex we need to deliver any backlog
3×
297
        // notifications.
3×
298
        addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex)
3×
299
        if err != nil {
3×
300
                return err
!
301
        }
!
302

303
        settleEvents, err := i.cdb.InvoicesSettledSince(client.settleIndex)
3×
304
        if err != nil {
3×
305
                return err
!
306
        }
!
307

308
        // If we have any to deliver, then we'll append them to the end of the
309
        // notification queue in order to catch up the client before delivering
310
        // any new notifications.
311
        for _, addEvent := range addEvents {
3×
312
                // We re-bind the loop variable to ensure we don't hold onto
!
313
                // the loop reference causing it to point to the same item.
!
314
                addEvent := addEvent
!
315

!
316
                select {
!
317
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
318
                        invoice: &addEvent,
319
                }:
!
320
                case <-i.quit:
!
321
                        return ErrShuttingDown
!
322
                }
323
        }
324

325
        for _, settleEvent := range settleEvents {
3×
326
                // We re-bind the loop variable to ensure we don't hold onto
!
327
                // the loop reference causing it to point to the same item.
!
328
                settleEvent := settleEvent
!
329

!
330
                select {
!
331
                case client.ntfnQueue.ChanIn() <- &invoiceEvent{
332
                        invoice: &settleEvent,
333
                }:
!
334
                case <-i.quit:
!
335
                        return ErrShuttingDown
!
336
                }
337
        }
338

339
        return nil
3×
340
}
341

342
// deliverSingleBacklogEvents will attempt to query the invoice database to
343
// retrieve the current invoice state and deliver this to the subscriber. Single
344
// invoice subscribers will always receive the current state right after
345
// subscribing. Only in case the invoice does not yet exist, nothing is sent
346
// yet.
347
func (i *InvoiceRegistry) deliverSingleBacklogEvents(
348
        client *SingleInvoiceSubscription) error {
3×
349

3×
350
        invoice, err := i.cdb.LookupInvoice(client.hash)
3×
351

3×
352
        // It is possible that the invoice does not exist yet, but the client is
3×
353
        // already watching it in anticipation.
3×
354
        if err == channeldb.ErrInvoiceNotFound ||
3×
355
                err == channeldb.ErrNoInvoicesCreated {
6×
356

3×
357
                return nil
3×
358
        }
3×
359
        if err != nil {
!
360
                return err
!
361
        }
!
362

363
        err = client.notify(&invoiceEvent{
!
364
                hash:    client.hash,
!
365
                invoice: &invoice,
!
366
        })
!
367
        if err != nil {
!
368
                return err
!
369
        }
!
370

371
        return nil
!
372
}
373

374
// AddInvoice adds a regular invoice for the specified amount, identified by
375
// the passed preimage. Additionally, any memo or receipt data provided will
376
// also be stored on-disk. Once this invoice is added, subsystems within the
377
// daemon that add/forward HTLCs are able to obtain the proper preimage required for
378
// redemption in the case that we're the final destination. We also return the
379
// addIndex of the newly created invoice which monotonically increases for each
380
// new invoice added.  A side effect of this function is that it also sets
381
// AddIndex on the invoice argument.
382
func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice,
383
        paymentHash lntypes.Hash) (uint64, error) {
1,999×
384

1,999×
385
        i.Lock()
1,999×
386
        defer i.Unlock()
1,999×
387

1,999×
388
        log.Debugf("Invoice(%v): added %v", paymentHash,
1,999×
389
                newLogClosure(func() string {
1,999×
390
                        return spew.Sdump(invoice)
!
391
                }),
!
392
        )
393

394
        addIndex, err := i.cdb.AddInvoice(invoice, paymentHash)
1,999×
395
        if err != nil {
1,999×
396
                return 0, err
!
397
        }
!
398

399
        // Now that we've added the invoice, we'll dispatch a message to
400
        // notify the clients of this new invoice.
401
        i.notifyClients(paymentHash, invoice, channeldb.ContractOpen)
1,999×
402

1,999×
403
        return addIndex, nil
1,999×
404
}
405

406
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
407
// then we're able to pull the funds pending within an HTLC.
408
//
409
// TODO(roasbeef): ignore if settled?
410
func (i *InvoiceRegistry) LookupInvoice(rHash lntypes.Hash) (channeldb.Invoice,
411
        error) {
12×
412

12×
413
        // We'll check the database to see if there's an existing matching
12×
414
        // invoice.
12×
415
        return i.cdb.LookupInvoice(rHash)
12×
416
}
12×
417

418
// NotifyExitHopHtlc attempts to mark an invoice as settled. If the invoice is a
419
// debug invoice, then this method is a noop as debug invoices are never fully
420
// settled. The return value describes how the htlc should be resolved.
421
//
422
// When the preimage of the invoice is not yet known (hodl invoice), this
423
// function moves the invoice to the accepted state. When SettleHoldInvoice is
424
// called later, a resolution message will be sent back to the caller via the
425
// provided hodlChan. Invoice registry sends on this channel what action needs
426
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
427
// the channel is either buffered or received on from another goroutine to
428
// prevent deadlock.
429
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
430
        amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
431
        circuitKey channeldb.CircuitKey, hodlChan chan<- interface{},
432
        eob []byte) (*HodlEvent, error) {
1,989×
433

1,989×
434
        i.Lock()
1,989×
435
        defer i.Unlock()
1,989×
436

1,989×
437
        debugLog := func(s string) {
3,978×
438
                log.Debugf("Invoice(%x): %v, amt=%v, expiry=%v, circuit=%v",
1,989×
439
                        rHash[:], s, amtPaid, expiry, circuitKey)
1,989×
440
        }
1,989×
441

442
        // Default is to not update subscribers after the invoice update.
443
        updateSubscribers := false
1,989×
444

1,989×
445
        updateInvoice := func(inv *channeldb.Invoice) (
1,989×
446
                *channeldb.InvoiceUpdateDesc, error) {
3,976×
447

1,987×
448
                // Don't update the invoice when this is a replayed htlc.
1,987×
449
                htlc, ok := inv.Htlcs[circuitKey]
1,987×
450
                if ok {
1,992×
451
                        switch htlc.State {
5×
452
                        case channeldb.HtlcStateCanceled:
1×
453
                                debugLog("replayed htlc to canceled invoice")
1×
454

455
                        case channeldb.HtlcStateAccepted:
3×
456
                                debugLog("replayed htlc to accepted invoice")
3×
457

458
                        case channeldb.HtlcStateSettled:
1×
459
                                debugLog("replayed htlc to settled invoice")
1×
460

461
                        default:
!
462
                                return nil, errors.New("unexpected htlc state")
!
463
                        }
464

465
                        return nil, errNoUpdate
5×
466
                }
467

468
                // If the invoice is already canceled, there is no further
469
                // checking to do.
470
                if inv.Terms.State == channeldb.ContractCanceled {
1,984×
471
                        debugLog("invoice already canceled")
2×
472
                        return nil, errNoUpdate
2×
473
                }
2×
474

475
                // If an invoice amount is specified, check that enough
476
                // is paid. Also check this for duplicate payments if
477
                // the invoice is already settled or accepted.
478
                if inv.Terms.Value > 0 && amtPaid < inv.Terms.Value {
1,981×
479
                        debugLog("amount too low")
1×
480
                        return nil, errNoUpdate
1×
481
                }
1×
482

483
                // The invoice is still open. Check the expiry.
484
                if expiry < uint32(currentHeight+i.finalCltvRejectDelta) {
1,981×
485
                        debugLog("expiry too soon")
2×
486
                        return nil, errNoUpdate
2×
487
                }
2×
488

489
                if expiry < uint32(currentHeight+inv.FinalCltvDelta) {
1,978×
490
                        debugLog("expiry too soon")
1×
491
                        return nil, errNoUpdate
1×
492
                }
1×
493

494
                // Record HTLC in the invoice database.
495
                newHtlcs := map[channeldb.CircuitKey]*channeldb.HtlcAcceptDesc{
1,976×
496
                        circuitKey: {
1,976×
497
                                Amt:          amtPaid,
1,976×
498
                                Expiry:       expiry,
1,976×
499
                                AcceptHeight: currentHeight,
1,976×
500
                        },
1,976×
501
                }
1,976×
502

1,976×
503
                update := channeldb.InvoiceUpdateDesc{
1,976×
504
                        Htlcs: newHtlcs,
1,976×
505
                }
1,976×
506

1,976×
507
                // Don't update invoice state if we are accepting a duplicate
1,976×
508
                // payment. We do accept or settle the HTLC.
1,976×
509
                switch inv.Terms.State {
1,976×
510
                case channeldb.ContractAccepted:
!
511
                        debugLog("accepting duplicate payment to accepted invoice")
!
512
                        update.State = channeldb.ContractAccepted
!
513
                        return &update, nil
!
514

515
                case channeldb.ContractSettled:
1×
516
                        debugLog("accepting duplicate payment to settled invoice")
1×
517
                        update.State = channeldb.ContractSettled
1×
518
                        return &update, nil
1×
519
                }
520

521
                // Check to see if we can settle or this is a hold invoice and
522
                // we need to wait for the preimage.
523
                holdInvoice := inv.Terms.PaymentPreimage == channeldb.UnknownPreimage
1,975×
524
                if holdInvoice {
1,982×
525
                        debugLog("accepted")
7×
526
                        update.State = channeldb.ContractAccepted
7×
527
                } else {
1,975×
528
                        debugLog("settled")
1,968×
529
                        update.Preimage = inv.Terms.PaymentPreimage
1,968×
530
                        update.State = channeldb.ContractSettled
1,968×
531
                }
1,968×
532

533
                updateSubscribers = true
1,975×
534

1,975×
535
                return &update, nil
1,975×
536
        }
537

538
        // We'll attempt to settle an invoice matching this rHash on disk (if
539
        // one exists). The callback will set the resolution action that is
540
        // returned to the link or contract resolver.
541
        invoice, err := i.cdb.UpdateInvoice(rHash, updateInvoice)
1,989×
542
        if err != nil && err != errNoUpdate {
1,991×
543
                debugLog(err.Error())
2×
544

2×
545
                return nil, err
2×
546
        }
2×
547

548
        if updateSubscribers {
3,962×
549
                i.notifyClients(rHash, invoice, invoice.Terms.State)
1,975×
550
        }
1,975×
551

552
        // Inspect latest htlc state on the invoice.
553
        invoiceHtlc, ok := invoice.Htlcs[circuitKey]
1,987×
554

1,987×
555
        // If it isn't recorded, cancel htlc.
1,987×
556
        if !ok {
1,993×
557
                return &HodlEvent{
6×
558
                        CircuitKey:   circuitKey,
6×
559
                        AcceptHeight: currentHeight,
6×
560
                }, nil
6×
561
        }
6×
562

563
        // Determine accepted height of this htlc. If the htlc reached the
564
        // invoice database (possibly in a previous call to the invoice
565
        // registry), we'll take the original accepted height as it was recorded
566
        // in the database.
567
        acceptHeight := int32(invoiceHtlc.AcceptHeight)
1,981×
568

1,981×
569
        switch invoiceHtlc.State {
1,981×
570
        case channeldb.HtlcStateCanceled:
1×
571
                return &HodlEvent{
1×
572
                        CircuitKey:   circuitKey,
1×
573
                        AcceptHeight: acceptHeight,
1×
574
                }, nil
1×
575

576
        case channeldb.HtlcStateSettled:
1,970×
577
                return &HodlEvent{
1,970×
578
                        CircuitKey:   circuitKey,
1,970×
579
                        Preimage:     &invoice.Terms.PaymentPreimage,
1,970×
580
                        AcceptHeight: acceptHeight,
1,970×
581
                }, nil
1,970×
582

583
        case channeldb.HtlcStateAccepted:
10×
584
                i.hodlSubscribe(hodlChan, circuitKey)
10×
585
                return nil, nil
10×
586

587
        default:
!
588
                panic("unknown action")
!
589
        }
590
}
591

592
// SettleHodlInvoice sets the preimage of a hodl invoice.
593
func (i *InvoiceRegistry) SettleHodlInvoice(preimage lntypes.Preimage) error {
6×
594
        i.Lock()
6×
595
        defer i.Unlock()
6×
596

6×
597
        updateInvoice := func(invoice *channeldb.Invoice) (
6×
598
                *channeldb.InvoiceUpdateDesc, error) {
12×
599

6×
600
                switch invoice.Terms.State {
6×
601
                case channeldb.ContractOpen:
!
602
                        return nil, channeldb.ErrInvoiceStillOpen
!
603
                case channeldb.ContractCanceled:
!
604
                        return nil, channeldb.ErrInvoiceAlreadyCanceled
!
605
                case channeldb.ContractSettled:
1×
606
                        return nil, channeldb.ErrInvoiceAlreadySettled
1×
607
                }
608

609
                return &channeldb.InvoiceUpdateDesc{
5×
610
                        State:    channeldb.ContractSettled,
5×
611
                        Preimage: preimage,
5×
612
                }, nil
5×
613
        }
614

615
        hash := preimage.Hash()
6×
616
        invoice, err := i.cdb.UpdateInvoice(hash, updateInvoice)
6×
617
        if err != nil {
7×
618
                log.Errorf("SettleHodlInvoice with preimage %v: %v", preimage, err)
1×
619
                return err
1×
620
        }
1×
621

622
        log.Debugf("Invoice(%v): settled with preimage %v", hash,
5×
623
                invoice.Terms.PaymentPreimage)
5×
624

5×
625
        // In the callback, we marked the invoice as settled. UpdateInvoice will
5×
626
        // have seen this and should have moved all htlcs that were accepted to
5×
627
        // the settled state. In the loop below, we go through all of these and
5×
628
        // notify links and resolvers that are waiting for resolution. Any htlcs
5×
629
        // that were already settled before, will be notified again. This isn't
5×
630
        // necessary but doesn't hurt either.
5×
631
        for key, htlc := range invoice.Htlcs {
10×
632
                if htlc.State != channeldb.HtlcStateSettled {
5×
633
                        continue
!
634
                }
635

636
                i.notifyHodlSubscribers(HodlEvent{
5×
637
                        CircuitKey:   key,
5×
638
                        Preimage:     &preimage,
5×
639
                        AcceptHeight: int32(htlc.AcceptHeight),
5×
640
                })
5×
641
        }
642
        i.notifyClients(hash, invoice, invoice.Terms.State)
5×
643

5×
644
        return nil
5×
645
}
646

647
// CancelInvoice attempts to cancel the invoice corresponding to the passed
648
// payment hash.
649
func (i *InvoiceRegistry) CancelInvoice(payHash lntypes.Hash) error {
8×
650
        i.Lock()
8×
651
        defer i.Unlock()
8×
652

8×
653
        log.Debugf("Invoice(%v): canceling invoice", payHash)
8×
654

8×
655
        updateInvoice := func(invoice *channeldb.Invoice) (
8×
656
                *channeldb.InvoiceUpdateDesc, error) {
15×
657

7×
658
                switch invoice.Terms.State {
7×
659
                case channeldb.ContractSettled:
2×
660
                        return nil, channeldb.ErrInvoiceAlreadySettled
2×
661
                case channeldb.ContractCanceled:
1×
662
                        return nil, channeldb.ErrInvoiceAlreadyCanceled
1×
663
                }
664

665
                // Mark individual held htlcs as canceled.
666
                canceledHtlcs := make(
4×
667
                        map[channeldb.CircuitKey]*channeldb.HtlcAcceptDesc,
4×
668
                )
4×
669
                for key, htlc := range invoice.Htlcs {
6×
670
                        switch htlc.State {
2×
671

672
                        // If we get here, there shouldn't be any settled htlcs.
673
                        case channeldb.HtlcStateSettled:
!
674
                                return nil, errors.New("cannot cancel " +
!
675
                                        "invoice with settled htlc(s)")
!
676

677
                        // Don't cancel htlcs that were already canceled,
678
                        // because it would incorrectly modify the invoice paid
679
                        // amt.
680
                        case channeldb.HtlcStateCanceled:
!
681
                                continue
!
682
                        }
683

684
                        canceledHtlcs[key] = nil
2×
685
                }
686

687
                // Move invoice to the canceled state.
688
                return &channeldb.InvoiceUpdateDesc{
4×
689
                        Htlcs: canceledHtlcs,
4×
690
                        State: channeldb.ContractCanceled,
4×
691
                }, nil
4×
692
        }
693

694
        invoice, err := i.cdb.UpdateInvoice(payHash, updateInvoice)
8×
695

8×
696
        // Implement idempotency by returning success if the invoice was already
8×
697
        // canceled.
8×
698
        if err == channeldb.ErrInvoiceAlreadyCanceled {
9×
699
                log.Debugf("Invoice(%v): already canceled", payHash)
1×
700
                return nil
1×
701
        }
1×
702
        if err != nil {
10×
703
                return err
3×
704
        }
3×
705

706
        log.Debugf("Invoice(%v): canceled", payHash)
4×
707

4×
708
        // In the callback, some htlcs may have been moved to the canceled
4×
709
        // state. We now go through all of these and notify links and resolvers
4×
710
        // that are waiting for resolution. Any htlcs that were already canceled
4×
711
        // before, will be notified again. This isn't necessary but doesn't hurt
4×
712
        // either.
4×
713
        for key, htlc := range invoice.Htlcs {
6×
714
                if htlc.State != channeldb.HtlcStateCanceled {
2×
715
                        continue
!
716
                }
717

718
                i.notifyHodlSubscribers(HodlEvent{
2×
719
                        CircuitKey:   key,
2×
720
                        AcceptHeight: int32(htlc.AcceptHeight),
2×
721
                })
2×
722
        }
723
        i.notifyClients(payHash, invoice, channeldb.ContractCanceled)
4×
724

4×
725
        return nil
4×
726
}
727

728
// notifyClients notifies all currently registered invoice notification clients
729
// of a newly added/settled invoice.
730
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
731
        invoice *channeldb.Invoice,
732
        state channeldb.ContractState) {
3,983×
733

3,983×
734
        event := &invoiceEvent{
3,983×
735
                invoice: invoice,
3,983×
736
                hash:    hash,
3,983×
737
        }
3,983×
738

3,983×
739
        select {
3,983×
740
        case i.invoiceEvents <- event:
3,983×
741
        case <-i.quit:
!
742
        }
743
}
744

745
// invoiceSubscriptionKit defines that are common to both all invoice
746
// subscribers and single invoice subscribers.
747
type invoiceSubscriptionKit struct {
748
        id        uint32
749
        inv       *InvoiceRegistry
750
        ntfnQueue *queue.ConcurrentQueue
751

752
        canceled   uint32 // To be used atomically.
753
        cancelChan chan struct{}
754
        wg         sync.WaitGroup
755
}
756

757
// InvoiceSubscription represents an intent to receive updates for newly added
758
// or settled invoices. For each newly added invoice, a copy of the invoice
759
// will be sent over the NewInvoices channel. Similarly, for each newly settled
760
// invoice, a copy of the invoice will be sent over the SettledInvoices
761
// channel.
762
type InvoiceSubscription struct {
763
        invoiceSubscriptionKit
764

765
        // NewInvoices is a channel that we'll use to send all newly created
766
        // invoices with an invoice index greater than the specified
767
        // StartingInvoiceIndex field.
768
        NewInvoices chan *channeldb.Invoice
769

770
        // SettledInvoices is a channel that we'll use to send all setted
771
        // invoices with an invoices index greater than the specified
772
        // StartingInvoiceIndex field.
773
        SettledInvoices chan *channeldb.Invoice
774

775
        // addIndex is the highest add index the caller knows of. We'll use
776
        // this information to send out an event backlog to the notifications
777
        // subscriber. Any new add events with an index greater than this will
778
        // be dispatched before any new notifications are sent out.
779
        addIndex uint64
780

781
        // settleIndex is the highest settle index the caller knows of. We'll
782
        // use this information to send out an event backlog to the
783
        // notifications subscriber. Any new settle events with an index
784
        // greater than this will be dispatched before any new notifications
785
        // are sent out.
786
        settleIndex uint64
787
}
788

789
// SingleInvoiceSubscription represents an intent to receive updates for a
790
// specific invoice.
791
type SingleInvoiceSubscription struct {
792
        invoiceSubscriptionKit
793

794
        hash lntypes.Hash
795

796
        // Updates is a channel that we'll use to send all invoice events for
797
        // the invoice that is subscribed to.
798
        Updates chan *channeldb.Invoice
799
}
800

801
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
802
// resources.
803
func (i *invoiceSubscriptionKit) Cancel() {
6×
804
        if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
6×
805
                return
!
806
        }
!
807

808
        select {
6×
809
        case i.inv.subscriptionCancels <- i.id:
6×
810
        case <-i.inv.quit:
!
811
        }
812

813
        i.ntfnQueue.Stop()
6×
814
        close(i.cancelChan)
6×
815

6×
816
        i.wg.Wait()
6×
817
}
818

819
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
7×
820
        select {
7×
821
        case i.ntfnQueue.ChanIn() <- event:
7×
822
        case <-i.inv.quit:
!
823
                return ErrShuttingDown
!
824
        }
825

826
        return nil
7×
827
}
828

829
// SubscribeNotifications returns an InvoiceSubscription which allows the
830
// caller to receive async notifications when any invoices are settled or
831
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
832
// by first sending out all new events with an invoice index _greater_ than
833
// this value. Afterwards, we'll send out real-time notifications.
834
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription {
3×
835
        client := &InvoiceSubscription{
3×
836
                NewInvoices:     make(chan *channeldb.Invoice),
3×
837
                SettledInvoices: make(chan *channeldb.Invoice),
3×
838
                addIndex:        addIndex,
3×
839
                settleIndex:     settleIndex,
3×
840
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3×
841
                        inv:        i,
3×
842
                        ntfnQueue:  queue.NewConcurrentQueue(20),
3×
843
                        cancelChan: make(chan struct{}),
3×
844
                },
3×
845
        }
3×
846
        client.ntfnQueue.Start()
3×
847

3×
848
        i.clientMtx.Lock()
3×
849
        client.id = i.nextClientID
3×
850
        i.nextClientID++
3×
851
        i.clientMtx.Unlock()
3×
852

3×
853
        // Before we register this new invoice subscription, we'll launch a new
3×
854
        // goroutine that will proxy all notifications appended to the end of
3×
855
        // the concurrent queue to the two client-side channels the caller will
3×
856
        // feed off of.
3×
857
        i.wg.Add(1)
3×
858
        go func() {
6×
859
                defer i.wg.Done()
3×
860

3×
861
                for {
11×
862
                        select {
8×
863
                        // A new invoice event has been sent by the
864
                        // invoiceRegistry! We'll figure out if this is an add
865
                        // event or a settle event, then dispatch the event to
866
                        // the client.
867
                        case ntfn := <-client.ntfnQueue.ChanOut():
5×
868
                                invoiceEvent := ntfn.(*invoiceEvent)
5×
869

5×
870
                                var targetChan chan *channeldb.Invoice
5×
871
                                state := invoiceEvent.invoice.Terms.State
5×
872
                                switch state {
5×
873
                                case channeldb.ContractOpen:
3×
874
                                        targetChan = client.NewInvoices
3×
875
                                case channeldb.ContractSettled:
2×
876
                                        targetChan = client.SettledInvoices
2×
877
                                default:
!
878
                                        log.Errorf("unknown invoice "+
!
879
                                                "state: %v", state)
!
880

!
881
                                        continue
!
882
                                }
883

884
                                select {
5×
885
                                case targetChan <- invoiceEvent.invoice:
5×
886

887
                                case <-client.cancelChan:
!
888
                                        return
!
889

890
                                case <-i.quit:
!
891
                                        return
!
892
                                }
893

894
                        case <-client.cancelChan:
1×
895
                                return
1×
896

897
                        case <-i.quit:
2×
898
                                return
2×
899
                        }
900
                }
901
        }()
902

903
        select {
3×
904
        case i.newSubscriptions <- client:
3×
905
        case <-i.quit:
!
906
        }
907

908
        return client
3×
909
}
910

911
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
912
// caller to receive async notifications for a specific invoice.
913
func (i *InvoiceRegistry) SubscribeSingleInvoice(
914
        hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
3×
915

3×
916
        client := &SingleInvoiceSubscription{
3×
917
                Updates: make(chan *channeldb.Invoice),
3×
918
                invoiceSubscriptionKit: invoiceSubscriptionKit{
3×
919
                        inv:        i,
3×
920
                        ntfnQueue:  queue.NewConcurrentQueue(20),
3×
921
                        cancelChan: make(chan struct{}),
3×
922
                },
3×
923
                hash: hash,
3×
924
        }
3×
925
        client.ntfnQueue.Start()
3×
926

3×
927
        i.clientMtx.Lock()
3×
928
        client.id = i.nextClientID
3×
929
        i.nextClientID++
3×
930
        i.clientMtx.Unlock()
3×
931

3×
932
        // Before we register this new invoice subscription, we'll launch a new
3×
933
        // goroutine that will proxy all notifications appended to the end of
3×
934
        // the concurrent queue to the two client-side channels the caller will
3×
935
        // feed off of.
3×
936
        i.wg.Add(1)
3×
937
        go func() {
6×
938
                defer i.wg.Done()
3×
939

3×
940
                for {
13×
941
                        select {
10×
942
                        // A new invoice event has been sent by the
943
                        // invoiceRegistry. We will dispatch the event to the
944
                        // client.
945
                        case ntfn := <-client.ntfnQueue.ChanOut():
7×
946
                                invoiceEvent := ntfn.(*invoiceEvent)
7×
947

7×
948
                                select {
7×
949
                                case client.Updates <- invoiceEvent.invoice:
7×
950

951
                                case <-client.cancelChan:
!
952
                                        return
!
953

954
                                case <-i.quit:
!
955
                                        return
!
956
                                }
957

958
                        case <-client.cancelChan:
3×
959
                                return
3×
960

UNCOV
961
                        case <-i.quit:
!
UNCOV
962
                                return
!
963
                        }
964
                }
965
        }()
966

967
        // Within the lock, we both query the invoice state and pass the client
968
        // subscription to the invoiceEvents channel. This is to make sure that
969
        // the client receives a consistent stream of events.
970
        i.Lock()
3×
971
        defer i.Unlock()
3×
972

3×
973
        err := i.deliverSingleBacklogEvents(client)
3×
974
        if err != nil {
3×
975
                return nil, err
!
976
        }
!
977

978
        select {
3×
979
        case i.invoiceEvents <- client:
3×
980
        case <-i.quit:
!
981
                return nil, ErrShuttingDown
!
982
        }
983

984
        return client, nil
3×
985
}
986

987
// notifyHodlSubscribers sends out the hodl event to all current subscribers.
988
func (i *InvoiceRegistry) notifyHodlSubscribers(hodlEvent HodlEvent) {
7×
989
        subscribers, ok := i.hodlSubscriptions[hodlEvent.CircuitKey]
7×
990
        if !ok {
7×
991
                return
!
992
        }
!
993

994
        // Notify all interested subscribers and remove subscription from both
995
        // maps. The subscription can be removed as there only ever will be a
996
        // single resolution for each hash.
997
        for subscriber := range subscribers {
14×
998
                select {
7×
999
                case subscriber <- hodlEvent:
7×
1000
                case <-i.quit:
!
1001
                        return
!
1002
                }
1003

1004
                delete(
7×
1005
                        i.hodlReverseSubscriptions[subscriber],
7×
1006
                        hodlEvent.CircuitKey,
7×
1007
                )
7×
1008
        }
1009

1010
        delete(i.hodlSubscriptions, hodlEvent.CircuitKey)
7×
1011
}
1012

1013
// hodlSubscribe adds a new invoice subscription.
1014
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
1015
        circuitKey channeldb.CircuitKey) {
10×
1016

10×
1017
        log.Debugf("Hodl subscribe for %v", circuitKey)
10×
1018

10×
1019
        subscriptions, ok := i.hodlSubscriptions[circuitKey]
10×
1020
        if !ok {
17×
1021
                subscriptions = make(map[chan<- interface{}]struct{})
7×
1022
                i.hodlSubscriptions[circuitKey] = subscriptions
7×
1023
        }
7×
1024
        subscriptions[subscriber] = struct{}{}
10×
1025

10×
1026
        reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
10×
1027
        if !ok {
17×
1028
                reverseSubscriptions = make(map[channeldb.CircuitKey]struct{})
7×
1029
                i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
7×
1030
        }
7×
1031
        reverseSubscriptions[circuitKey] = struct{}{}
10×
1032
}
1033

1034
// HodlUnsubscribeAll cancels the subscription.
1035
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
142×
1036
        i.Lock()
142×
1037
        defer i.Unlock()
142×
1038

142×
1039
        hashes := i.hodlReverseSubscriptions[subscriber]
142×
1040
        for hash := range hashes {
143×
1041
                delete(i.hodlSubscriptions[hash], subscriber)
1×
1042
        }
1×
1043

1044
        delete(i.hodlReverseSubscriptions, subscriber)
142×
1045
}
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC