• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mehdihadeli / Go-MediatR / 4588657665

02 Apr 2023 12:23PM UTC coverage: 88.889%. First build
4588657665

push

github

GitHub
merge main to dev (#7)

46 of 46 new or added lines in 1 file covered. (100.0%)

144 of 162 relevant lines covered (88.89%)

1.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/mediatr.go
1
package mediatr
2

3
import (
4
        "context"
5

6
        "github.com/ahmetb/go-linq/v3"
7
        "github.com/goccy/go-reflect"
8
        "github.com/pkg/errors"
9
)
10

11
// RequestHandlerFunc is the continuation a pipeline behavior invokes to run
// the next step of the pipeline; it yields that step's response and error.
type RequestHandlerFunc func() (any, error)
13

14
// PipelineBehavior is a Pipeline behavior for wrapping the inner handler.
15
type PipelineBehavior interface {
16
        Handle(ctx context.Context, request interface{}, next RequestHandlerFunc) (interface{}, error)
17
}
18

19
// RequestHandler handles a request of type TRequest and returns a response of
// type TResponse, or an error.
type RequestHandler[TRequest, TResponse any] interface {
	Handle(ctx context.Context, request TRequest) (TResponse, error)
}
22

23
// RequestHandlerFactory is a function that returns a RequestHandler for the
// given request and response types.
type RequestHandlerFactory[TRequest, TResponse any] func() RequestHandler[TRequest, TResponse]
24

25
// NotificationHandler handles a notification of type TNotification and
// reports failure through the returned error.
type NotificationHandler[TNotification any] interface {
	Handle(ctx context.Context, notification TNotification) error
}
28

29
// NotificationHandlerFactory is a function that returns a NotificationHandler
// for the given notification type.
type NotificationHandlerFactory[TNotification any] func() NotificationHandler[TNotification]
30

31
// Package-level registries backing the mediator.
var (
	// requestHandlersRegistrations maps a request type to its registered
	// handler or handler factory.
	requestHandlersRegistrations = make(map[reflect.Type]any)

	// notificationHandlersRegistrations maps a notification type to the
	// handlers or handler factories registered for it, in registration order.
	notificationHandlersRegistrations = make(map[reflect.Type][]any)

	// pipelineBehaviours holds the registered pipeline behaviors in
	// registration order.
	pipelineBehaviours = []any{}
)
34

35
// Unit is an empty struct type.
// NOTE(review): presumably meant as the response type for requests that
// produce no meaningful result — confirm against callers.
type Unit struct{}
36

37
func registerRequestHandler[TRequest any, TResponse any](handler any) error {
1✔
38
        var request TRequest
1✔
39
        requestType := reflect.TypeOf(request)
1✔
40

1✔
41
        _, exist := requestHandlersRegistrations[requestType]
1✔
42
        if exist {
2✔
43
                // each request in request/response strategy should have just one handler
1✔
44
                return errors.Errorf("registered handler already exists in the registry for message %s", requestType.String())
1✔
45
        }
1✔
46

47
        requestHandlersRegistrations[requestType] = handler
1✔
48

1✔
49
        return nil
1✔
50
}
51

52
// RegisterRequestHandler register the request handler to mediatr registry.
53
func RegisterRequestHandler[TRequest any, TResponse any](handler RequestHandler[TRequest, TResponse]) error {
1✔
54
        return registerRequestHandler[TRequest, TResponse](handler)
1✔
55
}
1✔
56

57
// RegisterRequestHandlerFactory register the request handler factory to mediatr registry.
58
func RegisterRequestHandlerFactory[TRequest any, TResponse any](factory RequestHandlerFactory[TRequest, TResponse]) error {
1✔
59
        return registerRequestHandler[TRequest, TResponse](factory)
1✔
60
}
1✔
61

62
// RegisterRequestPipelineBehaviors register the request behaviors to mediatr registry.
63
func RegisterRequestPipelineBehaviors(behaviours ...PipelineBehavior) error {
1✔
64
        for _, behavior := range behaviours {
2✔
65
                behaviorType := reflect.TypeOf(behavior)
1✔
66

1✔
67
                existsPipe := existsPipeType(behaviorType)
1✔
68
                if existsPipe {
2✔
69
                        return errors.Errorf("registered behavior already exists in the registry.")
1✔
70
                }
1✔
71

72
                pipelineBehaviours = append(pipelineBehaviours, behavior)
1✔
73
        }
74

75
        return nil
1✔
76
}
77

78
func registerNotificationHandler[TEvent any](handler any) error {
1✔
79
        var event TEvent
1✔
80
        eventType := reflect.TypeOf(event)
1✔
81

1✔
82
        handlers, exist := notificationHandlersRegistrations[eventType]
1✔
83
        if !exist {
2✔
84
                notificationHandlersRegistrations[eventType] = []interface{}{handler}
1✔
85
                return nil
1✔
86
        }
1✔
87

88
        notificationHandlersRegistrations[eventType] = append(handlers, handler)
1✔
89

1✔
90
        return nil
1✔
91
}
92

93
// RegisterNotificationHandler register the notification handler to mediatr registry.
94
func RegisterNotificationHandler[TEvent any](handler NotificationHandler[TEvent]) error {
1✔
95
        return registerNotificationHandler[TEvent](handler)
1✔
96
}
1✔
97

98
// RegisterNotificationHandlerFactory register the notification handler factory to mediatr registry.
99
func RegisterNotificationHandlerFactory[TEvent any](factory NotificationHandlerFactory[TEvent]) error {
1✔
100
        return registerNotificationHandler[TEvent](factory)
1✔
101
}
1✔
102

103
// RegisterNotificationHandlers register the notification handlers to mediatr registry.
104
func RegisterNotificationHandlers[TEvent any](handlers ...NotificationHandler[TEvent]) error {
1✔
105
        if len(handlers) == 0 {
1✔
106
                return errors.New("no handlers provided")
×
107
        }
×
108

109
        for _, handler := range handlers {
2✔
110
                err := RegisterNotificationHandler(handler)
1✔
111
                if err != nil {
1✔
112
                        return err
×
113
                }
×
114
        }
115

116
        return nil
1✔
117
}
118

119
// RegisterNotificationHandlers register the notification handlers factories to mediatr registry.
120
func RegisterNotificationHandlersFactories[TEvent any](factories ...NotificationHandlerFactory[TEvent]) error {
1✔
121
        if len(factories) == 0 {
1✔
122
                return errors.New("no handlers provided")
×
123
        }
×
124

125
        for _, handler := range factories {
2✔
126
                err := RegisterNotificationHandlerFactory[TEvent](handler)
1✔
127
                if err != nil {
1✔
128
                        return err
×
129
                }
×
130
        }
131

132
        return nil
1✔
133
}
134

135
func ClearRequestRegistrations() {
1✔
136
        requestHandlersRegistrations = map[reflect.Type]interface{}{}
1✔
137
}
1✔
138

139
func ClearNotificationRegistrations() {
1✔
140
        notificationHandlersRegistrations = map[reflect.Type][]interface{}{}
1✔
141
}
1✔
142

143
func buildRequestHandler[TRequest any, TResponse any](handler any) (RequestHandler[TRequest, TResponse], bool) {
1✔
144
        handlerValue, ok := handler.(RequestHandler[TRequest, TResponse])
1✔
145
        if !ok {
2✔
146
                factory, ok := handler.(RequestHandlerFactory[TRequest, TResponse])
1✔
147
                if !ok {
1✔
148
                        return nil, false
×
149
                }
×
150

151
                return factory(), true
1✔
152
        }
153

154
        return handlerValue, true
1✔
155
}
156

157
// Send the request to its corresponding request handler.
158
func Send[TRequest any, TResponse any](ctx context.Context, request TRequest) (TResponse, error) {
1✔
159
        requestType := reflect.TypeOf(request)
1✔
160
        var response TResponse
1✔
161
        handler, ok := requestHandlersRegistrations[requestType]
1✔
162
        if !ok {
2✔
163
                // request-response strategy should have exactly one handler and if we can't find a corresponding handler, we should return an error
1✔
164
                return *new(TResponse), errors.Errorf("no handler for request %T", request)
1✔
165
        }
1✔
166

167
        handlerValue, ok := buildRequestHandler[TRequest, TResponse](handler)
1✔
168
        if !ok {
1✔
169
                return *new(TResponse), errors.Errorf("handler for request %T is not a Handler", request)
×
170
        }
×
171

172
        if len(pipelineBehaviours) > 0 {
2✔
173
                var reversPipes = reversOrder(pipelineBehaviours)
1✔
174

1✔
175
                var lastHandler RequestHandlerFunc = func() (interface{}, error) {
2✔
176
                        return handlerValue.Handle(ctx, request)
1✔
177
                }
1✔
178

179
                aggregateResult := linq.From(reversPipes).AggregateWithSeedT(lastHandler, func(next RequestHandlerFunc, pipe PipelineBehavior) RequestHandlerFunc {
2✔
180
                        pipeValue := pipe
1✔
181
                        nexValue := next
1✔
182

1✔
183
                        var handlerFunc RequestHandlerFunc = func() (interface{}, error) {
2✔
184
                                return pipeValue.Handle(ctx, request, nexValue)
1✔
185
                        }
1✔
186

187
                        return handlerFunc
1✔
188
                })
189

190
                v := aggregateResult.(RequestHandlerFunc)
1✔
191
                response, err := v()
1✔
192

1✔
193
                if err != nil {
1✔
194
                        return *new(TResponse), errors.Wrap(err, "error handling request")
×
195
                }
×
196

197
                return response.(TResponse), nil
1✔
198
        } else {
1✔
199
                res, err := handlerValue.Handle(ctx, request)
1✔
200
                if err != nil {
2✔
201
                        return *new(TResponse), errors.Wrap(err, "error handling request")
1✔
202
                }
1✔
203

204
                response = res
1✔
205
        }
206

207
        return response, nil
1✔
208
}
209

210
func buildNotificationHandler[TNotification any](handler any) (NotificationHandler[TNotification], bool) {
1✔
211
        handlerValue, ok := handler.(NotificationHandler[TNotification])
1✔
212
        if !ok {
2✔
213
                factory, ok := handler.(NotificationHandlerFactory[TNotification])
1✔
214
                if !ok {
1✔
215
                        return nil, false
×
216
                }
×
217

218
                return factory(), true
1✔
219
        }
220

221
        return handlerValue, true
1✔
222
}
223

224
// Publish the notification event to its corresponding notification handler.
225
func Publish[TNotification any](ctx context.Context, notification TNotification) error {
1✔
226
        eventType := reflect.TypeOf(notification)
1✔
227

1✔
228
        handlers, ok := notificationHandlersRegistrations[eventType]
1✔
229
        if !ok {
2✔
230
                // notification strategy should have zero or more handlers, so it should run without any error if we can't find a corresponding handler
1✔
231
                return nil
1✔
232
        }
1✔
233

234
        for _, handler := range handlers {
2✔
235
                handlerValue, ok := buildNotificationHandler[TNotification](handler)
1✔
236

1✔
237
                if !ok {
1✔
238
                        return errors.Errorf("handler for notification %T is not a Handler", notification)
×
239
                }
×
240

241
                err := handlerValue.Handle(ctx, notification)
1✔
242
                if err != nil {
2✔
243
                        return errors.Wrap(err, "error handling notification")
1✔
244
                }
1✔
245
        }
246

247
        return nil
1✔
248
}
249

250
// reversOrder returns a new slice holding the elements of values in reverse
// order. The input slice is not modified. For an empty or nil input it
// returns nil.
func reversOrder(values []interface{}) []interface{} {
	if len(values) == 0 {
		return nil
	}

	// The result length is known up front, so allocate once instead of
	// growing the slice through repeated appends.
	reversed := make([]interface{}, len(values))
	for i, value := range values {
		reversed[len(values)-1-i] = value
	}

	return reversed
}
259

260
func existsPipeType(p reflect.Type) bool {
1✔
261
        for _, pipe := range pipelineBehaviours {
2✔
262
                if reflect.TypeOf(pipe) == p {
2✔
263
                        return true
1✔
264
                }
1✔
265
        }
266

267
        return false
1✔
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc