• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

looplab / eventhorizon / 3620309305

pending completion
3620309305

push

github

GitHub
Merge pull request #395 from klowdo/main

23 of 23 new or added lines in 1 file covered. (100.0%)

4156 of 6063 relevant lines covered (68.55%)

78.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.7
/eventbus/redis/eventbus.go
1
// Copyright (c) 2014 - The Event Horizon authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package redis
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "log"
22
        "strings"
23
        "sync"
24
        "time"
25

26
        "github.com/go-redis/redis/v8"
27

28
        eh "github.com/looplab/eventhorizon"
29
        "github.com/looplab/eventhorizon/codec/json"
30
)
31

32
// EventBus is an event bus backed by a Redis stream. Published events are
33
// appended to the stream, and every registered handler reads them through its
// own consumer group in a dedicated goroutine.
34
type EventBus struct {
35
        // appID prefixes the stream name and the consumer group names.
        appID        string
36
        // clientID is appended to the group name to form the consumer name.
        clientID     string
37
        // streamName is the stream events are added to and read from
        // (appID + "_events").
        streamName   string
38
        // client is the Redis client created in NewEventBus.
        client       *redis.Client
39
        // clientOpts are the options used to create client; set with
        // WithRedisOptions, otherwise defaulted from the addr argument.
        clientOpts   *redis.Options
40
        // registered holds the handler types that have already been added.
        registered   map[eh.EventHandlerType]struct{}
41
        // registeredMu guards registered.
        registeredMu sync.RWMutex
42
        // errCh is the buffered channel returned by Errors.
        errCh        chan error
43
        // cctx is passed to the receive loops and is cancelled by Close.
        cctx         context.Context
44
        // cancel cancels cctx.
        cancel       context.CancelFunc
45
        // wg tracks the receive goroutines so Close can wait for them.
        wg           sync.WaitGroup
46
        // codec marshals and unmarshals events; defaults to the JSON codec.
        codec        eh.EventCodec
47
}
48

49
// NewEventBus creates an EventBus, with optional settings.
50
func NewEventBus(addr, appID, clientID string, options ...Option) (*EventBus, error) {
3✔
51
        ctx, cancel := context.WithCancel(context.Background())
3✔
52

3✔
53
        b := &EventBus{
3✔
54
                appID:      appID,
3✔
55
                clientID:   clientID,
3✔
56
                streamName: appID + "_events",
3✔
57
                registered: map[eh.EventHandlerType]struct{}{},
3✔
58
                errCh:      make(chan error, 100),
3✔
59
                cctx:       ctx,
3✔
60
                cancel:     cancel,
3✔
61
                codec:      &json.EventCodec{},
3✔
62
        }
3✔
63

3✔
64
        // Apply configuration options.
3✔
65
        for _, option := range options {
6✔
66
                if option == nil {
3✔
67
                        continue
×
68
                }
69

70
                if err := option(b); err != nil {
3✔
71
                        return nil, fmt.Errorf("error while applying option: %w", err)
×
72
                }
×
73
        }
74

75
        // Default client options.
76
        if b.clientOpts == nil {
3✔
77
                b.clientOpts = &redis.Options{
×
78
                        Addr: addr,
×
79
                }
×
80
        }
×
81

82
        // Create client and check connection.
83
        b.client = redis.NewClient(b.clientOpts)
3✔
84
        if res, err := b.client.Ping(b.cctx).Result(); err != nil || res != "PONG" {
3✔
85
                return nil, fmt.Errorf("could not check Redis server: %w", err)
×
86
        }
×
87

88
        return b, nil
3✔
89
}
90

91
// Option is an option setter used to configure an EventBus in NewEventBus.
// A non-nil error returned by an Option makes NewEventBus fail.
92
type Option func(*EventBus) error
93

94
// WithCodec uses the specified codec for encoding events.
95
func WithCodec(codec eh.EventCodec) Option {
×
96
        return func(b *EventBus) error {
×
97
                b.codec = codec
×
98

×
99
                return nil
×
100
        }
×
101
}
102

103
// WithRedisOptions uses the Redis options for the underlying client, instead of the defaults.
104
func WithRedisOptions(opts *redis.Options) Option {
3✔
105
        return func(b *EventBus) error {
6✔
106
                b.clientOpts = opts
3✔
107

3✔
108
                return nil
3✔
109
        }
3✔
110
}
111

112
// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
113
func (b *EventBus) HandlerType() eh.EventHandlerType {
×
114
        return "eventbus"
×
115
}
×
116

117
// Field names of the entries that HandleEvent writes to the Redis stream.
const (
118
        // aggregateTypeKey holds the event's aggregate type as a string.
        aggregateTypeKey = "aggregate_type"
119
        // eventTypeKey holds the event's type as a string.
        eventTypeKey     = "event_type"
120
        // dataKey holds the codec-encoded event; it is the only field the
        // receiving side decodes.
        dataKey          = "data"
121
)
122

123
// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
124
func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
4✔
125
        data, err := b.codec.MarshalEvent(ctx, event)
4✔
126
        if err != nil {
4✔
127
                return fmt.Errorf("could not marshal event: %w", err)
×
128
        }
×
129

130
        args := &redis.XAddArgs{
4✔
131
                Stream: b.streamName,
4✔
132
                Values: map[string]interface{}{
4✔
133
                        aggregateTypeKey: event.AggregateType().String(),
4✔
134
                        eventTypeKey:     event.EventType().String(),
4✔
135
                        dataKey:          data,
4✔
136
                },
4✔
137
        }
4✔
138
        if _, err := b.client.XAdd(ctx, args).Result(); err != nil {
4✔
139
                return fmt.Errorf("could not publish event: %w", err)
×
140
        }
×
141

142
        return nil
4✔
143
}
144

145
// AddHandler implements the AddHandler method of the eventhorizon.EventBus interface.
146
func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
11✔
147
        if m == nil {
12✔
148
                return eh.ErrMissingMatcher
1✔
149
        }
1✔
150

151
        if h == nil {
11✔
152
                return eh.ErrMissingHandler
1✔
153
        }
1✔
154

155
        // Check handler existence.
156
        b.registeredMu.Lock()
9✔
157
        defer b.registeredMu.Unlock()
9✔
158

9✔
159
        if _, ok := b.registered[h.HandlerType()]; ok {
10✔
160
                return eh.ErrHandlerAlreadyAdded
1✔
161
        }
1✔
162

163
        // Get or create the subscription.
164
        // TODO: Filter subscription.
165
        groupName := fmt.Sprintf("%s_%s", b.appID, h.HandlerType())
8✔
166

8✔
167
        res, err := b.client.XGroupCreateMkStream(ctx, b.streamName, groupName, "$").Result()
8✔
168
        if err != nil {
9✔
169
                // Ignore group exists non-errors.
1✔
170
                if !strings.HasPrefix(err.Error(), "BUSYGROUP") {
1✔
171
                        return fmt.Errorf("could not create consumer group: %w", err)
×
172
                }
×
173
        } else if res != "OK" {
7✔
174
                return fmt.Errorf("could not create consumer group: %s", res)
×
175
        }
×
176

177
        // Register handler.
178
        b.registered[h.HandlerType()] = struct{}{}
8✔
179

8✔
180
        b.wg.Add(1)
8✔
181

8✔
182
        // Handle until context is cancelled.
8✔
183
        go b.handle(m, h, groupName)
8✔
184

8✔
185
        return nil
8✔
186
}
187

188
// Errors implements the Errors method of the eventhorizon.EventBus interface.
189
func (b *EventBus) Errors() <-chan error {
3✔
190
        return b.errCh
3✔
191
}
3✔
192

193
// Close implements the Close method of the eventhorizon.EventBus interface.
194
func (b *EventBus) Close() error {
2✔
195
        b.cancel()
2✔
196
        b.wg.Wait()
2✔
197

2✔
198
        return b.client.Close()
2✔
199
}
2✔
200

201
// Handles all events coming in on the channel.
202
func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, groupName string) {
8✔
203
        defer b.wg.Done()
8✔
204

8✔
205
        handler := b.handler(m, h, groupName)
8✔
206

8✔
207
        for {
28✔
208
                streams, err := b.client.XReadGroup(b.cctx, &redis.XReadGroupArgs{
20✔
209
                        Group:    groupName,
20✔
210
                        Consumer: groupName + "_" + b.clientID,
20✔
211
                        Streams:  []string{b.streamName, ">"},
20✔
212
                }).Result()
20✔
213
                if errors.Is(err, context.Canceled) {
27✔
214
                        break
7✔
215
                } else if err != nil {
12✔
216
                        err = fmt.Errorf("could not receive: %w", err)
×
217
                        select {
×
218
                        case b.errCh <- &eh.EventBusError{Err: err}:
×
219
                        default:
×
220
                                log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
221
                        }
222

223
                        // Retry the receive loop if there was an error.
224
                        time.Sleep(time.Second)
×
225

×
226
                        continue
×
227
                }
228

229
                // Handle all messages from group read.
230
                for _, stream := range streams {
24✔
231
                        if stream.Stream != b.streamName {
12✔
232
                                continue
×
233
                        }
234

235
                        for _, msg := range stream.Messages {
24✔
236
                                handler(b.cctx, &msg)
12✔
237
                        }
12✔
238
                }
239
        }
240
}
241

242
func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName string) func(ctx context.Context, msg *redis.XMessage) {
8✔
243
        return func(ctx context.Context, msg *redis.XMessage) {
20✔
244
                data, ok := msg.Values[dataKey].(string)
12✔
245
                if !ok {
12✔
246
                        err := fmt.Errorf("event data is of incorrect type %T", msg.Values[dataKey])
×
247
                        select {
×
248
                        case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
×
249
                        default:
×
250
                                log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
251
                        }
252

253
                        // TODO: Nack if possible.
254
                        return
×
255
                }
256

257
                event, ctx, err := b.codec.UnmarshalEvent(ctx, []byte(data))
12✔
258
                if err != nil {
12✔
259
                        err = fmt.Errorf("could not unmarshal event: %w", err)
×
260
                        select {
×
261
                        case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
×
262
                        default:
×
263
                                log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
264
                        }
265

266
                        // TODO: Nack if possible.
267
                        return
×
268
                }
269

270
                // Ignore non-matching events.
271
                if !m.Match(event) {
14✔
272
                        if _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result(); err != nil {
2✔
273
                                err = fmt.Errorf("could not ack non-matching event: %w", err)
×
274
                                select {
×
275
                                case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
×
276
                                default:
×
277
                                        log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
278
                                }
279
                        }
280

281
                        return
2✔
282
                }
283

284
                // Handle the event if it did match.
285
                if err := h.HandleEvent(ctx, event); err != nil {
11✔
286
                        err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
1✔
287
                        select {
1✔
288
                        case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
1✔
289
                        default:
×
290
                                log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
291
                        }
292

293
                        // TODO: Nack if possible.
294
                        return
1✔
295
                }
296

297
                _, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result()
9✔
298
                if err != nil {
9✔
299
                        err = fmt.Errorf("could not ack handled event: %w", err)
×
300
                        select {
×
301
                        case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
×
302
                        default:
×
303
                                log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
×
304
                        }
305
                }
306
        }
307
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc