• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

looplab / eventhorizon / 12622512440

05 Jan 2025 07:58PM UTC coverage: 27.495% (-39.9%) from 67.361%
12622512440

Pull #419

github

web-flow
Merge b3c17d928 into ac3a97277
Pull Request #419: fix(ci): update to up/download-artifact v4

1769 of 6434 relevant lines covered (27.49%)

1.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/eventhandler/projector/eventhandler.go
1
// Copyright (c) 2017 - The Event Horizon authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
package projector
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "time"
22

23
        eh "github.com/looplab/eventhorizon"
24
        "github.com/looplab/eventhorizon/repo/version"
25
        "github.com/looplab/eventhorizon/uuid"
26
)
27

28
// Projector is a projector of events onto models.
29
type Projector interface {
30
        // ProjectorType returns the type of the projector.
31
        ProjectorType() Type
32

33
        // Project projects an event onto a model and returns the updated model or
34
        // an error.
35
        Project(context.Context, eh.Event, eh.Entity) (eh.Entity, error)
36
}
37

38
// Type is the type of a projector, used as its unique identifier.
39
type Type string
40

41
// String returns the string representation of a projector type.
42
func (t Type) String() string {
×
43
        return string(t)
×
44
}
×
45

46
var (
47
        // ErrModelNotSet is when a model factory is not set on the EventHandler.
48
        ErrModelNotSet = errors.New("model not set")
49
        // ErrModelRemoved is when a model has been removed.
50
        ErrModelRemoved = errors.New("model removed")
51
        // Returned if the model has not incremented its version as predicted.
52
        ErrIncorrectProjectedEntityVersion = errors.New("incorrect projected entity version")
53
)
54

55
// Error is an error in the projector.
56
type Error struct {
57
        // Err is the error that happened when projecting the event.
58
        Err error
59
        // Projector is the projector where the error happened.
60
        Projector string
61
        // Event is the event being projected.
62
        Event eh.Event
63
        // EntityID of related operation.
64
        EntityID uuid.UUID
65
        // EntityVersion is the version of the entity.
66
        EntityVersion int
67
}
68

69
// Error implements the Error method of the errors.Error interface.
70
func (e *Error) Error() string {
×
71
        str := "projector '" + e.Projector + "': "
×
72

×
73
        if e.Err != nil {
×
74
                str += e.Err.Error()
×
75
        } else {
×
76
                str += "unknown error"
×
77
        }
×
78

79
        if e.EntityID != uuid.Nil {
×
80
                str += fmt.Sprintf(", Entity(%s, v%d)", e.EntityID, e.EntityVersion)
×
81
        }
×
82

83
        if e.Event != nil {
×
84
                str += ", " + e.Event.String()
×
85
        }
×
86

87
        return str
×
88
}
89

90
// Unwrap implements the errors.Unwrap method.
91
func (e *Error) Unwrap() error {
×
92
        return e.Err
×
93
}
×
94

95
// Cause implements the github.com/pkg/errors Unwrap method.
96
func (e *Error) Cause() error {
×
97
        return e.Unwrap()
×
98
}
×
99

100
// EventHandler is a CQRS projection handler to run a Projector implementation.
101
type EventHandler struct {
102
        projector              Projector
103
        repo                   eh.ReadWriteRepo
104
        factoryFn              func() eh.Entity
105
        useWait                bool
106
        useRetryOnce           bool
107
        useIrregularVersioning bool
108
        entityLookupFn         func(eh.Event) uuid.UUID
109
}
110

111
var _ = eh.EventHandler(&EventHandler{})
112

113
// NewEventHandler creates a new EventHandler.
114
func NewEventHandler(projector Projector, repo eh.ReadWriteRepo, options ...Option) *EventHandler {
×
115
        h := &EventHandler{
×
116
                projector:      projector,
×
117
                repo:           repo,
×
118
                entityLookupFn: defaultEntityLookupFn,
×
119
        }
×
120

×
121
        for _, option := range options {
×
122
                option(h)
×
123
        }
×
124

125
        return h
×
126
}
127

128
// Option is an option setter used to configure creation.
129
type Option func(*EventHandler)
130

131
// WithWait adds waiting for the correct version when projecting.
132
func WithWait() Option {
×
133
        return func(h *EventHandler) {
×
134
                h.useWait = true
×
135
        }
×
136
}
137

138
// WithRetryOnce adds a single retry in case of version mismatch. Useful to
139
// let racy projections finish witout an error.
140
func WithRetryOnce() Option {
×
141
        return func(h *EventHandler) {
×
142
                h.useRetryOnce = true
×
143
        }
×
144
}
145

146
// WithIrregularVersioning sets the option to allow gaps in the version numbers.
147
// This can be useful for projectors that project only some events of a larger
148
// aggregate, which will lead to gaps in the versions.
149
func WithIrregularVersioning() Option {
×
150
        return func(h *EventHandler) {
×
151
                h.useIrregularVersioning = true
×
152
        }
×
153
}
154

155
// WithEntityLookup can be used to provide an alternative ID (from the aggregate ID)
156
// for fetching the projected entity. The lookup func can for example extract
157
// another field from the event or use a static ID for some singleton-like projections.
158
func WithEntityLookup(f func(eh.Event) uuid.UUID) Option {
×
159
        return func(h *EventHandler) {
×
160
                h.entityLookupFn = f
×
161
        }
×
162
}
163

164
// defaultEntitypLookupFn does a lookup by the aggregate ID of the event.
165
func defaultEntityLookupFn(event eh.Event) uuid.UUID {
×
166
        return event.AggregateID()
×
167
}
×
168

169
// HandlerType implements the HandlerType method of the eventhorizon.EventHandler interface.
170
func (h *EventHandler) HandlerType() eh.EventHandlerType {
×
171
        return eh.EventHandlerType("projector_" + h.projector.ProjectorType())
×
172
}
×
173

174
// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
175
// It will try to find the correct version of the model, waiting for it the projector
176
// has the WithWait option set.
177
func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
×
178
        if event == nil {
×
179
                return &Error{
×
180
                        Err:       eh.ErrMissingEvent,
×
181
                        Projector: h.projector.ProjectorType().String(),
×
182
                }
×
183
        }
×
184

185
        // Used to retry once in case of a version mismatch.
186
        triedOnce := false
×
187
retryOnce:
×
188

189
        findCtx := ctx
×
190
        // Irregular versioning skips min version loading.
×
191
        if !h.useIrregularVersioning {
×
192
                // Try to find it with a min version (and optional retry) if the
×
193
                // underlying repo supports it.
×
194
                findCtx = version.NewContextWithMinVersion(ctx, event.Version()-1)
×
195

×
196
                if h.useWait {
×
197
                        var cancel func()
×
198
                        findCtx, cancel = version.NewContextWithMinVersionWait(ctx, event.Version()-1)
×
199

×
200
                        defer cancel()
×
201
                }
×
202
        }
203

204
        id := h.entityLookupFn(event)
×
205

×
206
        // Get or create the model.
×
207
        entity, err := h.repo.Find(findCtx, id)
×
208
        if errors.Is(err, eh.ErrEntityNotFound) {
×
209
                if h.factoryFn == nil {
×
210
                        return &Error{
×
211
                                Err:       ErrModelNotSet,
×
212
                                Projector: h.projector.ProjectorType().String(),
×
213
                                Event:     event,
×
214
                                EntityID:  id,
×
215
                        }
×
216
                }
×
217

218
                entity = h.factoryFn()
×
219
        } else if errors.Is(err, eh.ErrIncorrectEntityVersion) {
×
220
                if h.useRetryOnce && !triedOnce {
×
221
                        triedOnce = true
×
222

×
223
                        time.Sleep(100 * time.Millisecond)
×
224

×
225
                        goto retryOnce
×
226
                }
227

228
                return &Error{
×
229
                        Err:       fmt.Errorf("could not load entity with correct version: %w", err),
×
230
                        Projector: h.projector.ProjectorType().String(),
×
231
                        Event:     event,
×
232
                        EntityID:  id,
×
233
                }
×
234
        } else if err != nil {
×
235
                return &Error{
×
236
                        Err:       fmt.Errorf("could not load entity: %w", err),
×
237
                        Projector: h.projector.ProjectorType().String(),
×
238
                        Event:     event,
×
239
                        EntityID:  id,
×
240
                }
×
241
        }
×
242

243
        // The entity should be one version behind the event.
244
        entityVersion := 0
×
245
        if entity, ok := entity.(eh.Versionable); ok {
×
246
                entityVersion = entity.AggregateVersion()
×
247

×
248
                // Ignore old/duplicate events.
×
249
                if event.Version() <= entityVersion {
×
250
                        return nil
×
251
                }
×
252

253
                // Irregular versioning has looser checks on the version.
254
                if event.Version() != entityVersion+1 && !h.useIrregularVersioning {
×
255
                        if h.useRetryOnce && !triedOnce {
×
256
                                triedOnce = true
×
257

×
258
                                time.Sleep(100 * time.Millisecond)
×
259

×
260
                                goto retryOnce
×
261
                        }
262

263
                        if entityVersion == 0 && event.Version() > 1 {
×
264
                                return &Error{
×
265
                                        Err:           ErrModelRemoved,
×
266
                                        Projector:     h.projector.ProjectorType().String(),
×
267
                                        Event:         event,
×
268
                                        EntityID:      id,
×
269
                                        EntityVersion: entityVersion,
×
270
                                }
×
271
                        }
×
272

273
                        return &Error{
×
274
                                Err:           eh.ErrIncorrectEntityVersion,
×
275
                                Projector:     h.projector.ProjectorType().String(),
×
276
                                Event:         event,
×
277
                                EntityID:      id,
×
278
                                EntityVersion: entityVersion,
×
279
                        }
×
280
                }
281
        }
282

283
        // Run the projection, which will possibly increment the version.
284
        newEntity, err := h.projector.Project(ctx, event, entity)
×
285
        if err != nil {
×
286
                return &Error{
×
287
                        Err:           fmt.Errorf("could not project: %w", err),
×
288
                        Projector:     h.projector.ProjectorType().String(),
×
289
                        Event:         event,
×
290
                        EntityID:      id,
×
291
                        EntityVersion: entityVersion,
×
292
                }
×
293
        }
×
294

295
        // The model should now be at the same version as the event.
296
        if newEntity, ok := newEntity.(eh.Versionable); ok {
×
297
                entityVersion = newEntity.AggregateVersion()
×
298

×
299
                if newEntity.AggregateVersion() != event.Version() {
×
300
                        return &Error{
×
301
                                Err:           ErrIncorrectProjectedEntityVersion,
×
302
                                Projector:     h.projector.ProjectorType().String(),
×
303
                                Event:         event,
×
304
                                EntityID:      id,
×
305
                                EntityVersion: entityVersion,
×
306
                        }
×
307
                }
×
308
        }
309

310
        // Update or remove the model.
311
        if newEntity != nil {
×
312
                if newEntity.EntityID() != id {
×
313
                        return &Error{
×
314
                                Err:           fmt.Errorf("incorrect entity ID after projection"),
×
315
                                Projector:     h.projector.ProjectorType().String(),
×
316
                                Event:         event,
×
317
                                EntityID:      id,
×
318
                                EntityVersion: entityVersion,
×
319
                        }
×
320
                }
×
321

322
                if err := h.repo.Save(ctx, newEntity); err != nil {
×
323
                        return &Error{
×
324
                                Err:           fmt.Errorf("could not save: %w", err),
×
325
                                Projector:     h.projector.ProjectorType().String(),
×
326
                                Event:         event,
×
327
                                EntityID:      id,
×
328
                                EntityVersion: entityVersion,
×
329
                        }
×
330
                }
×
331
        } else {
×
332
                if err := h.repo.Remove(ctx, id); err != nil {
×
333
                        return &Error{
×
334
                                Err:           fmt.Errorf("could not remove: %w", err),
×
335
                                Projector:     h.projector.ProjectorType().String(),
×
336
                                Event:         event,
×
337
                                EntityID:      id,
×
338
                                EntityVersion: entityVersion,
×
339
                        }
×
340
                }
×
341
        }
342

343
        return nil
×
344
}
345

346
// SetEntityFactory sets a factory function that creates concrete entity types.
347
func (h *EventHandler) SetEntityFactory(f func() eh.Entity) {
×
348
        h.factoryFn = f
×
349
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc