• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13513815732

24 Feb 2025 07:33PM UTC coverage: 68.025% (-17.4%) from 85.444%
13513815732

push

github

web-flow
[FIXED] Preserve max delivered messages with Interest retention (#6575)

Resolves https://github.com/nats-io/nats-server/issues/6538

If a consumer reached max deliveries for a message, it should preserve
the redelivered state and allow inspecting its content. However, if a
new consumer would be created and consume this message as well, it would
still be removed under Interest retention.

This PR fixes that by using the redelivered state to keep marking
there's interest.

Only downside is that the redelivered state gets cleaned up after a
restart (this PR does not change/fix that). So if the consumer that had
a max delivery message keeps acknowledging messages and its
acknowledgement floor moves up, it would clean up the redelivered state
below this ack floor.

Honestly I feel like keeping messages around if max delivery is reached
makes the code very complex. It would be a lot cleaner if we'd only have
the acknowledgement floor, starting sequence, and pending messages
in-between, not also redelivered state that can be below ack floor. It's
not something we can change now I suppose, but I'd be in favor of having
messages automatically be removed once max delivery is reached and all
consumers have consumed the message. DLQ-style behavior would then be
more explicitly (and reliably) handled by the client, for example by
publishing into another stream and then TERM the message, instead of
relying on advisories that could be missed.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

55031 of 80898 relevant lines covered (68.03%)

310123.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.87
/server/jetstream_errors.go
1
package server
2

3
import (
4
        "fmt"
5
)
6

7
type errOpts struct {
8
        err error
9
}
10

11
// ErrorOption configures a NATS Error helper
12
type ErrorOption func(*errOpts)
13

14
// Unless ensures that if err is a ApiErr that err will be returned rather than the one being created via the helper
15
func Unless(err error) ErrorOption {
1,542✔
16
        return func(opts *errOpts) {
3,084✔
17
                opts.err = err
1,542✔
18
        }
1,542✔
19
}
20

21
func parseOpts(opts []ErrorOption) *errOpts {
41,713✔
22
        eopts := &errOpts{}
41,713✔
23
        for _, opt := range opts {
43,255✔
24
                opt(eopts)
1,542✔
25
        }
1,542✔
26
        return eopts
41,713✔
27
}
28

29
type ErrorIdentifier uint16
30

31
// IsNatsErr determines if an error matches ID, if multiple IDs are given if the error matches any of these the function will be true
32
func IsNatsErr(err error, ids ...ErrorIdentifier) bool {
290✔
33
        if err == nil {
526✔
34
                return false
236✔
35
        }
236✔
36

37
        ce, ok := err.(*ApiError)
54✔
38
        if !ok || ce == nil {
59✔
39
                return false
5✔
40
        }
5✔
41

42
        for _, id := range ids {
124✔
43
                ae, ok := ApiErrors[id]
75✔
44
                if !ok || ae == nil {
75✔
45
                        continue
×
46
                }
47

48
                if ce.ErrCode == ae.ErrCode {
88✔
49
                        return true
13✔
50
                }
13✔
51
        }
52

53
        return false
36✔
54
}
55

56
// ApiError is included in all responses if there was an error.
57
type ApiError struct {
58
        Code        int    `json:"code"`
59
        ErrCode     uint16 `json:"err_code,omitempty"`
60
        Description string `json:"description,omitempty"`
61
}
62

63
// ErrorsData is the source data for generated errors as found in errors.json
64
type ErrorsData struct {
65
        Constant    string `json:"constant"`
66
        Code        int    `json:"code"`
67
        ErrCode     uint16 `json:"error_code"`
68
        Description string `json:"description"`
69
        Comment     string `json:"comment"`
70
        Help        string `json:"help"`
71
        URL         string `json:"url"`
72
        Deprecates  string `json:"deprecates"`
73
}
74

75
func (e *ApiError) Error() string {
20,080✔
76
        return fmt.Sprintf("%s (%d)", e.Description, e.ErrCode)
20,080✔
77
}
20,080✔
78

79
func (e *ApiError) toReplacerArgs(replacements []any) []string {
1,554✔
80
        var (
1,554✔
81
                ra  []string
1,554✔
82
                key string
1,554✔
83
        )
1,554✔
84

1,554✔
85
        for i, replacement := range replacements {
4,666✔
86
                if i%2 == 0 {
4,668✔
87
                        key = replacement.(string)
1,556✔
88
                        continue
1,556✔
89
                }
90

91
                switch v := replacement.(type) {
1,556✔
92
                case string:
4✔
93
                        ra = append(ra, key, v)
4✔
94
                case error:
1,547✔
95
                        ra = append(ra, key, v.Error())
1,547✔
96
                default:
5✔
97
                        ra = append(ra, key, fmt.Sprintf("%v", v))
5✔
98
                }
99
        }
100

101
        return ra
1,554✔
102
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc