• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.go / 14218118055

02 Apr 2025 11:07AM UTC coverage: 84.545% (-0.02%) from 84.561%
14218118055

push

github

piotrpio
Revert "[ADDED] DefaultTimeout option for JetStream API requests"

This reverts commit 37db22677.

3 of 3 new or added lines in 1 file covered. (100.0%)

10 existing lines in 4 files now uncovered.

13397 of 15846 relevant lines covered (84.54%)

33368.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.49
/jetstream/pull.go
1
// Copyright 2022-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package jetstream
15

16
import (
17
        "encoding/json"
18
        "errors"
19
        "fmt"
20
        "math"
21
        "sync"
22
        "sync/atomic"
23
        "time"
24

25
        "github.com/nats-io/nats.go"
26
        "github.com/nats-io/nats.go/internal/syncx"
27
        "github.com/nats-io/nuid"
28
)
29

30
type (
31
        // MessagesContext supports iterating over messages on a stream.
32
        // It is returned by [Consumer.Messages] method.
33
        MessagesContext interface {
34
                // Next retrieves next message on a stream. It will block until the next
35
                // message is available. If the iterator was stopped, Next will return
36
                // ErrMsgIteratorClosed error.
37
                Next() (Msg, error)
38

39
                // Stop unsubscribes from the stream and cancels subscription. Calling
40
                // Next after calling Stop will return ErrMsgIteratorClosed error.
41
                // All messages that are already in the buffer are discarded.
42
                Stop()
43

44
                // Drain unsubscribes from the stream and cancels subscription. All
45
                // messages that are already in the buffer will be available on
46
                // subsequent calls to Next. After the buffer is drained, Next will
47
                // return ErrMsgIteratorClosed error.
48
                Drain()
49
        }
50

51
        // ConsumeContext supports processing incoming messages from a stream.
52
        // It is returned by [Consumer.Consume] method.
53
        ConsumeContext interface {
54
                // Stop unsubscribes from the stream and cancels subscription.
55
                // No more messages will be received after calling this method.
56
                // All messages that are already in the buffer are discarded.
57
                Stop()
58

59
                // Drain unsubscribes from the stream and cancels subscription.
60
                // All messages that are already in the buffer will be processed in callback function.
61
                Drain()
62

63
                // Closed returns a channel that is closed when the consuming is
64
                // fully stopped/drained. When the channel is closed, no more messages
65
                // will be received and processing is complete.
66
                Closed() <-chan struct{}
67
        }
68

69
        // MessageHandler is a handler function used as callback in [Consume].
70
        MessageHandler func(msg Msg)
71

72
        // PullConsumeOpt represents additional options used in [Consume] for pull consumers.
73
        PullConsumeOpt interface {
74
                configureConsume(*consumeOpts) error
75
        }
76

77
        // PullMessagesOpt represents additional options used in [Messages] for pull consumers.
78
        PullMessagesOpt interface {
79
                configureMessages(*consumeOpts) error
80
        }
81

82
        // pullConsumer is the receiver of Consume, Messages and Fetch. Its
        // embedded mutex is held while a new subscription is being set up, and
        // subs tracks the active subscriptions keyed by subscription id.
        pullConsumer struct {
83
                sync.Mutex
84
                js      *jetStream
85
                stream  string
86
                durable bool
87
                name    string
88
                info    *ConsumerInfo
89
                subs    syncx.Map[string, *pullSubscription]
90
        }
91

92
        // pullRequest describes a single pull request; the JSON tags define
        // the field names used when the request is encoded.
        pullRequest struct {
93
                Expires   time.Duration `json:"expires,omitempty"`
94
                Batch     int           `json:"batch,omitempty"`
95
                MaxBytes  int           `json:"max_bytes,omitempty"`
96
                NoWait    bool          `json:"no_wait,omitempty"`
97
                Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
98
        }
99

100
        // consumeOpts is the option set configured through PullConsumeOpt and
        // PullMessagesOpt and attached to every pullSubscription.
        consumeOpts struct {
101
                Expires                 time.Duration
102
                MaxMessages             int
103
                MaxBytes                int
104
                LimitSize               bool
105
                Heartbeat               time.Duration
106
                ErrHandler              ConsumeErrHandlerFunc
107
                ReportMissingHeartbeats bool
108
                ThresholdMessages       int
109
                ThresholdBytes          int
110
                StopAfter               int
111
                stopAfterMsgsLeft       chan int
112
                notifyOnReconnect       bool
113
        }
114

115
        // ConsumeErrHandlerFunc is the error callback stored in
        // consumeOpts.ErrHandler. It is invoked with the subscription that
        // observed the error and the error itself.
        ConsumeErrHandlerFunc func(consumeCtx ConsumeContext, err error)
116

117
        // pullSubscription is the state behind both Consume and Messages.
        // The embedded mutex guards pending, delivered and closedCh; closed
        // and draining are atomic flags (0 or 1); done is closed by
        // Stop/Drain. msgs is only set by Messages.
        pullSubscription struct {
118
                sync.Mutex
119
                id                string
120
                consumer          *pullConsumer
121
                subscription      *nats.Subscription
122
                msgs              chan *nats.Msg
123
                errs              chan error
124
                pending           pendingMsgs
125
                hbMonitor         *hbMonitor
126
                fetchInProgress   atomic.Uint32
127
                closed            atomic.Uint32
128
                draining          atomic.Uint32
129
                done              chan struct{}
130
                connStatusChanged chan nats.Status
131
                fetchNext         chan *pullRequest
132
                consumeOpts       *consumeOpts
133
                delivered         int
134
                closedCh          chan struct{}
135
        }
136

137
        // pendingMsgs counts the messages and bytes still expected from the
        // pull requests issued so far.
        pendingMsgs struct {
138
                msgCount  int
139
                byteCount int
140
        }
141

142
        // MessageBatch is the result type returned by Fetch: Messages exposes
        // the received messages as a channel and Error reports an error.
        // NOTE(review): the exact point at which Error becomes meaningful is
        // decided by the implementation, which is not visible here - confirm.
        MessageBatch interface {
143
                Messages() <-chan Msg
144
                Error() error
145
        }
146

147
        // fetchResult is presumably the MessageBatch implementation produced by
        // fetch - TODO confirm against the fetch implementation.
        fetchResult struct {
148
                sync.Mutex
149
                msgs chan Msg
150
                err  error
151
                done bool
152
                sseq uint64
153
        }
154

155
        // FetchOpt mutates the pullRequest built by Fetch; a non-nil error
        // aborts the Fetch call.
        FetchOpt func(*pullRequest) error
156

157
        // hbMonitor wraps a timer whose Stop and Reset calls are serialized by
        // the embedded mutex.
        hbMonitor struct {
158
                timer *time.Timer
159
                sync.Mutex
160
        }
161
)
162

163
const (
164
        // DefaultMaxMessages is presumably the default MaxMessages for
        // Consume/Messages - TODO confirm in the option parsing code.
        DefaultMaxMessages       = 500
165
        // DefaultExpires is the pull request expiry Fetch uses unless a
        // FetchOpt overrides it.
        DefaultExpires           = 30 * time.Second
166
        // NOTE(review): looks like the batch size used when only a byte limit
        // is configured - not referenced in this part of the file, confirm.
        defaultBatchMaxBytesOnly = 1_000_000
167
        // unset marks a value that was not explicitly configured; Fetch uses
        // it as the initial Heartbeat to detect a missing heartbeat option.
        unset                    = -1
168
)
169

170
// min returns the smaller of x and y; when they are equal, y is returned.
func min(x, y int) int {
	smallest := y
	if x < y {
		smallest = x
	}
	return smallest
}
176

177
// Consume can be used to continuously receive messages and handle them
178
// with the provided callback function. Consume cannot be used concurrently
179
// when using ordered consumer.
180
//
181
// See [Consumer.Consume] for more details.
182
func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
64✔
183
        if handler == nil {
64✔
184
                return nil, ErrHandlerRequired
×
185
        }
×
186
        consumeOpts, err := parseConsumeOpts(false, opts...)
64✔
187
        if err != nil {
66✔
188
                return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
2✔
189
        }
2✔
190
        p.Lock()
62✔
191

62✔
192
        subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
62✔
193

62✔
194
        consumeID := nuid.Next()
62✔
195
        sub := &pullSubscription{
62✔
196
                id:          consumeID,
62✔
197
                consumer:    p,
62✔
198
                errs:        make(chan error, 10),
62✔
199
                done:        make(chan struct{}, 1),
62✔
200
                fetchNext:   make(chan *pullRequest, 1),
62✔
201
                consumeOpts: consumeOpts,
62✔
202
        }
62✔
203
        sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING)
62✔
204

62✔
205
        sub.hbMonitor = sub.scheduleHeartbeatCheck(consumeOpts.Heartbeat)
62✔
206

62✔
207
        p.subs.Store(sub.id, sub)
62✔
208
        p.Unlock()
62✔
209

62✔
210
        internalHandler := func(msg *nats.Msg) {
563✔
211
                if sub.hbMonitor != nil {
1,002✔
212
                        sub.hbMonitor.Stop()
501✔
213
                }
501✔
214
                userMsg, msgErr := checkMsg(msg)
501✔
215
                if !userMsg && msgErr == nil {
501✔
UNCOV
216
                        if sub.hbMonitor != nil {
×
UNCOV
217
                                sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
×
UNCOV
218
                        }
×
UNCOV
219
                        return
×
220
                }
221
                defer func() {
1,002✔
222
                        sub.Lock()
501✔
223
                        sub.checkPending()
501✔
224
                        if sub.hbMonitor != nil {
1,002✔
225
                                sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat)
501✔
226
                        }
501✔
227
                        sub.Unlock()
501✔
228
                }()
229
                if !userMsg {
539✔
230
                        // heartbeat message
38✔
231
                        if msgErr == nil {
38✔
232
                                return
×
233
                        }
×
234

235
                        sub.Lock()
38✔
236
                        err := sub.handleStatusMsg(msg, msgErr)
38✔
237
                        sub.Unlock()
38✔
238

38✔
239
                        if err != nil {
46✔
240
                                if sub.closed.Load() == 1 {
9✔
241
                                        return
1✔
242
                                }
1✔
243
                                if sub.consumeOpts.ErrHandler != nil {
14✔
244
                                        sub.consumeOpts.ErrHandler(sub, err)
7✔
245
                                }
7✔
246
                                sub.Stop()
7✔
247
                        }
248
                        return
37✔
249
                }
250
                handler(p.js.toJSMsg(msg))
463✔
251
                sub.Lock()
463✔
252
                sub.decrementPendingMsgs(msg)
463✔
253
                sub.incrementDeliveredMsgs()
463✔
254
                sub.Unlock()
463✔
255

463✔
256
                if sub.consumeOpts.StopAfter > 0 && sub.consumeOpts.StopAfter == sub.delivered {
466✔
257
                        sub.Stop()
3✔
258
                }
3✔
259
        }
260
        inbox := p.js.conn.NewInbox()
62✔
261
        sub.subscription, err = p.js.conn.Subscribe(inbox, internalHandler)
62✔
262
        if err != nil {
62✔
263
                return nil, err
×
264
        }
×
265
        sub.subscription.SetClosedHandler(func(sid string) func(string) {
124✔
266
                return func(subject string) {
124✔
267
                        p.subs.Delete(sid)
62✔
268
                        sub.draining.CompareAndSwap(1, 0)
62✔
269
                        sub.Lock()
62✔
270
                        if sub.closedCh != nil {
66✔
271
                                close(sub.closedCh)
4✔
272
                                sub.closedCh = nil
4✔
273
                        }
4✔
274
                        sub.Unlock()
62✔
275
                }
276
        }(sub.id))
277

278
        sub.Lock()
62✔
279
        // initial pull
62✔
280
        sub.resetPendingMsgs()
62✔
281
        batchSize := sub.consumeOpts.MaxMessages
62✔
282
        if sub.consumeOpts.StopAfter > 0 {
66✔
283
                batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered)
4✔
284
        }
4✔
285
        if err := sub.pull(&pullRequest{
62✔
286
                Expires:   consumeOpts.Expires,
62✔
287
                Batch:     batchSize,
62✔
288
                MaxBytes:  consumeOpts.MaxBytes,
62✔
289
                Heartbeat: consumeOpts.Heartbeat,
62✔
290
        }, subject); err != nil {
62✔
291
                sub.errs <- err
×
292
        }
×
293
        sub.Unlock()
62✔
294

62✔
295
        go func() {
124✔
296
                isConnected := true
62✔
297
                for {
141✔
298
                        if sub.closed.Load() == 1 {
84✔
299
                                return
5✔
300
                        }
5✔
301
                        select {
74✔
302
                        case status, ok := <-sub.connStatusChanged:
13✔
303
                                if !ok {
13✔
304
                                        continue
×
305
                                }
306
                                if status == nats.RECONNECTING {
20✔
307
                                        if sub.hbMonitor != nil {
14✔
308
                                                sub.hbMonitor.Stop()
7✔
309
                                        }
7✔
310
                                        isConnected = false
7✔
311
                                }
312
                                if status == nats.CONNECTED {
19✔
313
                                        sub.Lock()
6✔
314
                                        if !isConnected {
12✔
315
                                                isConnected = true
6✔
316
                                                if sub.consumeOpts.notifyOnReconnect {
7✔
317
                                                        sub.errs <- errConnected
1✔
318
                                                }
1✔
319

320
                                                sub.fetchNext <- &pullRequest{
6✔
321
                                                        Expires:   sub.consumeOpts.Expires,
6✔
322
                                                        Batch:     sub.consumeOpts.MaxMessages,
6✔
323
                                                        MaxBytes:  sub.consumeOpts.MaxBytes,
6✔
324
                                                        Heartbeat: sub.consumeOpts.Heartbeat,
6✔
325
                                                }
6✔
326
                                                if sub.hbMonitor != nil {
12✔
327
                                                        sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat)
6✔
328
                                                }
6✔
329
                                                sub.resetPendingMsgs()
6✔
330
                                        }
331
                                        sub.Unlock()
6✔
332
                                }
333
                        case err := <-sub.errs:
4✔
334
                                sub.Lock()
4✔
335
                                if sub.consumeOpts.ErrHandler != nil {
6✔
336
                                        sub.consumeOpts.ErrHandler(sub, err)
2✔
337
                                }
2✔
338
                                if errors.Is(err, ErrNoHeartbeat) {
7✔
339
                                        batchSize := sub.consumeOpts.MaxMessages
3✔
340
                                        if sub.consumeOpts.StopAfter > 0 {
3✔
341
                                                batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered)
×
342
                                        }
×
343
                                        sub.fetchNext <- &pullRequest{
3✔
344
                                                Expires:   sub.consumeOpts.Expires,
3✔
345
                                                Batch:     batchSize,
3✔
346
                                                MaxBytes:  sub.consumeOpts.MaxBytes,
3✔
347
                                                Heartbeat: sub.consumeOpts.Heartbeat,
3✔
348
                                        }
3✔
349
                                        if sub.hbMonitor != nil {
6✔
350
                                                sub.hbMonitor.Reset(2 * sub.consumeOpts.Heartbeat)
3✔
351
                                        }
3✔
352
                                        sub.resetPendingMsgs()
3✔
353
                                }
354
                                sub.Unlock()
4✔
355
                        case <-sub.done:
57✔
356
                                return
57✔
357
                        }
358
                }
359
        }()
360

361
        go sub.pullMessages(subject)
62✔
362

62✔
363
        return sub, nil
62✔
364
}
365

366
// resetPendingMsgs resets pending message count and byte count
367
// to the values set in consumeOpts
368
// lock should be held before calling this method
369
func (s *pullSubscription) resetPendingMsgs() {
71✔
370
        s.pending.msgCount = s.consumeOpts.MaxMessages
71✔
371
        s.pending.byteCount = s.consumeOpts.MaxBytes
71✔
372
}
71✔
373

374
// decrementPendingMsgs decrements pending message count and byte count
375
// lock should be held before calling this method
376
func (s *pullSubscription) decrementPendingMsgs(msg *nats.Msg) {
885✔
377
        s.pending.msgCount--
885✔
378
        if s.consumeOpts.MaxBytes != 0 && !s.consumeOpts.LimitSize {
900✔
379
                s.pending.byteCount -= msg.Size()
15✔
380
        }
15✔
381
}
382

383
// incrementDeliveredMsgs increments delivered message count
384
// lock should be held before calling this method
385
func (s *pullSubscription) incrementDeliveredMsgs() {
885✔
386
        s.delivered++
885✔
387
}
885✔
388

389
// checkPending verifies whether there are enough messages in
390
// the buffer to trigger a new pull request.
391
// lock should be held before calling this method
392
func (s *pullSubscription) checkPending() {
964✔
393
        // check if we went below any threshold
964✔
394
        // we don't want to track bytes threshold if either it's not set or we used
964✔
395
        // PullMaxMessagesWithBytesLimit
964✔
396
        if (s.pending.msgCount < s.consumeOpts.ThresholdMessages ||
964✔
397
                (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0 && !s.consumeOpts.LimitSize)) &&
964✔
398
                s.fetchInProgress.Load() == 0 {
1,122✔
399

158✔
400
                var batchSize, maxBytes int
158✔
401
                batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount
158✔
402
                if s.consumeOpts.MaxBytes != 0 {
203✔
403
                        if s.consumeOpts.LimitSize {
72✔
404
                                maxBytes = s.consumeOpts.MaxBytes
27✔
405
                        } else {
45✔
406
                                maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount
18✔
407
                                // when working with max bytes only, always ask for full batch
18✔
408
                                batchSize = s.consumeOpts.MaxMessages
18✔
409
                        }
18✔
410
                }
411
                if s.consumeOpts.StopAfter > 0 {
238✔
412
                        batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount)
80✔
413
                }
80✔
414
                if batchSize > 0 {
257✔
415
                        s.fetchNext <- &pullRequest{
99✔
416
                                Expires:   s.consumeOpts.Expires,
99✔
417
                                Batch:     batchSize,
99✔
418
                                MaxBytes:  maxBytes,
99✔
419
                                Heartbeat: s.consumeOpts.Heartbeat,
99✔
420
                        }
99✔
421

99✔
422
                        s.pending.msgCount = s.consumeOpts.MaxMessages
99✔
423
                        s.pending.byteCount = s.consumeOpts.MaxBytes
99✔
424
                }
99✔
425
        }
426
}
427

428
// Messages returns MessagesContext, allowing continuously iterating
429
// over messages on a stream. Messages cannot be used concurrently
430
// when using ordered consumer.
431
//
432
// See [Consumer.Messages] for more details.
433
func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) {
29✔
434
        consumeOpts, err := parseMessagesOpts(false, opts...)
29✔
435
        if err != nil {
30✔
436
                return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
1✔
437
        }
1✔
438

439
        p.Lock()
28✔
440
        subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
28✔
441

28✔
442
        msgs := make(chan *nats.Msg, consumeOpts.MaxMessages)
28✔
443

28✔
444
        consumeID := nuid.Next()
28✔
445
        sub := &pullSubscription{
28✔
446
                id:          consumeID,
28✔
447
                consumer:    p,
28✔
448
                done:        make(chan struct{}, 1),
28✔
449
                msgs:        msgs,
28✔
450
                errs:        make(chan error, 10),
28✔
451
                fetchNext:   make(chan *pullRequest, 1),
28✔
452
                consumeOpts: consumeOpts,
28✔
453
        }
28✔
454
        sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING)
28✔
455
        inbox := p.js.conn.NewInbox()
28✔
456
        sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs)
28✔
457
        if err != nil {
28✔
458
                p.Unlock()
×
459
                return nil, err
×
460
        }
×
461
        sub.subscription.SetClosedHandler(func(sid string) func(string) {
56✔
462
                return func(subject string) {
47✔
463
                        if sub.draining.Load() != 1 {
35✔
464
                                // if we're not draining, subscription can be closed as soon
16✔
465
                                // as closed handler is called
16✔
466
                                // otherwise, we need to wait until all messages are drained
16✔
467
                                // in Next
16✔
468
                                p.subs.Delete(sid)
16✔
469
                        }
16✔
470
                        close(msgs)
19✔
471
                }
472
        }(sub.id))
473

474
        p.subs.Store(sub.id, sub)
28✔
475
        p.Unlock()
28✔
476

28✔
477
        go sub.pullMessages(subject)
28✔
478

28✔
479
        go func() {
56✔
480
                for {
62✔
481
                        select {
34✔
482
                        case status, ok := <-sub.connStatusChanged:
6✔
483
                                if !ok {
6✔
484
                                        return
×
485
                                }
×
486
                                if status == nats.CONNECTED {
8✔
487
                                        sub.errs <- errConnected
2✔
488
                                }
2✔
489
                                if status == nats.RECONNECTING {
10✔
490
                                        sub.errs <- errDisconnected
4✔
491
                                }
4✔
492
                        case <-sub.done:
28✔
493
                                return
28✔
494
                        }
495
                }
496
        }()
497

498
        return sub, nil
28✔
499
}
500

501
var (
502
        // errConnected and errDisconnected are internal sentinel errors sent
        // on a subscription's errs channel to signal connection status
        // changes (CONNECTED and RECONNECTING respectively).
        errConnected    = errors.New("connected")
503
        errDisconnected = errors.New("disconnected")
504
)
505

506
// Next retrieves next message on a stream. It will block until the next
507
// message is available. If the context is canceled, Next will return
508
// ErrMsgIteratorClosed error.
509
func (s *pullSubscription) Next() (Msg, error) {
439✔
510
        s.Lock()
439✔
511
        defer s.Unlock()
439✔
512
        drainMode := s.draining.Load() == 1
439✔
513
        closed := s.closed.Load() == 1
439✔
514
        if closed && !drainMode {
442✔
515
                return nil, ErrMsgIteratorClosed
3✔
516
        }
3✔
517
        hbMonitor := s.scheduleHeartbeatCheck(s.consumeOpts.Heartbeat)
436✔
518
        defer func() {
872✔
519
                if hbMonitor != nil {
872✔
520
                        hbMonitor.Stop()
436✔
521
                }
436✔
522
        }()
523

524
        isConnected := true
436✔
525
        if s.consumeOpts.StopAfter > 0 && s.delivered >= s.consumeOpts.StopAfter {
440✔
526
                s.Stop()
4✔
527
                return nil, ErrMsgIteratorClosed
4✔
528
        }
4✔
529

530
        for {
887✔
531
                s.checkPending()
455✔
532
                select {
455✔
533
                case msg, ok := <-s.msgs:
449✔
534
                        if !ok {
453✔
535
                                // if msgs channel is closed, it means that subscription was either drained or stopped
4✔
536
                                s.consumer.subs.Delete(s.id)
4✔
537
                                s.draining.CompareAndSwap(1, 0)
4✔
538
                                return nil, ErrMsgIteratorClosed
4✔
539
                        }
4✔
540
                        if hbMonitor != nil {
890✔
541
                                hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
445✔
542
                        }
445✔
543
                        userMsg, msgErr := checkMsg(msg)
445✔
544
                        if !userMsg {
468✔
545
                                // heartbeat message
23✔
546
                                if msgErr == nil {
23✔
547
                                        continue
×
548
                                }
549
                                if err := s.handleStatusMsg(msg, msgErr); err != nil {
26✔
550
                                        s.Stop()
3✔
551
                                        return nil, err
3✔
552
                                }
3✔
553
                                continue
20✔
554
                        }
555
                        s.decrementPendingMsgs(msg)
422✔
556
                        s.incrementDeliveredMsgs()
422✔
557
                        return s.consumer.js.toJSMsg(msg), nil
422✔
558
                case err := <-s.errs:
6✔
559
                        if errors.Is(err, ErrNoHeartbeat) {
8✔
560
                                s.pending.msgCount = 0
2✔
561
                                s.pending.byteCount = 0
2✔
562
                                if s.consumeOpts.ReportMissingHeartbeats {
4✔
563
                                        return nil, err
2✔
564
                                }
2✔
565
                                if hbMonitor != nil {
×
566
                                        hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
×
567
                                }
×
568
                        }
569
                        if errors.Is(err, errConnected) {
6✔
570
                                if !isConnected {
4✔
571
                                        isConnected = true
2✔
572

2✔
573
                                        if s.consumeOpts.notifyOnReconnect {
3✔
574
                                                return nil, errConnected
1✔
575
                                        }
1✔
576
                                        s.pending.msgCount = 0
1✔
577
                                        s.pending.byteCount = 0
1✔
578
                                        if hbMonitor != nil {
2✔
579
                                                hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
1✔
580
                                        }
1✔
581
                                }
582
                        }
583
                        if errors.Is(err, errDisconnected) {
5✔
584
                                if hbMonitor != nil {
4✔
585
                                        hbMonitor.Stop()
2✔
586
                                }
2✔
587
                                isConnected = false
2✔
588
                        }
589
                }
590
        }
591
}
592

593
func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) error {
61✔
594
        if !errors.Is(msgErr, nats.ErrTimeout) && !errors.Is(msgErr, ErrMaxBytesExceeded) && !errors.Is(msgErr, ErrBatchCompleted) {
88✔
595
                if errors.Is(msgErr, ErrConsumerDeleted) || errors.Is(msgErr, ErrBadRequest) {
38✔
596
                        return msgErr
11✔
597
                }
11✔
598
                if s.consumeOpts.ErrHandler != nil {
20✔
599
                        s.consumeOpts.ErrHandler(s, msgErr)
4✔
600
                }
4✔
601
                if errors.Is(msgErr, ErrConsumerLeadershipChanged) {
16✔
602
                        s.pending.msgCount = 0
×
603
                        s.pending.byteCount = 0
×
604
                }
×
605
                return nil
16✔
606
        }
607
        msgsLeft, bytesLeft, err := parsePending(msg)
34✔
608
        if err != nil {
34✔
609
                return err
×
610
        }
×
611
        s.pending.msgCount -= msgsLeft
34✔
612
        if s.pending.msgCount < 0 {
36✔
613
                s.pending.msgCount = 0
2✔
614
        }
2✔
615
        if s.consumeOpts.MaxBytes > 0 && !s.consumeOpts.LimitSize {
42✔
616
                s.pending.byteCount -= bytesLeft
8✔
617
                if s.pending.byteCount < 0 {
8✔
618
                        s.pending.byteCount = 0
×
619
                }
×
620
        }
621
        return nil
34✔
622
}
623

624
func (hb *hbMonitor) Stop() {
990✔
625
        hb.Mutex.Lock()
990✔
626
        hb.timer.Stop()
990✔
627
        hb.Mutex.Unlock()
990✔
628
}
990✔
629

630
func (hb *hbMonitor) Reset(dur time.Duration) {
1,039✔
631
        hb.Mutex.Lock()
1,039✔
632
        hb.timer.Reset(dur)
1,039✔
633
        hb.Mutex.Unlock()
1,039✔
634
}
1,039✔
635

636
// Stop unsubscribes from the stream and cancels subscription. Calling
637
// Next after calling Stop will return ErrMsgIteratorClosed error.
638
// All messages that are already in the buffer are discarded.
639
func (s *pullSubscription) Stop() {
120✔
640
        if !s.closed.CompareAndSwap(0, 1) {
157✔
641
                return
37✔
642
        }
37✔
643
        close(s.done)
83✔
644
        if s.consumeOpts.stopAfterMsgsLeft != nil {
88✔
645
                if s.delivered >= s.consumeOpts.StopAfter {
8✔
646
                        close(s.consumeOpts.stopAfterMsgsLeft)
3✔
647
                } else {
5✔
648
                        s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered
2✔
649
                }
2✔
650
        }
651
}
652

653
// Drain unsubscribes from the stream and cancels subscription. All
654
// messages that are already in the buffer will be available on
655
// subsequent calls to Next. After the buffer is drained, Next will
656
// return ErrMsgIteratorClosed error.
657
func (s *pullSubscription) Drain() {
7✔
658
        if !s.closed.CompareAndSwap(0, 1) {
7✔
659
                return
×
660
        }
×
661
        s.draining.Store(1)
7✔
662
        close(s.done)
7✔
663
        if s.consumeOpts.stopAfterMsgsLeft != nil {
7✔
664
                if s.delivered >= s.consumeOpts.StopAfter {
×
665
                        close(s.consumeOpts.stopAfterMsgsLeft)
×
666
                } else {
×
667
                        s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered
×
668
                }
×
669
        }
670
}
671

672
// Closed returns a channel that is closed when consuming is
673
// fully stopped/drained. When the channel is closed, no more messages
674
// will be received and processing is complete.
675
func (s *pullSubscription) Closed() <-chan struct{} {
6✔
676
        s.Lock()
6✔
677
        defer s.Unlock()
6✔
678
        closedCh := s.closedCh
6✔
679
        if closedCh == nil {
12✔
680
                closedCh = make(chan struct{})
6✔
681
                s.closedCh = closedCh
6✔
682
        }
6✔
683
        if !s.subscription.IsValid() {
8✔
684
                close(s.closedCh)
2✔
685
                s.closedCh = nil
2✔
686
        }
2✔
687
        return closedCh
6✔
688
}
689

690
// Fetch sends a single request to retrieve given number of messages.
691
// It will wait up to provided expiry time if not all messages are available.
692
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
1,047✔
693
        req := &pullRequest{
1,047✔
694
                Batch:     batch,
1,047✔
695
                Expires:   DefaultExpires,
1,047✔
696
                Heartbeat: unset,
1,047✔
697
        }
1,047✔
698
        for _, opt := range opts {
2,060✔
699
                if err := opt(req); err != nil {
1,015✔
700
                        return nil, err
2✔
701
                }
2✔
702
        }
703
        // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
704
        // and disable it for shorter pulls
705
        if req.Heartbeat == unset {
2,088✔
706
                if req.Expires >= 10*time.Second {
1,078✔
707
                        req.Heartbeat = 5 * time.Second
35✔
708
                } else {
1,043✔
709
                        req.Heartbeat = 0
1,008✔
710
                }
1,008✔
711
        }
712
        if req.Expires < 2*req.Heartbeat {
1,047✔
713
                return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
2✔
714
        }
2✔
715

716
        return p.fetch(req)
1,043✔
717
}
718

719
// FetchBytes is used to retrieve up to a provided bytes from the stream.
720
func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
11✔
721
        req := &pullRequest{
11✔
722
                Batch:     defaultBatchMaxBytesOnly,
11✔
723
                MaxBytes:  maxBytes,
11✔
724
                Expires:   DefaultExpires,
11✔
725
                Heartbeat: unset,
11✔
726
        }
11✔
727
        for _, opt := range opts {
19✔
728
                if err := opt(req); err != nil {
9✔
729
                        return nil, err
1✔
730
                }
1✔
731
        }
732
        // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
733
        // and disable it for shorter pulls
734
        if req.Heartbeat == unset {
18✔
735
                if req.Expires >= 10*time.Second {
12✔
736
                        req.Heartbeat = 5 * time.Second
4✔
737
                } else {
8✔
738
                        req.Heartbeat = 0
4✔
739
                }
4✔
740
        }
741
        if req.Expires < 2*req.Heartbeat {
12✔
742
                return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
2✔
743
        }
2✔
744

745
        return p.fetch(req)
8✔
746
}
747

748
// FetchNoWait sends a single request to retrieve given number of messages.
749
// FetchNoWait will only return messages that are available at the time of the
750
// request. It will not wait for more messages to arrive.
751
func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) {
41✔
752
        req := &pullRequest{
41✔
753
                Batch:  batch,
41✔
754
                NoWait: true,
41✔
755
        }
41✔
756

41✔
757
        return p.fetch(req)
41✔
758
}
41✔
759

760
func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
1,092✔
761
        res := &fetchResult{
1,092✔
762
                msgs: make(chan Msg, req.Batch),
1,092✔
763
        }
1,092✔
764
        msgs := make(chan *nats.Msg, 2*req.Batch)
1,092✔
765
        subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name))
1,092✔
766

1,092✔
767
        sub := &pullSubscription{
1,092✔
768
                consumer: p,
1,092✔
769
                done:     make(chan struct{}, 1),
1,092✔
770
                msgs:     msgs,
1,092✔
771
                errs:     make(chan error, 10),
1,092✔
772
        }
1,092✔
773
        inbox := p.js.conn.NewInbox()
1,092✔
774
        var err error
1,092✔
775
        sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs)
1,092✔
776
        if err != nil {
1,092✔
777
                return nil, err
×
778
        }
×
779
        if err := sub.pull(req, subject); err != nil {
1,092✔
780
                return nil, err
×
781
        }
×
782

783
        var receivedMsgs, receivedBytes int
1,092✔
784
        hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat)
1,092✔
785
        go func(res *fetchResult) {
2,184✔
786
                defer sub.subscription.Unsubscribe()
1,092✔
787
                defer close(res.msgs)
1,092✔
788
                for {
2,262✔
789
                        select {
1,170✔
790
                        case msg := <-msgs:
1,170✔
791
                                res.Lock()
1,170✔
792
                                if hbTimer != nil {
1,253✔
793
                                        hbTimer.Reset(2 * req.Heartbeat)
83✔
794
                                }
83✔
795
                                userMsg, err := checkMsg(msg)
1,170✔
796
                                if err != nil {
1,197✔
797
                                        errNotTimeoutOrNoMsgs := !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, ErrNoMessages)
27✔
798
                                        if errNotTimeoutOrNoMsgs && !errors.Is(err, ErrMaxBytesExceeded) {
32✔
799
                                                res.err = err
5✔
800
                                        }
5✔
801
                                        res.done = true
27✔
802
                                        res.Unlock()
27✔
803
                                        return
27✔
804
                                }
805
                                if !userMsg {
1,143✔
806
                                        res.Unlock()
×
807
                                        continue
×
808
                                }
809
                                res.msgs <- p.js.toJSMsg(msg)
1,143✔
810
                                meta, err := msg.Metadata()
1,143✔
811
                                if err != nil {
1,143✔
812
                                        res.err = fmt.Errorf("parsing message metadata: %s", err)
×
813
                                        res.done = true
×
814
                                        res.Unlock()
×
815
                                        return
×
816
                                }
×
817
                                res.sseq = meta.Sequence.Stream
1,143✔
818
                                receivedMsgs++
1,143✔
819
                                if req.MaxBytes != 0 {
1,167✔
820
                                        receivedBytes += msg.Size()
24✔
821
                                }
24✔
822
                                if receivedMsgs == req.Batch || (req.MaxBytes != 0 && receivedBytes >= req.MaxBytes) {
2,208✔
823
                                        res.done = true
1,065✔
824
                                        res.Unlock()
1,065✔
825
                                        return
1,065✔
826
                                }
1,065✔
827
                                res.Unlock()
78✔
828
                        case err := <-sub.errs:
×
829
                                res.Lock()
×
830
                                res.err = err
×
831
                                res.done = true
×
832
                                res.Unlock()
×
833
                                return
×
834
                        case <-time.After(req.Expires + 1*time.Second):
×
835
                                res.Lock()
×
836
                                res.done = true
×
837
                                res.Unlock()
×
838
                                return
×
839
                        }
840
                }
841
        }(res)
842
        return res, nil
1,092✔
843
}
844

845
func (fr *fetchResult) Messages() <-chan Msg {
1,092✔
846
        fr.Lock()
1,092✔
847
        defer fr.Unlock()
1,092✔
848
        return fr.msgs
1,092✔
849
}
1,092✔
850

851
func (fr *fetchResult) Error() error {
6,731,306✔
852
        fr.Lock()
6,731,306✔
853
        defer fr.Unlock()
6,731,306✔
854
        return fr.err
6,731,306✔
855
}
6,731,306✔
856

857
func (fr *fetchResult) closed() bool {
1,006✔
858
        fr.Lock()
1,006✔
859
        defer fr.Unlock()
1,006✔
860
        return fr.done
1,006✔
861
}
1,006✔
862

863
// Next is used to retrieve the next message from the stream. This
864
// method will block until the message is retrieved or timeout is
865
// reached.
866
func (p *pullConsumer) Next(opts ...FetchOpt) (Msg, error) {
8✔
867
        res, err := p.Fetch(1, opts...)
8✔
868
        if err != nil {
8✔
869
                return nil, err
×
870
        }
×
871
        msg := <-res.Messages()
8✔
872
        if msg != nil {
14✔
873
                return msg, nil
6✔
874
        }
6✔
875
        if res.Error() == nil {
3✔
876
                return nil, nats.ErrTimeout
1✔
877
        }
1✔
878
        return nil, res.Error()
1✔
879
}
880

881
func (s *pullSubscription) pullMessages(subject string) {
90✔
882
        for {
283✔
883
                select {
193✔
884
                case req := <-s.fetchNext:
105✔
885
                        s.fetchInProgress.Store(1)
105✔
886

105✔
887
                        if err := s.pull(req, subject); err != nil {
107✔
888
                                if errors.Is(err, ErrMsgIteratorClosed) {
4✔
889
                                        s.cleanup()
2✔
890
                                        return
2✔
891
                                }
2✔
892
                                s.errs <- err
×
893
                        }
894
                        s.fetchInProgress.Store(0)
103✔
895
                case <-s.done:
88✔
896
                        s.cleanup()
88✔
897
                        return
88✔
898
                }
899
        }
900
}
901

902
func (s *pullSubscription) scheduleHeartbeatCheck(dur time.Duration) *hbMonitor {
1,590✔
903
        if dur == 0 {
2,643✔
904
                return nil
1,053✔
905
        }
1,053✔
906
        return &hbMonitor{
537✔
907
                timer: time.AfterFunc(2*dur, func() {
585✔
908
                        s.errs <- ErrNoHeartbeat
48✔
909
                }),
48✔
910
        }
911
}
912

913
func (s *pullSubscription) cleanup() {
90✔
914
        // For now this function does not need to hold the lock.
90✔
915
        // Holding the lock here might cause a deadlock if Next()
90✔
916
        // is already holding the lock and waiting.
90✔
917
        // The fields that are read (subscription, hbMonitor)
90✔
918
        // are read only (Only written on creation of pullSubscription).
90✔
919
        if s.subscription == nil || !s.subscription.IsValid() {
115✔
920
                return
25✔
921
        }
25✔
922
        if s.hbMonitor != nil {
109✔
923
                s.hbMonitor.Stop()
44✔
924
        }
44✔
925
        drainMode := s.draining.Load() == 1
65✔
926
        if drainMode {
72✔
927
                s.subscription.Drain()
7✔
928
        } else {
65✔
929
                s.subscription.Unsubscribe()
58✔
930
        }
58✔
931
        s.closed.Store(1)
65✔
932
}
933

934
// pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription].
935
// Messages will be fetched up to given batch_size or until there are no more messages or timeout is returned
936
func (s *pullSubscription) pull(req *pullRequest, subject string) error {
1,259✔
937
        s.consumer.Lock()
1,259✔
938
        defer s.consumer.Unlock()
1,259✔
939
        if s.closed.Load() == 1 {
1,261✔
940
                return ErrMsgIteratorClosed
2✔
941
        }
2✔
942
        if req.Batch < 1 {
1,257✔
943
                return fmt.Errorf("%w: batch size must be at least 1", nats.ErrInvalidArg)
×
944
        }
×
945
        reqJSON, err := json.Marshal(req)
1,257✔
946
        if err != nil {
1,257✔
947
                return err
×
948
        }
×
949

950
        reply := s.subscription.Subject
1,257✔
951
        if err := s.consumer.js.conn.PublishRequest(subject, reply, reqJSON); err != nil {
1,257✔
952
                return err
×
953
        }
×
954
        return nil
1,257✔
955
}
956

957
func parseConsumeOpts(ordered bool, opts ...PullConsumeOpt) (*consumeOpts, error) {
90✔
958
        consumeOpts := &consumeOpts{
90✔
959
                MaxMessages:             unset,
90✔
960
                MaxBytes:                unset,
90✔
961
                Expires:                 DefaultExpires,
90✔
962
                Heartbeat:               unset,
90✔
963
                ReportMissingHeartbeats: true,
90✔
964
                StopAfter:               unset,
90✔
965
        }
90✔
966
        for _, opt := range opts {
194✔
967
                if err := opt.configureConsume(consumeOpts); err != nil {
106✔
968
                        return nil, err
2✔
969
                }
2✔
970
        }
971
        if err := consumeOpts.setDefaults(ordered); err != nil {
88✔
972
                return nil, err
×
973
        }
×
974
        return consumeOpts, nil
88✔
975
}
976

977
func parseMessagesOpts(ordered bool, opts ...PullMessagesOpt) (*consumeOpts, error) {
36✔
978
        consumeOpts := &consumeOpts{
36✔
979
                MaxMessages:             unset,
36✔
980
                MaxBytes:                unset,
36✔
981
                Expires:                 DefaultExpires,
36✔
982
                Heartbeat:               unset,
36✔
983
                ReportMissingHeartbeats: true,
36✔
984
                StopAfter:               unset,
36✔
985
        }
36✔
986
        for _, opt := range opts {
85✔
987
                if err := opt.configureMessages(consumeOpts); err != nil {
50✔
988
                        return nil, err
1✔
989
                }
1✔
990
        }
991
        if err := consumeOpts.setDefaults(ordered); err != nil {
35✔
992
                return nil, err
×
993
        }
×
994
        return consumeOpts, nil
35✔
995
}
996

997
func (consumeOpts *consumeOpts) setDefaults(ordered bool) error {
123✔
998
        // we cannot use both max messages and max bytes unless we're using max bytes as fetch size limiter
123✔
999
        if consumeOpts.MaxBytes != unset && consumeOpts.MaxMessages != unset && !consumeOpts.LimitSize {
123✔
1000
                return errors.New("only one of MaxMessages and MaxBytes can be specified")
×
1001
        }
×
1002
        if consumeOpts.MaxBytes != unset && !consumeOpts.LimitSize {
126✔
1003
                // we used PullMaxBytes setting, set MaxMessages to a high value
3✔
1004
                consumeOpts.MaxMessages = defaultBatchMaxBytesOnly
3✔
1005
        } else if consumeOpts.MaxMessages == unset {
223✔
1006
                // otherwise, if max messages is not set, set it to default value
100✔
1007
                consumeOpts.MaxMessages = DefaultMaxMessages
100✔
1008
        }
100✔
1009
        // if user did not set max bytes, set it to 0
1010
        if consumeOpts.MaxBytes == unset {
240✔
1011
                consumeOpts.MaxBytes = 0
117✔
1012
        }
117✔
1013

1014
        if consumeOpts.ThresholdMessages == 0 {
246✔
1015
                // half of the max messages, rounded up
123✔
1016
                consumeOpts.ThresholdMessages = int(math.Ceil(float64(consumeOpts.MaxMessages) / 2))
123✔
1017
        }
123✔
1018
        if consumeOpts.ThresholdBytes == 0 {
246✔
1019
                // half of the max bytes, rounded up
123✔
1020
                consumeOpts.ThresholdBytes = int(math.Ceil(float64(consumeOpts.MaxBytes) / 2))
123✔
1021
        }
123✔
1022

1023
        // set default heartbeats
1024
        if consumeOpts.Heartbeat == unset {
239✔
1025
                // by default, use 50% of expiry time
116✔
1026
                consumeOpts.Heartbeat = consumeOpts.Expires / 2
116✔
1027
                if ordered {
148✔
1028
                        // for ordered consumers, the default heartbeat is 5 seconds
32✔
1029
                        if consumeOpts.Expires < 10*time.Second {
32✔
1030
                                consumeOpts.Heartbeat = consumeOpts.Expires / 2
×
1031
                        } else {
32✔
1032
                                consumeOpts.Heartbeat = 5 * time.Second
32✔
1033
                        }
32✔
1034
                } else if consumeOpts.Heartbeat > 30*time.Second {
84✔
1035
                        // cap the heartbeat to 30 seconds
×
1036
                        consumeOpts.Heartbeat = 30 * time.Second
×
1037
                }
×
1038
        }
1039
        if consumeOpts.Heartbeat > consumeOpts.Expires/2 {
123✔
1040
                return errors.New("the value of Heartbeat must be less than 50%% of expiry")
×
1041
        }
×
1042
        return nil
123✔
1043
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc