• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

optimizely / go-sdk / 18383930381

09 Oct 2025 05:24PM UTC coverage: 91.794% (+0.1%) from 91.688%
18383930381

Pull #421

github

esrakartalOpt
[FSSDK-11915] Fix Go Region EU problem
Pull Request #421: [FSSDK-11915] Fix Go Region EU problem

3 of 5 new or added lines in 1 file covered. (60.0%)

13 existing lines in 1 file now uncovered.

5425 of 5910 relevant lines covered (91.79%)

7953.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.47
/pkg/event/processor.go
1
/****************************************************************************
2
 * Copyright 2019-2020,2023 Optimizely, Inc. and contributors               *
3
 *                                                                          *
4
 * Licensed under the Apache License, Version 2.0 (the "License");          *
5
 * you may not use this file except in compliance with the License.         *
6
 * You may obtain a copy of the License at                                  *
7
 *                                                                          *
8
 *    http://www.apache.org/licenses/LICENSE-2.0                            *
9
 *                                                                          *
10
 * Unless required by applicable law or agreed to in writing, software      *
11
 * distributed under the License is distributed on an "AS IS" BASIS,        *
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13
 * See the License for the specific language governing permissions and      *
14
 * limitations under the License.                                           *
15
 ***************************************************************************/
16

17
// Package event //
18
package event
19

20
import (
21
        "context"
22
        "errors"
23
        "fmt"
24
        "sync"
25
        "time"
26

27
        "golang.org/x/sync/semaphore"
28

29
        "github.com/optimizely/go-sdk/v2/pkg/logging"
30
        "github.com/optimizely/go-sdk/v2/pkg/metrics"
31
        "github.com/optimizely/go-sdk/v2/pkg/notification"
32
        "github.com/optimizely/go-sdk/v2/pkg/registry"
33
)
34

35
// Processor processes events
36
type Processor interface {
37
        ProcessEvent(event UserEvent) bool
38
        OnEventDispatch(callback func(logEvent LogEvent)) (int, error)
39
        RemoveOnEventDispatch(id int) error
40
}
41

42
// BatchEventProcessor is used out of the box by the SDK to queue up and batch events to be sent to the Optimizely
43
// log endpoint for results processing.
44
type BatchEventProcessor struct {
45
        sdkKey          string
46
        MaxQueueSize    int           // max size of the queue before flush
47
        FlushInterval   time.Duration // in milliseconds
48
        BatchSize       int
49
        EventEndPoint   string
50
        Q               Queue
51
        flushLock       sync.Mutex
52
        Ticker          *time.Ticker
53
        EventDispatcher Dispatcher
54
        processing      *semaphore.Weighted
55
        logger          logging.OptimizelyLogProducer
56
        metricsRegistry metrics.Registry
57
}
58

59
// DefaultBatchSize holds the default value for the batch size
60
const DefaultBatchSize = 10
61

62
// DefaultEventQueueSize holds the default value for the event queue size
63
const DefaultEventQueueSize = 2000
64

65
// DefaultEventFlushInterval holds the default value for the event flush interval
66
const DefaultEventFlushInterval = 30 * time.Second
67

68
// CloseEventDispatchWaitTime holds the checking interval for the dispatching events on client close
69
const CloseEventDispatchWaitTime = 500 * time.Millisecond
70

71
// CloseEventDispatchTimeout holds the timeout value for the waiting for the dispatching events on client close
72
const CloseEventDispatchTimeout = 30 * time.Second
73

74
// EventEndPoints holds the event endpoints for different regions
75
var EventEndPoints = map[string]string{
76
        "US": "https://logx.optimizely.com/v1/events",
77
        "EU": "https://eu.logx.optimizely.com/v1/events",
78
}
79

80
const maxFlushWorkers = 1
81

82
// BPOptionConfig is the BatchProcessor options that give you the ability to add one more more options before the processor is initialized.
83
type BPOptionConfig func(qp *BatchEventProcessor)
84

85
// WithBatchSize sets the batch size as a config option to be passed into the NewProcessor method
86
func WithBatchSize(bsize int) BPOptionConfig {
4✔
87
        return func(qp *BatchEventProcessor) {
8✔
88
                qp.BatchSize = bsize
4✔
89
        }
4✔
90
}
91

92
// WithEventEndPoint sets the end point as a config option to be passed into the NewProcessor method
93
func WithEventEndPoint(endPoint string) BPOptionConfig {
2✔
94
        return func(qp *BatchEventProcessor) {
4✔
95
                qp.EventEndPoint = endPoint
2✔
96
        }
2✔
97
}
98

99
// WithQueueSize sets the queue size as a config option to be passed into the NewProcessor method
100
func WithQueueSize(qsize int) BPOptionConfig {
18✔
101
        return func(qp *BatchEventProcessor) {
36✔
102
                qp.MaxQueueSize = qsize
18✔
103
        }
18✔
104
}
105

106
// WithFlushInterval sets the flush interval as a config option to be passed into the NewProcessor method
107
func WithFlushInterval(flushInterval time.Duration) BPOptionConfig {
10✔
108
        return func(qp *BatchEventProcessor) {
20✔
109
                qp.FlushInterval = flushInterval
10✔
110
        }
10✔
111
}
112

113
// WithQueue sets the Processor Queue as a config option to be passed into the NewProcessor method
114
func WithQueue(q Queue) BPOptionConfig {
17✔
115
        return func(qp *BatchEventProcessor) {
34✔
116
                qp.Q = q
17✔
117
        }
17✔
118
}
119

120
// WithEventDispatcher sets the Processor Dispatcher as a config option to be passed into the NewProcessor method
121
func WithEventDispatcher(d Dispatcher) BPOptionConfig {
22✔
122
        return func(qp *BatchEventProcessor) {
44✔
123
                qp.EventDispatcher = d
22✔
124
        }
22✔
125
}
126

127
// WithSDKKey sets the SDKKey used to register for notifications.  This should be removed when the project
128
// config supports sdk key.
129
func WithSDKKey(sdkKey string) BPOptionConfig {
3✔
130
        return func(qp *BatchEventProcessor) {
6✔
131
                qp.sdkKey = sdkKey
3✔
132
        }
3✔
133
}
134

135
// WithEventDispatcherMetrics sets metrics into the NewProcessor method
136
func WithEventDispatcherMetrics(metricsRegistry metrics.Registry) BPOptionConfig {
×
137
        return func(qp *BatchEventProcessor) {
×
138
                qp.metricsRegistry = metricsRegistry
×
139
        }
×
140
}
141

142
// NewBatchEventProcessor returns a new instance of BatchEventProcessor with queueSize and flushInterval
143
func NewBatchEventProcessor(options ...BPOptionConfig) *BatchEventProcessor {
27✔
144
        p := &BatchEventProcessor{processing: semaphore.NewWeighted(int64(maxFlushWorkers))}
27✔
145

27✔
146
        for _, opt := range options {
103✔
147
                opt(p)
76✔
148
        }
76✔
149

150
        p.logger = logging.GetLogger(p.sdkKey, "BatchEventProcessor")
27✔
151

27✔
152
        if p.MaxQueueSize == 0 {
36✔
153
                p.MaxQueueSize = defaultQueueSize
9✔
154
        }
9✔
155

156
        if p.FlushInterval == 0 {
44✔
157
                p.FlushInterval = DefaultEventFlushInterval
17✔
158
        }
17✔
159

160
        if p.BatchSize == 0 {
50✔
161
                p.BatchSize = DefaultBatchSize
23✔
162
        }
23✔
163

164
        if p.EventEndPoint == "" {
52✔
165
                p.EventEndPoint = EventEndPoints["US"]
25✔
166
        }
25✔
167

168
        if p.BatchSize > p.MaxQueueSize {
28✔
169
                p.logger.Warning(
1✔
170
                        fmt.Sprintf("Batch size %d is larger than queue size %d.  Setting to defaults",
1✔
171
                                p.BatchSize, p.MaxQueueSize))
1✔
172

1✔
173
                p.BatchSize = DefaultBatchSize
1✔
174
                p.MaxQueueSize = defaultQueueSize
1✔
175
        }
1✔
176

177
        if p.Q == nil {
37✔
178
                p.Q = NewInMemoryQueueWithLogger(p.MaxQueueSize, p.logger)
10✔
179
        }
10✔
180

181
        if p.EventDispatcher == nil {
32✔
182
                dispatcher := NewQueueEventDispatcher(p.sdkKey, p.metricsRegistry)
5✔
183
                p.EventDispatcher = dispatcher
5✔
184
        }
5✔
185

186
        return p
27✔
187
}
188

189
// Start does not do any initialization, just starts the ticker
190
func (p *BatchEventProcessor) Start(ctx context.Context) {
18✔
191

18✔
192
        p.logger.Info("Batch event processor started")
18✔
193
        p.startTicker(ctx)
18✔
194
}
18✔
195

196
// ProcessEvent takes the given user event (can be an impression or conversion event) and queues it up to be dispatched
197
// to the Optimizely log endpoint. A dispatch happens when we flush the events, which can happen on a set interval or
198
// when the specified batch size (defaulted to 10) is reached.
199
func (p *BatchEventProcessor) ProcessEvent(event UserEvent) bool {
160✔
200

160✔
201
        if p.Q.Size() >= p.MaxQueueSize {
162✔
202
                p.logger.Warning("MaxQueueSize has been met. Discarding event")
2✔
203
                return false
2✔
204
        }
2✔
205

206
        p.Q.Add(event)
158✔
207

158✔
208
        if p.Q.Size() < p.BatchSize {
262✔
209
                return true
104✔
210
        }
104✔
211

212
        if p.processing.TryAcquire(1) {
58✔
213
                // it doesn't matter if the timer has kicked in here.
4✔
214
                // we just want to start one go routine when the batch size is met.
4✔
215
                p.logger.Debug("batch size reached.  Flushing routine being called")
4✔
216
                go func() {
8✔
217
                        p.flushEvents()
4✔
218
                        p.processing.Release(1)
4✔
219
                }()
4✔
220
        }
221

222
        return true
54✔
223
}
224

225
// eventsCount returns size of an event queue
226
func (p *BatchEventProcessor) eventsCount() int {
2,034,064✔
227
        return p.Q.Size()
2,034,064✔
228
}
2,034,064✔
229

230
// getEvents returns events from event queue for count
231
func (p *BatchEventProcessor) getEvents(count int) []interface{} {
28✔
232
        return p.Q.Get(count)
28✔
233
}
28✔
234

235
// remove removes events from queue for count
236
func (p *BatchEventProcessor) remove(count int) []interface{} {
26✔
237
        return p.Q.Remove(count)
26✔
238
}
26✔
239

240
// StartTicker starts new ticker for flushing events
241
func (p *BatchEventProcessor) startTicker(ctx context.Context) {
18✔
242
        if p.Ticker != nil {
18✔
UNCOV
243
                return
×
UNCOV
244
        }
×
245
        p.Ticker = time.NewTicker(p.FlushInterval)
18✔
246

18✔
247
        for {
2,034,082✔
248
                select {
2,034,064✔
249
                case <-p.Ticker.C:
2,034,047✔
250
                        p.flushEvents()
2,034,047✔
251
                case <-ctx.Done():
13✔
252
                        p.logger.Debug("Event processor stopped, flushing events.")
13✔
253
                        p.flushEvents()
13✔
254
                        d, ok := p.EventDispatcher.(*QueueEventDispatcher)
13✔
255
                        if ok {
14✔
256
                                d.flushEvents()
1✔
257
                                d.waitForDispatchingEventsOnClose(CloseEventDispatchTimeout)
1✔
258
                        }
1✔
259
                        p.Ticker.Stop()
13✔
260
                        return
13✔
261
                }
262
        }
263
}
264

265
// check if user event can be batched in the current batch
266
func (p *BatchEventProcessor) canBatch(current *Batch, user UserEvent) bool {
134✔
267
        if current.ProjectID == user.EventContext.ProjectID &&
134✔
268
                current.Revision == user.EventContext.Revision {
264✔
269
                return true
130✔
270
        }
130✔
271

272
        return false
4✔
273
}
274

275
// add the visitor to the current batch
276
func (p *BatchEventProcessor) addToBatch(current *Batch, visitor Visitor) {
130✔
277
        current.Visitors = append(current.Visitors, visitor)
130✔
278
}
130✔
279

280
// flushEvents flushes events in queue
281
func (p *BatchEventProcessor) flushEvents() {
2,034,084✔
282
        // we flush when queue size is reached.
2,034,084✔
283
        // however, if there is a ticker cycle already processing, we should wait
2,034,084✔
284
        p.flushLock.Lock()
2,034,084✔
285
        defer p.flushLock.Unlock()
2,034,084✔
286

2,034,084✔
287
        var batchEvent Batch
2,034,084✔
288
        var batchEventCount = 0
2,034,084✔
289
        var failedToSend = false
2,034,084✔
290

2,034,084✔
291
        for p.eventsCount() > 0 {
2,034,114✔
292
                if failedToSend {
32✔
293
                        p.logger.Error("last Event Batch failed to send; retry on next flush", errors.New("dispatcher failed"))
2✔
294
                        break
2✔
295
                }
296
                events := p.getEvents(p.BatchSize)
28✔
297

28✔
298
                if len(events) > 0 {
56✔
299
                        for i := 0; i < len(events); i++ {
190✔
300
                                userEvent, ok := events[i].(UserEvent)
162✔
301
                                if ok {
324✔
302
                                        if batchEventCount == 0 {
190✔
303
                                                batchEvent = createBatchEvent(userEvent, createVisitorFromUserEvent(userEvent))
28✔
304
                                                batchEventCount = 1
28✔
305
                                        } else {
162✔
306
                                                if !p.canBatch(&batchEvent, userEvent) {
138✔
307
                                                        // this could happen if the project config was updated for instance.
4✔
308
                                                        p.logger.Info("Can't batch last event. Sending current batch.")
4✔
309
                                                        break
4✔
310
                                                } else {
130✔
311
                                                        p.addToBatch(&batchEvent, createVisitorFromUserEvent(userEvent))
130✔
312
                                                        batchEventCount++
130✔
313
                                                }
130✔
314
                                        }
315

316
                                        if batchEventCount >= p.BatchSize {
163✔
317
                                                // the batch size is reached so take the current batchEvent and send it.
5✔
318
                                                break
5✔
319
                                        }
320
                                }
321
                        }
322
                }
323
                if batchEventCount > 0 {
56✔
324
                        eventEndPoint := ""
28✔
325
                        if p.EventEndPoint != EventEndPoints["US"] && p.EventEndPoint != EventEndPoints["EU"] {
28✔
NEW
UNCOV
326
                                eventEndPoint = p.EventEndPoint
×
NEW
327
                        }
×
328
                        // TODO: figure out what to do with the error
329
                        logEvent := createLogEvent(batchEvent, eventEndPoint)
28✔
330
                        notificationCenter := registry.GetNotificationCenter(p.sdkKey)
28✔
331

28✔
332
                        err := notificationCenter.Send(notification.LogEvent, logEvent)
28✔
333

28✔
334
                        if err != nil {
28✔
UNCOV
335
                                p.logger.Error("Send Log Event notification failed.", err)
×
UNCOV
336
                        }
×
337
                        if success, _ := p.EventDispatcher.DispatchEvent(logEvent); success {
54✔
338
                                p.logger.Debug("Dispatched event successfully")
26✔
339
                                p.remove(batchEventCount)
26✔
340
                                batchEventCount = 0
26✔
341
                                batchEvent = Batch{}
26✔
342
                        } else {
28✔
343
                                p.logger.Warning("Failed to dispatch event successfully")
2✔
344
                                failedToSend = true
2✔
345
                        }
2✔
346
                }
347
        }
348
}
349

350
// OnEventDispatch registers a handler for LogEvent notifications
351
func (p *BatchEventProcessor) OnEventDispatch(callback func(logEvent LogEvent)) (int, error) {
3✔
352
        notificationCenter := registry.GetNotificationCenter(p.sdkKey)
3✔
353

3✔
354
        handler := func(payload interface{}) {
6✔
355
                if ev, ok := payload.(LogEvent); ok {
6✔
356
                        callback(ev)
3✔
357
                } else {
3✔
UNCOV
358
                        p.logger.Warning(fmt.Sprintf("Unable to convert notification payload %v into LogEventNotification", payload))
×
UNCOV
359
                }
×
360
        }
361
        id, err := notificationCenter.AddHandler(notification.LogEvent, handler)
3✔
362
        if err != nil {
3✔
UNCOV
363
                p.logger.Error("Problem with adding notification handler.", err)
×
UNCOV
364
                return 0, err
×
UNCOV
365
        }
×
366
        return id, nil
3✔
367
}
368

369
// RemoveOnEventDispatch removes handler for LogEvent notification with given id
370
func (p *BatchEventProcessor) RemoveOnEventDispatch(id int) error {
3✔
371
        notificationCenter := registry.GetNotificationCenter(p.sdkKey)
3✔
372

3✔
373
        if err := notificationCenter.RemoveHandler(id, notification.LogEvent); err != nil {
3✔
UNCOV
374
                p.logger.Warning("Problem with removing notification handler.")
×
UNCOV
375
                return err
×
UNCOV
376
        }
×
377
        return nil
3✔
378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc