• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nikolayk812 / pgx-outbox / 12693441430

09 Jan 2025 03:46PM UTC coverage: 80.449% (-1.3%) from 81.734%
12693441430

push

github

nikolayk812
change signatures for reader, forwarder, forward output, demo tracing

17 of 26 new or added lines in 4 files covered. (65.38%)

1 existing line in 1 file now uncovered.

251 of 312 relevant lines covered (80.45%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.78
/reader.go
1
package outbox
2

3
import (
4
        "context"
5
        "fmt"
6
        "time"
7

8
        sq "github.com/Masterminds/squirrel"
9
        "github.com/jackc/pgx/v5"
10
        "github.com/jackc/pgx/v5/pgxpool"
11
        "github.com/nikolayk812/pgx-outbox/types"
12
)
13

14
//go:generate mockery --name=Reader --output=internal/mocks --outpkg=mocks --filename=reader_mock.go
15

16
// Reader reads outbox unpublished messages from a single outbox table.
17
// Users should prefer to interact directly with Forwarder instance instead of Reader.
18
// Read and Ack happen in different transactions.
19
type Reader interface {
20
        // Read reads unpublished messages from the outbox table that match the filter.
21
        // limit is the maximum number of messages to read.
22
        // Limit and frequency of Read invocations should be considered carefully to avoid overloading the database.
23
        Read(ctx context.Context, limit int) ([]types.Message, error)
24

25
        // Ack acknowledges / marks the messages by ids as published in a single transaction.
26
        // ids can be obtained from the Read method output.
27
        // It returns ids of acknowledged messages.
28
        Ack(ctx context.Context, ids []int64) ([]int64, error)
29
}
30

31
type reader struct {
32
        pool   *pgxpool.Pool
33
        table  string
34
        filter types.MessageFilter
35
}
36

37
func NewReader(table string, pool *pgxpool.Pool, opts ...ReadOption) (Reader, error) {
1✔
38
        if pool == nil {
2✔
39
                return nil, ErrPoolNil
1✔
40
        }
1✔
41
        if table == "" {
2✔
42
                return nil, ErrTableEmpty
1✔
43
        }
1✔
44

45
        r := &reader{
1✔
46
                pool:  pool,
1✔
47
                table: table,
1✔
48
        }
1✔
49

1✔
50
        if err := r.filter.Validate(); err != nil {
1✔
51
                return nil, fmt.Errorf("filter.Validate: %w", err)
×
52
        }
×
53

54
        for _, opt := range opts {
2✔
55
                opt(r)
1✔
56
        }
1✔
57

58
        return r, nil
1✔
59
}
60

61
// Read returns unpublished messages sorted by ID in ascending order.
62
// returns an error if
63
// - limit is LTE 0
64
// - SQL query building or DB call fails.
65
func (r *reader) Read(ctx context.Context, limit int) ([]types.Message, error) {
1✔
66
        if limit <= 0 {
2✔
67
                return nil, fmt.Errorf("limit must be GT 0, got %d", limit)
1✔
68
        }
1✔
69

70
        sb := sq.StatementBuilder.PlaceholderFormat(sq.Dollar).
1✔
71
                Select("id", "broker", "topic", "metadata", "payload").
1✔
72
                From(r.table).
1✔
73
                Where(sq.Eq{"published_at": nil})
1✔
74

1✔
75
        sb = whereFilter(sb, r.filter)
1✔
76

1✔
77
        sb = sb.OrderBy("id ASC").Limit(uint64(limit))
1✔
78

1✔
79
        q, args, err := sb.ToSql()
1✔
80
        if err != nil {
1✔
81
                return nil, fmt.Errorf("sb.ToSql: %w", err)
×
82
        }
×
83

84
        rows, err := r.pool.Query(ctx, q, args...)
1✔
85
        if err != nil {
1✔
86
                return nil, fmt.Errorf("pool.Query: %w", err)
×
87
        }
×
88

89
        result, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (types.Message, error) {
2✔
90
                var msg types.Message
1✔
91
                if err := row.Scan(&msg.ID, &msg.Broker, &msg.Topic, &msg.Metadata, &msg.Payload); err != nil {
1✔
92
                        return types.Message{}, fmt.Errorf("row.Scan: %w", err)
×
93
                }
×
94
                return msg, nil
1✔
95
        })
96
        if err != nil {
1✔
97
                return nil, fmt.Errorf("pgx.CollectRows: %w", err)
×
98
        }
×
99

100
        return result, nil
1✔
101
}
102

103
// Ack marks the messages by ids as published in a single transaction.
104
// It sets the published_at column to the current time, same for all ids.
105
// Non-existent and duplicate ids are skipped.
106
// returns an error if
107
// - SQL query building or DB call fails.
108
func (r *reader) Ack(ctx context.Context, ids []int64) ([]int64, error) {
1✔
109
        if len(ids) == 0 {
2✔
110
                return ids, nil
1✔
111
        }
1✔
112

113
        now := time.Now().UTC()
1✔
114

1✔
115
        ub := sq.StatementBuilder.PlaceholderFormat(sq.Dollar).
1✔
116
                Update(r.table).
1✔
117
                Set("published_at", now).
1✔
118
                Where(sq.Eq{"id": ids}).
1✔
119
                Where(sq.Eq{"published_at": nil}).
1✔
120
                Suffix("RETURNING id")
1✔
121

1✔
122
        q, args, err := ub.ToSql()
1✔
123
        if err != nil {
1✔
NEW
124
                return nil, fmt.Errorf("ub.ToSql: %w", err)
×
125
        }
×
126

127
        rows, err := r.pool.Query(ctx, q, args...)
1✔
128
        if err != nil {
1✔
NEW
129
                return nil, fmt.Errorf("pool.Query: %w", err)
×
NEW
130
        }
×
131

132
        updatedIDs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (int64, error) {
2✔
133
                var id int64
1✔
134
                if err := row.Scan(&id); err != nil {
1✔
NEW
135
                        return 0, fmt.Errorf("row.Scan: %w", err)
×
NEW
136
                }
×
137
                return id, nil
1✔
138
        })
139
        if err != nil {
1✔
NEW
140
                return nil, fmt.Errorf("pgx.CollectRows: %w", err)
×
UNCOV
141
        }
×
142

143
        return updatedIDs, nil
1✔
144
}
145

146
func whereFilter(sb sq.SelectBuilder, filter types.MessageFilter) sq.SelectBuilder {
1✔
147
        if len(filter.Brokers) > 0 {
2✔
148
                sb = sb.Where(sq.Eq{"broker": filter.Brokers})
1✔
149
        }
1✔
150

151
        if len(filter.Topics) > 0 {
2✔
152
                sb = sb.Where(sq.Eq{"topic": filter.Topics})
1✔
153
        }
1✔
154

155
        return sb
1✔
156
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc