• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dunglas / mercure / 4747719278

19 Apr 2023 08:41PM UTC coverage: 84.056% (-0.3%) from 84.391%
4747719278

Pull #759

github

Kévin Dunglas
const for buffer length
Pull Request #759: fix: don't block when subscriber chan is full

29 of 29 new or added lines in 1 file covered. (100.0%)

1513 of 1800 relevant lines covered (84.06%)

62.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.7
/subscriber.go
1
package mercure
2

3
import (
4
        "fmt"
5
        "net/url"
6
        "regexp"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/gofrs/uuid"
11
        uritemplate "github.com/yosida95/uritemplate/v3"
12
        "go.uber.org/zap"
13
        "go.uber.org/zap/zapcore"
14
)
15

16
// Subscriber represents a client subscribed to a list of topics.
17
type Subscriber struct {
18
        ID                     string
19
        EscapedID              string
20
        Claims                 *claims
21
        EscapedTopics          []string
22
        RequestLastEventID     string
23
        RemoteAddr             string
24
        SubscribedTopics       []string
25
        SubscribedTopicRegexps []*regexp.Regexp
26
        AllowedPrivateTopics   []string
27
        AllowedPrivateRegexps  []*regexp.Regexp
28
        Debug                  bool
29

30
        disconnected        int32
31
        out                 chan *Update
32
        outMutex            sync.RWMutex
33
        responseLastEventID chan string
34
        logger              Logger
35
        ready               int32
36
        liveQueue           []*Update
37
        liveMutex           sync.RWMutex
38
}
39

40
// outBufferLength is the capacity of the buffered out channel that
// NewSubscriber allocates for each subscriber.
const outBufferLength = 1000
41

42
// NewSubscriber creates a new subscriber.
43
func NewSubscriber(lastEventID string, logger Logger) *Subscriber {
44
        id := "urn:uuid:" + uuid.Must(uuid.NewV4()).String()
107✔
45
        s := &Subscriber{
107✔
46
                ID:                  id,
107✔
47
                EscapedID:           url.QueryEscape(id),
107✔
48
                RequestLastEventID:  lastEventID,
107✔
49
                responseLastEventID: make(chan string, 1),
107✔
50
                out:                 make(chan *Update, outBufferLength),
107✔
51
                logger:              logger,
107✔
52
        }
107✔
53

107✔
54
        return s
107✔
55
}
107✔
56

107✔
57
// Dispatch an update to the subscriber.
58
// Security checks must (topics matching) be done before calling Dispatch,
59
// for instance by calling Match.
60
func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool {
61
        if atomic.LoadInt32(&s.disconnected) > 0 {
1,188✔
62
                return false
1,269✔
63
        }
81✔
64

81✔
65
        if !fromHistory && atomic.LoadInt32(&s.ready) < 1 {
66
                s.liveMutex.Lock()
1,109✔
67
                if s.ready < 1 {
2✔
68
                        s.liveQueue = append(s.liveQueue, u)
4✔
69
                        s.liveMutex.Unlock()
2✔
70

2✔
71
                        return true
2✔
72
                }
2✔
73
                s.liveMutex.Unlock()
2✔
74
        }
×
75

76
        s.outMutex.Lock()
77
        if atomic.LoadInt32(&s.disconnected) > 0 {
1,105✔
78
                s.outMutex.Unlock()
1,105✔
79

×
80
                return false
×
81
        }
×
82

×
83
        select {
84
        case s.out <- u:
1,105✔
85
                s.outMutex.Unlock()
1,104✔
86
        default:
1,104✔
87
                s.handleFullChan()
1✔
88

1✔
89
                return false
1✔
90
        }
1✔
91

92
        return true
93
}
1,104✔
94

95
// Ready flips the ready flag to true and flushes queued live updates returning number of events flushed.
96
func (s *Subscriber) Ready() (n int) {
97
        s.liveMutex.Lock()
90✔
98
        s.outMutex.Lock()
90✔
99

90✔
100
        for _, u := range s.liveQueue {
90✔
101
                select {
92✔
102
                case s.out <- u:
2✔
103
                        n++
2✔
104
                default:
2✔
105
                        s.handleFullChan()
×
106
                        s.liveMutex.Unlock()
×
107

×
108
                        return n
×
109
                }
×
110
        }
111
        atomic.StoreInt32(&s.ready, 1)
112

90✔
113
        s.outMutex.Unlock()
90✔
114
        s.liveMutex.Unlock()
90✔
115

90✔
116
        return n
90✔
117
}
90✔
118

119
// Receive returns a chan when incoming updates are dispatched.
120
func (s *Subscriber) Receive() <-chan *Update {
121
        return s.out
165✔
122
}
165✔
123

165✔
124
// HistoryDispatched must be called when all messages coming from the history have been dispatched.
125
func (s *Subscriber) HistoryDispatched(responseLastEventID string) {
126
        s.responseLastEventID <- responseLastEventID
14✔
127
}
14✔
128

14✔
129
// Disconnect disconnects the subscriber.
130
func (s *Subscriber) Disconnect() {
131
        if atomic.LoadInt32(&s.disconnected) > 0 {
87✔
132
                return
92✔
133
        }
5✔
134

5✔
135
        s.outMutex.Lock()
136
        defer s.outMutex.Unlock()
82✔
137

82✔
138
        atomic.StoreInt32(&s.disconnected, 1)
82✔
139
        close(s.out)
82✔
140
}
82✔
141

142
// SetTopics compiles topic selector regexps.
143
func (s *Subscriber) SetTopics(subscribedTopics, allowedPrivateTopics []string) {
144
        s.SubscribedTopics = subscribedTopics
87✔
145
        s.SubscribedTopicRegexps = make([]*regexp.Regexp, len(subscribedTopics))
87✔
146
        for i, ts := range subscribedTopics {
87✔
147
                var r *regexp.Regexp
190✔
148
                if tpl, err := uritemplate.New(ts); err == nil {
103✔
149
                        r = tpl.Regexp()
203✔
150
                }
100✔
151
                s.SubscribedTopicRegexps[i] = r
100✔
152
        }
103✔
153
        s.AllowedPrivateTopics = allowedPrivateTopics
154
        s.AllowedPrivateRegexps = make([]*regexp.Regexp, len(allowedPrivateTopics))
87✔
155
        for i, ts := range allowedPrivateTopics {
87✔
156
                var r *regexp.Regexp
99✔
157
                if tpl, err := uritemplate.New(ts); err == nil {
12✔
158
                        r = tpl.Regexp()
24✔
159
                }
12✔
160
                s.AllowedPrivateRegexps[i] = r
12✔
161
        }
12✔
162
        s.EscapedTopics = escapeTopics(subscribedTopics)
163
}
87✔
164

165
// escapeTopics returns a new slice containing every topic passed through
// url.QueryEscape, in the same order. The result is never nil.
func escapeTopics(topics []string) []string {
	escaped := make([]string, len(topics))
	for idx := range topics {
		escaped[idx] = url.QueryEscape(topics[idx])
	}

	return escaped
}
87✔
173

174
// MatchTopic checks if the current subscriber can access to the given topic.
175
//
176
//nolint:gocognit
177
func (s *Subscriber) MatchTopics(topics []string, private bool) bool {
178
        var subscribed bool
120✔
179
        canAccess := !private
120✔
180

120✔
181
        for _, topic := range topics {
120✔
182
                if !subscribed {
244✔
183
                        for i, ts := range s.SubscribedTopics {
246✔
184
                                if ts == "*" || ts == topic {
288✔
185
                                        subscribed = true
248✔
186

82✔
187
                                        break
82✔
188
                                }
82✔
189

190
                                r := s.SubscribedTopicRegexps[i]
191
                                if r != nil && r.MatchString(topic) {
84✔
192
                                        subscribed = true
110✔
193

26✔
194
                                        break
26✔
195
                                }
26✔
196
                        }
197
                }
198

199
                if !canAccess {
200
                        for i, ts := range s.AllowedPrivateTopics {
144✔
201
                                if ts == "*" || ts == topic {
43✔
202
                                        canAccess = true
30✔
203

7✔
204
                                        break
7✔
205
                                }
7✔
206

207
                                r := s.AllowedPrivateRegexps[i]
208
                                if r != nil && r.MatchString(topic) {
16✔
209
                                        canAccess = true
19✔
210

3✔
211
                                        break
3✔
212
                                }
3✔
213
                        }
214
                }
215

216
                if subscribed && canAccess {
217
                        return true
225✔
218
                }
101✔
219
        }
101✔
220

221
        return false
222
}
19✔
223

224
// Match checks if the current subscriber can receive the given update.
225
func (s *Subscriber) Match(u *Update) bool {
226
        return s.MatchTopics(u.Topics, u.Private)
32✔
227
}
32✔
228

32✔
229
// getSubscriptions return the list of subscriptions associated to this subscriber.
230
func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subscription {
231
        var subscriptions []subscription //nolint:prealloc
17✔
232
        for k, t := range s.SubscribedTopics {
17✔
233
                if topic != "" && !s.MatchTopics([]string{topic}, false) {
37✔
234
                        continue
23✔
235
                }
3✔
236

237
                subscription := subscription{
238
                        Context:    context,
17✔
239
                        ID:         "/.well-known/mercure/subscriptions/" + s.EscapedTopics[k] + "/" + s.EscapedID,
17✔
240
                        Type:       "Subscription",
17✔
241
                        Subscriber: s.ID,
17✔
242
                        Topic:      t,
17✔
243
                        Active:     active,
17✔
244
                }
17✔
245
                if s.Claims != nil && s.Claims.Mercure.Payload != nil {
17✔
246
                        subscription.Payload = s.Claims.Mercure.Payload
23✔
247
                }
6✔
248

6✔
249
                subscriptions = append(subscriptions, subscription)
250
        }
17✔
251

252
        return subscriptions
253
}
17✔
254

255
func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error {
256
        enc.AddString("id", s.ID)
10✔
257
        enc.AddString("last_event_id", s.RequestLastEventID)
10✔
258
        if s.RemoteAddr != "" {
10✔
259
                enc.AddString("remote_addr", s.RemoteAddr)
20✔
260
        }
10✔
261
        if s.AllowedPrivateTopics != nil {
10✔
262
                if err := enc.AddArray("topic_selectors", stringArray(s.AllowedPrivateTopics)); err != nil {
14✔
263
                        return fmt.Errorf("log error: %w", err)
4✔
264
                }
×
265
        }
×
266
        if s.SubscribedTopics != nil {
267
                if err := enc.AddArray("topics", stringArray(s.SubscribedTopics)); err != nil {
20✔
268
                        return fmt.Errorf("log error: %w", err)
10✔
269
                }
×
270
        }
×
271

272
        return nil
273
}
10✔
274

275
// handleFullChan disconnects the subscriber when the out channel is full.
276
func (s *Subscriber) handleFullChan() {
277
        atomic.StoreInt32(&s.disconnected, 1)
1✔
278
        s.outMutex.Unlock()
1✔
279

1✔
280
        if c := s.logger.Check(zap.ErrorLevel, "subscriber unable to receive updates fast enough"); c != nil {
1✔
281
                c.Write(zap.Object("subscriber", s))
1✔
282
        }
×
283
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc