• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OdyseeTeam / odysee-api / 13172799395

06 Feb 2025 06:02AM UTC coverage: 33.127% (+0.08%) from 33.052%
13172799395

push

github

web-flow
Merge pull request #540 from OdyseeTeam/fix-random-endpoint

Fix random endpoint

56 of 62 new or added lines in 4 files covered. (90.32%)

6 existing lines in 2 files now uncovered.

5353 of 16159 relevant lines covered (33.13%)

115.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.12
/pkg/queue/queue.go
1
// Package queue provides a request-response mechanism for asynchronous message passing between separate services.
2
// It works best for scenarios where processing requires execution guarantees but is time-consuming and/or failure-prone.
3
// It utilizes the "asynq" package for message delivery.
4
package queue
5

6
import (
7
        "context"
8
        "encoding/json"
9
        "errors"
10
        "fmt"
11
        "time"
12

13
        "github.com/OdyseeTeam/odysee-api/pkg/logging"
14
        "github.com/OdyseeTeam/odysee-api/pkg/logging/zapadapter"
15

16
        "github.com/hibiken/asynq"
17
        "github.com/redis/go-redis/v9"
18
)
19

20
type Options struct {
21
        concurrency       int
22
        delayFunc         asynq.RetryDelayFunc
23
        logger            logging.KVLogger
24
        requestsConnOpts  asynq.RedisConnOpt
25
        requestsConnURL   string
26
        responsesConnOpts asynq.RedisConnOpt
27
        responsesConnURL  string
28
}
29

30
type MessageOptions struct {
31
        retry              int
32
        timeout, retention time.Duration
33
}
34

35
type Queue struct {
36
        options         *Options
37
        requestsClient  *asynq.Client
38
        responsesClient *asynq.Client
39
        asynqInspector  *asynq.Inspector
40
        asynqServer     *asynq.Server
41
        handlerStopChan chan struct{}
42
        handlers        map[string]asynq.HandlerFunc
43
        logger          logging.KVLogger
44
}
45

46
func WithRequestsConnOpts(opts asynq.RedisConnOpt) func(options *Options) {
2✔
47
        return func(options *Options) {
4✔
48
                options.requestsConnOpts = opts
2✔
49
        }
2✔
50
}
51

52
func WithResponsesConnOpts(opts asynq.RedisConnOpt) func(options *Options) {
1✔
53
        return func(options *Options) {
2✔
54
                options.responsesConnOpts = opts
1✔
55
        }
1✔
56
}
57

58
func WithRequestsConnURL(url string) func(options *Options) {
×
59
        return func(options *Options) {
×
60
                opts, err := asynq.ParseRedisURI(url)
×
61
                if err != nil {
×
62
                        panic(err)
×
63
                }
64
                options.requestsConnOpts = opts
×
65
                options.requestsConnURL = url
×
66
        }
67
}
68

69
func WithResponsesConnURL(url string) func(options *Options) {
×
70
        return func(options *Options) {
×
71
                opts, err := asynq.ParseRedisURI(url)
×
72
                if err != nil {
×
73
                        panic(err)
×
74
                }
75
                options.responsesConnOpts = opts
×
76
                options.responsesConnURL = url
×
77
        }
78
}
79

80
func WithConcurrency(concurrency int) func(options *Options) {
2✔
81
        return func(options *Options) {
4✔
82
                options.concurrency = concurrency
2✔
83
        }
2✔
84
}
85

86
func WithDelayFunc(f asynq.RetryDelayFunc) func(options *Options) {
×
87
        return func(options *Options) {
×
88
                options.delayFunc = f
×
89
        }
×
90
}
91

92
func WithLogger(logger logging.KVLogger) func(options *Options) {
2✔
93
        return func(options *Options) {
4✔
94
                options.logger = logger
2✔
95
        }
2✔
96
}
97

98
func WithRequestRetry(retry int) func(options *MessageOptions) {
×
99
        return func(options *MessageOptions) {
×
100
                options.retry = retry
×
101
        }
×
102
}
103

104
func WithRequestTimeout(timeout time.Duration) func(options *MessageOptions) {
×
105
        return func(options *MessageOptions) {
×
106
                options.timeout = timeout
×
107
        }
×
108
}
109

110
func WithRequestRetention(retention time.Duration) func(options *MessageOptions) {
×
111
        return func(options *MessageOptions) {
×
112
                options.retention = retention
×
113
        }
×
114
}
115

116
// New creates a new Queue instance with Redis request and response connections.
117
// If supplied WithRequestsConnOpts, the handler will be able to receive requests.
118
// If supplied WithResponsesConnOpts, the handler will be able to send responses.
119
// Response connection is provided for convenience, there is no coordination mechanism,
120
// each response is just another independent request sent to another queue.
121
func New(optionFuncs ...func(*Options)) (*Queue, error) {
2✔
122
        options := &Options{
2✔
123
                concurrency: 3,
2✔
124
                logger:      zapadapter.NewKV(nil),
2✔
125
                delayFunc: func(n int, err error, t *asynq.Task) time.Duration {
2✔
126
                        return 10 * time.Second
×
127
                },
×
128
        }
129
        for _, optionFunc := range optionFuncs {
9✔
130
                optionFunc(options)
7✔
131
        }
7✔
132

133
        var err error
2✔
134
        var conn bool
2✔
135
        queue := &Queue{
2✔
136
                options:         options,
2✔
137
                handlerStopChan: make(chan struct{}),
2✔
138
                handlers:        map[string]asynq.HandlerFunc{},
2✔
139
                logger:          options.logger,
2✔
140
        }
2✔
141

2✔
142
        if options.responsesConnOpts != nil {
3✔
143
                err = options.responsesConnOpts.MakeRedisClient().(redis.UniversalClient).Ping(context.Background()).Err()
1✔
144
                if err != nil {
1✔
145
                        return nil, fmt.Errorf("redis responses connection failed: %w", err)
×
146
                }
×
147
                queue.responsesClient = asynq.NewClient(options.responsesConnOpts)
1✔
148
                conn = true
1✔
149
        }
150
        if options.requestsConnOpts != nil {
4✔
151
                err = options.requestsConnOpts.MakeRedisClient().(redis.UniversalClient).Ping(context.Background()).Err()
2✔
152
                if err != nil {
2✔
153
                        return nil, fmt.Errorf("redis requests connection failed: %w", err)
×
154
                }
×
155
                queue.asynqInspector = asynq.NewInspector(options.requestsConnOpts)
2✔
156
                queue.requestsClient = asynq.NewClient(options.requestsConnOpts)
2✔
157
                conn = true
2✔
158
        }
159
        if !conn {
2✔
160
                return nil, errors.New("either requests or responses connection options must be provided")
×
161
        }
×
162

163
        return queue, nil
2✔
164
}
165

166
// NewWithResponses creates a new Queue instance with mandatory request and response connection URLs.
167
func NewWithResponses(requestsConnURL, responsesConnURL string, optionFuncs ...func(*Options)) (*Queue, error) {
×
168
        if requestsConnURL == "" || responsesConnURL == "" {
×
169
                return nil, errors.New("both requests and responses connection URL must be provided")
×
170
        }
×
171
        optionFuncs = append(optionFuncs, WithRequestsConnURL(requestsConnURL), WithResponsesConnURL(responsesConnURL))
×
172
        return New(optionFuncs...)
×
173
}
174

175
// AddHandler adds a request handler function for the specified request type.
176
// Must be called before ServeUntilShutdown.
177
func (q *Queue) AddHandler(requestType string, handler func(context.Context, *asynq.Task) error) {
2✔
178
        q.logger.Info("adding request handler", "type", requestType)
2✔
179
        q.handlers[requestType] = handler
2✔
180
}
2✔
181

182
// ServeUntilShutdown launches request handlers and blocks until it's stopped.
183
func (q *Queue) ServeUntilShutdown() error {
2✔
184
        if q.options.requestsConnOpts == nil {
2✔
185
                return errors.New("requests connection options must be provided")
×
186
        }
×
187
        if len(q.handlers) == 0 {
2✔
188
                return errors.New("no request handlers registered")
×
189
        }
×
190
        q.asynqServer = asynq.NewServer(
2✔
191
                q.options.requestsConnOpts,
2✔
192
                asynq.Config{
2✔
193
                        Concurrency:    q.options.concurrency,
2✔
194
                        RetryDelayFunc: q.options.delayFunc,
2✔
195
                        Logger:         zapadapter.New(nil),
2✔
196
                },
2✔
197
        )
2✔
198
        mux := asynq.NewServeMux()
2✔
199
        for k, v := range q.handlers {
4✔
200
                q.logger.Info("initializing request handler", "type", k)
2✔
201
                mux.HandleFunc(k, v)
2✔
202
        }
2✔
203
        q.logger.Info("started queue handlers")
2✔
204

2✔
205
        go func() {
4✔
206
                t := time.NewTicker(1 * time.Second)
2✔
207
                for {
7✔
208
                        select {
5✔
209
                        case <-t.C:
3✔
210
                                q, err := q.asynqInspector.GetQueueInfo("default")
3✔
211
                                if err != nil {
3✔
UNCOV
212
                                        continue
×
213
                                }
214
                                queueTasks.WithLabelValues("active").Set(float64(q.Active))
3✔
215
                                queueTasks.WithLabelValues("completed").Set(float64(q.Completed))
3✔
216
                                queueTasks.WithLabelValues("pending").Set(float64(q.Pending))
3✔
217
                                queueTasks.WithLabelValues("failed").Set(float64(q.Failed))
3✔
218
                        case <-q.handlerStopChan:
2✔
219
                                return
2✔
220
                        }
221
                }
222
        }()
223
        err := q.asynqServer.Run(mux)
2✔
224
        if err != nil {
2✔
225
                q.logger.Error("error starting queue", "error", err)
×
226
        }
×
227
        return err
×
228
}
229

230
func (q *Queue) Shutdown() {
2✔
231
        q.logger.Info("stopping queue")
2✔
232
        close(q.handlerStopChan)
2✔
233
        if q.responsesClient != nil {
3✔
234
                q.responsesClient.Close()
1✔
235
        }
1✔
236
        if q.requestsClient != nil {
4✔
237
                q.requestsClient.Close()
2✔
238
        }
2✔
239
        if q.asynqInspector != nil {
4✔
240
                q.asynqInspector.Close()
2✔
241
        }
2✔
242
        if q.asynqServer != nil {
4✔
243
                q.asynqServer.Shutdown()
2✔
244
        }
2✔
245
}
246

247
func (q *Queue) SendResponse(responseType string, payload any, optionFuncs ...func(*MessageOptions)) error {
1✔
248
        if q.responsesClient == nil {
1✔
249
                return errors.New("response client is missing")
×
250
        }
×
251
        return q.sendMessage(q.responsesClient, responseType, payload, optionFuncs...)
1✔
252
}
253

254
func (q *Queue) SendRequest(requestType string, payload any, optionFuncs ...func(*MessageOptions)) error {
1✔
255
        if q.requestsClient == nil {
1✔
256
                return errors.New("requests client is missing")
×
257
        }
×
258
        return q.sendMessage(q.requestsClient, requestType, payload, optionFuncs...)
1✔
259

260
}
261

262
func (q *Queue) sendMessage(client *asynq.Client, messageType string, payload any, optionFuncs ...func(*MessageOptions)) error {
2✔
263
        options := &MessageOptions{
2✔
264
                retry:     3,
2✔
265
                timeout:   1 * time.Hour,
2✔
266
                retention: 72 * time.Hour,
2✔
267
        }
2✔
268
        for _, optionFunc := range optionFuncs {
2✔
269
                optionFunc(options)
×
270
        }
×
271

272
        pb, err := json.Marshal(payload)
2✔
273
        if err != nil {
2✔
274
                return err
×
275
        }
×
276
        t := asynq.NewTask(messageType, pb, asynq.MaxRetry(options.retry))
2✔
277
        q.logger.Debug(
2✔
278
                "sending message", "type", messageType,
2✔
279
                "payload", string(pb), "retries", options.retry,
2✔
280
                "timeout", options.timeout, "retention", options.retention)
2✔
281
        _, err = client.Enqueue(t, asynq.Timeout(options.timeout), asynq.Retention(options.retention))
2✔
282
        if err != nil {
2✔
283
                return fmt.Errorf("failed to enqueue %s request: %w", messageType, err)
×
284
        }
×
285
        return nil
2✔
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc