• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gameap / gameap / 25960901307

16 May 2026 11:29AM UTC coverage: 76.887% (+0.2%) from 76.64%
25960901307

push

github

et-nik
audit logs tests

45395 of 59041 relevant lines covered (76.89%)

33895.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.79
/internal/pubsub/postgres/postgres.go
1
package postgres
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "log/slog"
7
        "sync"
8
        "time"
9

10
        "github.com/gameap/gameap/internal/pubsub"
11
        "github.com/gameap/gameap/pkg/idgen"
12
        "github.com/jackc/pgx/v5"
13
        "github.com/jackc/pgx/v5/pgconn"
14
        "github.com/jackc/pgx/v5/pgxpool"
15
        "github.com/pkg/errors"
16
)
17

18
const (
19
        maxPayloadSize = 7900
20

21
        // transportChannel is the single PostgreSQL LISTEN/NOTIFY channel used to
22
        // route every pub-sub message. Pattern matching happens in Go via
23
        // pubsub.MatchPattern because PG LISTEN/NOTIFY only supports exact
24
        // channel-name matching.
25
        transportChannel = "gameap_pubsub"
26
)
27

28
type Postgres struct {
29
        pool              *pgxpool.Pool
30
        connStr           string
31
        handlers          map[string][]pubsub.Handler
32
        mu                sync.RWMutex
33
        logger            *slog.Logger
34
        instanceID        string
35
        closed            bool
36
        closeOnce         sync.Once
37
        wg                sync.WaitGroup
38
        reconnectInterval time.Duration
39
        maxReconnectDelay time.Duration
40
}
41

42
type Config struct {
43
        ConnStr           string
44
        InstanceID        string
45
        ReconnectInterval time.Duration
46
        MaxReconnectDelay time.Duration
47
}
48

49
func New(cfg Config) (*Postgres, error) {
7✔
50
        poolCfg, err := pgxpool.ParseConfig(cfg.ConnStr)
7✔
51
        if err != nil {
7✔
52
                return nil, errors.Wrap(err, "failed to parse connection string")
×
53
        }
×
54

55
        pool, err := pgxpool.NewWithConfig(context.Background(), poolCfg)
7✔
56
        if err != nil {
7✔
57
                return nil, errors.Wrap(err, "failed to create connection pool")
×
58
        }
×
59

60
        instanceID := cfg.InstanceID
7✔
61
        if instanceID == "" {
7✔
62
                instanceID = idgen.New()
×
63
        }
×
64

65
        reconnectInterval := cfg.ReconnectInterval
7✔
66
        if reconnectInterval == 0 {
14✔
67
                reconnectInterval = 5 * time.Second
7✔
68
        }
7✔
69

70
        maxReconnectDelay := cfg.MaxReconnectDelay
7✔
71
        if maxReconnectDelay == 0 {
14✔
72
                maxReconnectDelay = 2 * time.Minute
7✔
73
        }
7✔
74

75
        return &Postgres{
7✔
76
                pool:              pool,
7✔
77
                connStr:           cfg.ConnStr,
7✔
78
                handlers:          make(map[string][]pubsub.Handler),
7✔
79
                logger:            slog.Default(),
7✔
80
                instanceID:        instanceID,
7✔
81
                reconnectInterval: reconnectInterval,
7✔
82
                maxReconnectDelay: maxReconnectDelay,
7✔
83
        }, nil
7✔
84
}
85

86
func (p *Postgres) Publish(ctx context.Context, _ string, msg *pubsub.Message) error {
8✔
87
        p.mu.RLock()
8✔
88
        closed := p.closed
8✔
89
        p.mu.RUnlock()
8✔
90

8✔
91
        if closed {
9✔
92
                return pubsub.ErrClosed
1✔
93
        }
1✔
94

95
        msg.Source = p.instanceID
7✔
96

7✔
97
        data, err := json.Marshal(msg)
7✔
98
        if err != nil {
7✔
99
                return errors.Wrap(err, "failed to marshal message")
×
100
        }
×
101

102
        if len(data) > maxPayloadSize {
8✔
103
                return pubsub.ErrPayloadTooLarge
1✔
104
        }
1✔
105

106
        _, err = p.pool.Exec(ctx, "SELECT pg_notify($1, $2)", transportChannel, string(data))
6✔
107
        if err != nil {
6✔
108
                return errors.Wrap(err, "failed to send notification")
×
109
        }
×
110

111
        return nil
6✔
112
}
113

114
func (p *Postgres) Subscribe(_ context.Context, pattern string, handler pubsub.Handler) error {
6✔
115
        if pattern == "" {
7✔
116
                return pubsub.ErrEmptyPattern
1✔
117
        }
1✔
118

119
        p.mu.Lock()
5✔
120
        defer p.mu.Unlock()
5✔
121

5✔
122
        if p.closed {
6✔
123
                return pubsub.ErrClosed
1✔
124
        }
1✔
125

126
        p.handlers[pattern] = append(p.handlers[pattern], handler)
4✔
127

4✔
128
        return nil
4✔
129
}
130

131
func (p *Postgres) Unsubscribe(_ context.Context, pattern string) error {
1✔
132
        p.mu.Lock()
1✔
133
        defer p.mu.Unlock()
1✔
134

1✔
135
        delete(p.handlers, pattern)
1✔
136

1✔
137
        return nil
1✔
138
}
1✔
139

140
func (p *Postgres) Start(ctx context.Context) error {
4✔
141
        p.wg.Go(func() {
8✔
142
                p.listenLoop(ctx)
4✔
143
        })
4✔
144

145
        <-ctx.Done()
4✔
146

4✔
147
        return ctx.Err()
4✔
148
}
149

150
func (p *Postgres) listenLoop(ctx context.Context) {
4✔
151
        delay := p.reconnectInterval
4✔
152

4✔
153
        for {
8✔
154
                select {
4✔
155
                case <-ctx.Done():
×
156
                        return
×
157
                default:
4✔
158
                }
159

160
                if err := p.listen(ctx); err != nil {
8✔
161
                        if ctx.Err() != nil {
8✔
162
                                return
4✔
163
                        }
4✔
164

165
                        p.logger.Error("listener error, reconnecting",
×
166
                                slog.String("error", err.Error()),
×
167
                                slog.Duration("delay", delay),
×
168
                        )
×
169

×
170
                        select {
×
171
                        case <-ctx.Done():
×
172
                                return
×
173
                        case <-time.After(delay):
×
174
                                delay = min(delay*2, p.maxReconnectDelay)
×
175
                        }
176
                } else {
×
177
                        delay = p.reconnectInterval
×
178
                }
×
179
        }
180
}
181

182
func (p *Postgres) listen(ctx context.Context) error {
4✔
183
        conn, err := pgx.Connect(ctx, p.connStr)
4✔
184
        if err != nil {
4✔
185
                return errors.Wrap(err, "failed to connect")
×
186
        }
×
187
        defer func() {
8✔
188
                _ = conn.Close(ctx)
4✔
189
        }()
4✔
190

191
        if _, err := conn.Exec(ctx, "LISTEN "+transportChannel); err != nil {
4✔
192
                return errors.Wrapf(err, "failed to listen on channel %s", transportChannel)
×
193
        }
×
194

195
        for {
28✔
196
                select {
24✔
197
                case <-ctx.Done():
×
198
                        return ctx.Err()
×
199
                default:
24✔
200
                }
201

202
                waitCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
24✔
203
                notification, err := conn.WaitForNotification(waitCtx)
24✔
204
                cancel()
24✔
205

24✔
206
                if err != nil {
42✔
207
                        if ctx.Err() != nil {
22✔
208
                                return ctx.Err()
4✔
209
                        }
4✔
210
                        if errors.Is(err, context.DeadlineExceeded) {
28✔
211
                                continue
14✔
212
                        }
213

214
                        return errors.Wrap(err, "notification error")
×
215
                }
216

217
                p.handleNotification(ctx, notification)
6✔
218
        }
219
}
220

221
func (p *Postgres) handleNotification(ctx context.Context, notification *pgconn.Notification) {
6✔
222
        var msg pubsub.Message
6✔
223
        if err := json.Unmarshal([]byte(notification.Payload), &msg); err != nil {
6✔
224
                p.logger.Error("failed to unmarshal notification",
×
225
                        slog.String("channel", notification.Channel),
×
226
                        slog.String("error", err.Error()),
×
227
                )
×
228

×
229
                return
×
230
        }
×
231

232
        p.mu.RLock()
6✔
233
        handlers := p.getMatchingHandlers(msg.Channel)
6✔
234
        p.mu.RUnlock()
6✔
235

6✔
236
        for _, handler := range handlers {
11✔
237
                pubsub.SafeCall(ctx, handler, &msg, p.logger)
5✔
238
        }
5✔
239
}
240

241
func (p *Postgres) getMatchingHandlers(channel string) []pubsub.Handler {
6✔
242
        var handlers []pubsub.Handler
6✔
243

6✔
244
        for pattern, h := range p.handlers {
11✔
245
                if pubsub.MatchPattern(pattern, channel) {
10✔
246
                        handlers = append(handlers, h...)
5✔
247
                }
5✔
248
        }
249

250
        return handlers
6✔
251
}
252

253
func (p *Postgres) Close() error {
7✔
254
        p.closeOnce.Do(func() {
14✔
255
                p.mu.Lock()
7✔
256
                p.closed = true
7✔
257
                p.mu.Unlock()
7✔
258

7✔
259
                p.pool.Close()
7✔
260
                p.wg.Wait()
7✔
261
        })
7✔
262

263
        return nil
7✔
264
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc