• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

optimizely / agent / 18271603781

06 Oct 2025 06:04AM UTC coverage: 86.065% (-0.1%) from 86.177%
18271603781

Pull #444

github

Mat001
add missing test coverage in syncer
Pull Request #444: [FSSDK-] Add redis-streams

424 of 511 new or added lines in 6 files covered. (82.97%)

3 existing lines in 1 file now uncovered.

3298 of 3832 relevant lines covered (86.06%)

2992.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.42
/pkg/optimizely/cache.go
1
/****************************************************************************
2
 * Copyright 2019,2022-2024 Optimizely, Inc. and contributors               *
3
 *                                                                          *
4
 * Licensed under the Apache License, Version 2.0 (the "License");          *
5
 * you may not use this file except in compliance with the License.         *
6
 * You may obtain a copy of the License at                                  *
7
 *                                                                          *
8
 *    http://www.apache.org/licenses/LICENSE-2.0                            *
9
 *                                                                          *
10
 * Unless required by applicable law or agreed to in writing, software      *
11
 * distributed under the License is distributed on an "AS IS" BASIS,        *
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13
 * See the License for the specific language governing permissions and      *
14
 * limitations under the License.                                           *
15
 ***************************************************************************/
16

17
// Package optimizely wraps the Optimizely SDK
18
package optimizely
19

20
import (
21
        "context"
22
        "encoding/json"
23
        "errors"
24
        "os"
25
        "regexp"
26
        "strings"
27
        "sync"
28

29
        cmap "github.com/orcaman/concurrent-map"
30
        "github.com/rs/zerolog/log"
31
        "go.opentelemetry.io/otel/trace"
32

33
        "github.com/optimizely/agent/config"
34
        "github.com/optimizely/agent/pkg/syncer"
35
        "github.com/optimizely/agent/plugins/odpcache"
36
        "github.com/optimizely/agent/plugins/userprofileservice"
37
        odpCachePkg "github.com/optimizely/go-sdk/v2/pkg/cache"
38
        "github.com/optimizely/go-sdk/v2/pkg/client"
39
        "github.com/optimizely/go-sdk/v2/pkg/cmab"
40
        sdkconfig "github.com/optimizely/go-sdk/v2/pkg/config"
41
        "github.com/optimizely/go-sdk/v2/pkg/decision"
42
        "github.com/optimizely/go-sdk/v2/pkg/event"
43
        "github.com/optimizely/go-sdk/v2/pkg/logging"
44
        "github.com/optimizely/go-sdk/v2/pkg/odp"
45
        odpEventPkg "github.com/optimizely/go-sdk/v2/pkg/odp/event"
46
        odpSegmentPkg "github.com/optimizely/go-sdk/v2/pkg/odp/segment"
47
        "github.com/optimizely/go-sdk/v2/pkg/tracing"
48
        "github.com/optimizely/go-sdk/v2/pkg/utils"
49
)
50

51
// User plugin strings required for internal usage
52
const (
53
        userProfileServicePlugin = "UserProfileService"
54
        odpCachePlugin           = "ODP Cache"
55
)
56

57
// OptlyCache implements the Cache interface backed by a concurrent map.
58
// The default OptlyClient lookup is based on supplied configuration via env variables.
59
type OptlyCache struct {
60
        loader                func(string) (*OptlyClient, error)
61
        optlyMap              cmap.ConcurrentMap
62
        userProfileServiceMap cmap.ConcurrentMap
63
        odpCacheMap           cmap.ConcurrentMap
64
        ctx                   context.Context
65
        wg                    sync.WaitGroup
66
}
67

68
// NewCache returns a new implementation of OptlyCache interface backed by a concurrent map.
69
func NewCache(ctx context.Context, conf config.AgentConfig, metricsRegistry *MetricsRegistry, tracer trace.Tracer) *OptlyCache {
1✔
70

1✔
71
        // TODO is there a cleaner way to handle this translation???
1✔
72
        cmLoader := func(sdkkey string, options ...sdkconfig.OptionFunc) SyncedConfigManager {
1✔
73
                return sdkconfig.NewPollingProjectConfigManager(sdkkey, options...)
×
74
        }
×
75

76
        userProfileServiceMap := cmap.New()
1✔
77
        odpCacheMap := cmap.New()
1✔
78
        cache := &OptlyCache{
1✔
79
                ctx:                   ctx,
1✔
80
                wg:                    sync.WaitGroup{},
1✔
81
                loader:                defaultLoader(conf, metricsRegistry, tracer, userProfileServiceMap, odpCacheMap, cmLoader, event.NewBatchEventProcessor),
1✔
82
                optlyMap:              cmap.New(),
1✔
83
                userProfileServiceMap: userProfileServiceMap,
1✔
84
                odpCacheMap:           odpCacheMap,
1✔
85
        }
1✔
86

1✔
87
        return cache
1✔
88
}
89

90
// Init takes a slice of sdkKeys to warm the cache upon startup
91
func (c *OptlyCache) Init(sdkKeys []string) {
1✔
92
        for _, sdkKey := range sdkKeys {
3✔
93
                if _, err := c.GetClient(sdkKey); err != nil {
2✔
94
                        message := "Failed to initialize Optimizely Client."
×
95
                        if ShouldIncludeSDKKey {
×
96
                                log.Warn().Str("sdkKey", sdkKey).Msg(message)
×
97
                                continue
×
98
                        }
99
                        log.Warn().Msg(message)
×
100
                }
101
        }
102
}
103

104
// GetClient is used to fetch an instance of the OptlyClient when the SDK Key is explicitly supplied.
105
func (c *OptlyCache) GetClient(sdkKey string) (*OptlyClient, error) {
11✔
106
        val, ok := c.optlyMap.Get(sdkKey)
11✔
107
        if ok {
12✔
108
                return val.(*OptlyClient), nil
1✔
109
        }
1✔
110

111
        oc, err := c.loader(sdkKey)
10✔
112
        if err != nil {
11✔
113
                return oc, err
1✔
114
        }
1✔
115

116
        set := c.optlyMap.SetIfAbsent(sdkKey, oc)
9✔
117
        if set {
18✔
118
                c.wg.Add(1)
9✔
119
                go func() {
18✔
120
                        defer c.wg.Done()
9✔
121
                        <-c.ctx.Done()
9✔
122
                        oc.Close()
9✔
123
                }()
9✔
124
                return oc, err
9✔
125
        }
126

127
        // Clean-up to not leave any lingering un-unused goroutines
128
        go oc.Close()
×
129

×
130
        // If we didn't "set" the key in this method execution then it was set in another thread.
×
131
        // Recursively lookuping up the SDK key "should" only happen once.
×
132
        return c.GetClient(sdkKey)
×
133
}
134

135
// UpdateConfigs is used to update config for all clients corresponding to a particular SDK key.
136
func (c *OptlyCache) UpdateConfigs(sdkKey string) {
1✔
137
        for clientInfo := range c.optlyMap.IterBuffered() {
4✔
138
                if strings.HasPrefix(clientInfo.Key, sdkKey) {
6✔
139
                        optlyClient, ok := clientInfo.Val.(*OptlyClient)
3✔
140
                        if !ok {
3✔
141
                                log.Error().Msgf("Value not instance of OptlyClient.")
×
142
                        }
×
143
                        optlyClient.UpdateConfig()
3✔
144
                }
145
        }
146
}
147

148
// SetUserProfileService sets userProfileService to be used for the given sdkKey
149
func (c *OptlyCache) SetUserProfileService(sdkKey, userProfileService string) {
7✔
150
        c.userProfileServiceMap.SetIfAbsent(sdkKey, userProfileService)
7✔
151
}
7✔
152

153
// SetODPCache sets odpCache to be used for the given sdkKey
154
func (c *OptlyCache) SetODPCache(sdkKey, odpCache string) {
7✔
155
        c.odpCacheMap.SetIfAbsent(sdkKey, odpCache)
7✔
156
}
7✔
157

158
// Wait for all optimizely clients to gracefully shutdown
159
func (c *OptlyCache) Wait() {
20✔
160
        c.wg.Wait()
20✔
161
}
20✔
162

163
// ErrValidationFailure is returned when the provided SDK key fails initial validation
164
var ErrValidationFailure = errors.New("sdkKey failed validation")
165

166
func regexValidator(sdkKeyRegex string) func(string) bool {
26✔
167
        r, err := regexp.Compile(sdkKeyRegex)
26✔
168
        if err != nil {
26✔
169
                log.Fatal().Err(err).Msgf("invalid sdkKeyRegex configuration")
×
170
        }
×
171

172
        return r.MatchString
26✔
173
}
174

175
func defaultLoader(
176
        agentConf config.AgentConfig,
177
        metricsRegistry *MetricsRegistry,
178
        tracer trace.Tracer,
179
        userProfileServiceMap cmap.ConcurrentMap,
180
        odpCacheMap cmap.ConcurrentMap,
181
        pcFactory func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager,
182
        bpFactory func(options ...event.BPOptionConfig) *event.BatchEventProcessor) func(clientKey string) (*OptlyClient, error) {
25✔
183
        clientConf := agentConf.Client
25✔
184
        validator := regexValidator(clientConf.SdkKeyRegex)
25✔
185

25✔
186
        return func(clientKey string) (*OptlyClient, error) {
50✔
187
                var sdkKey string
25✔
188
                var datafileAccessToken string
25✔
189
                var configManager SyncedConfigManager
25✔
190

25✔
191
                if !validator(clientKey) {
26✔
192
                        message := "failed to validate sdk key"
1✔
193
                        if ShouldIncludeSDKKey {
2✔
194
                                log.Warn().Msgf("%v: %q", message, sdkKey)
1✔
195
                        } else {
1✔
196
                                log.Warn().Msg(message)
×
197
                        }
×
198
                        return &OptlyClient{}, ErrValidationFailure
1✔
199
                }
200

201
                clientKeySplit := strings.Split(clientKey, ":")
24✔
202

24✔
203
                // If there is a : then it is an authenticated datafile.
24✔
204
                // First part is the sdkKey.
24✔
205
                // Second part is the datafileAccessToken
24✔
206
                sdkKey = clientKeySplit[0]
24✔
207
                if len(clientKeySplit) == 2 {
24✔
208
                        datafileAccessToken = clientKeySplit[1]
×
209
                }
×
210

211
                message := "Loading Optimizely instance"
24✔
212
                if ShouldIncludeSDKKey {
48✔
213
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
24✔
214
                } else {
24✔
215
                        log.Info().Msg(message)
×
216
                }
×
217

218
                if datafileAccessToken != "" {
24✔
219
                        configManager = pcFactory(
×
220
                                sdkKey,
×
221
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
×
222
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
×
223
                                sdkconfig.WithDatafileAccessToken(datafileAccessToken),
×
224
                        )
×
225
                } else {
24✔
226
                        configManager = pcFactory(
24✔
227
                                sdkKey,
24✔
228
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
24✔
229
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
24✔
230
                        )
24✔
231
                }
24✔
232

233
                if _, err := configManager.GetConfig(); err != nil {
24✔
234
                        return &OptlyClient{}, err
×
235
                }
×
236

237
                q := event.NewInMemoryQueue(clientConf.QueueSize)
24✔
238
                ep := bpFactory(
24✔
239
                        event.WithSDKKey(sdkKey),
24✔
240
                        event.WithQueueSize(clientConf.QueueSize),
24✔
241
                        event.WithBatchSize(clientConf.BatchSize),
24✔
242
                        event.WithEventEndPoint(clientConf.EventURL),
24✔
243
                        event.WithFlushInterval(clientConf.FlushInterval),
24✔
244
                        event.WithQueue(q),
24✔
245
                        event.WithEventDispatcherMetrics(metricsRegistry),
24✔
246
                )
24✔
247

24✔
248
                forcedVariations := decision.NewMapExperimentOverridesStore()
24✔
249
                optimizelyFactory := &client.OptimizelyFactory{SDKKey: sdkKey}
24✔
250

24✔
251
                clientOptions := []client.OptionFunc{
24✔
252
                        client.WithConfigManager(configManager),
24✔
253
                        client.WithExperimentOverrides(forcedVariations),
24✔
254
                        client.WithEventProcessor(ep),
24✔
255
                        client.WithOdpDisabled(clientConf.ODP.Disable),
24✔
256
                        client.WithTracer(tracing.NewOtelTracer(tracer)),
24✔
257
                }
24✔
258

24✔
259
                if agentConf.Synchronization.Notification.Enable {
24✔
260
                        syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization)
×
261
                        if err != nil {
×
262
                                log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error())
×
263
                        } else {
×
264
                                clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC))
×
265
                        }
×
266
                }
267

268
                var clientUserProfileService decision.UserProfileService
24✔
269
                var rawUPS = getServiceWithType(userProfileServicePlugin, sdkKey, userProfileServiceMap, clientConf.UserProfileService)
24✔
270
                // Check if ups was provided by user
24✔
271
                if rawUPS != nil {
30✔
272
                        // convert ups to UserProfileService interface
6✔
273
                        if convertedUPS, ok := rawUPS.(decision.UserProfileService); ok && convertedUPS != nil {
12✔
274
                                clientUserProfileService = convertedUPS
6✔
275
                                clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService))
6✔
276
                        }
6✔
277
                }
278

279
                var clientODPCache odpCachePkg.Cache
24✔
280
                var rawODPCache = getServiceWithType(odpCachePlugin, sdkKey, odpCacheMap, clientConf.ODP.SegmentsCache)
24✔
281
                // Check if odp cache was provided by user
24✔
282
                if rawODPCache != nil {
30✔
283
                        // convert odpCache to Cache interface
6✔
284
                        if convertedODPCache, ok := rawODPCache.(odpCachePkg.Cache); ok && convertedODPCache != nil {
12✔
285
                                clientODPCache = convertedODPCache
6✔
286
                        }
6✔
287
                }
288

289
                // Create segment manager with odpConfig and custom cache
290
                segmentManager := odpSegmentPkg.NewSegmentManager(
24✔
291
                        sdkKey,
24✔
292
                        odpSegmentPkg.WithAPIManager(
24✔
293
                                odpSegmentPkg.NewSegmentAPIManager(sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "SegmentAPIManager"), utils.Timeout(clientConf.ODP.SegmentsRequestTimeout))),
24✔
294
                        ),
24✔
295
                        odpSegmentPkg.WithSegmentsCache(clientODPCache),
24✔
296
                )
24✔
297

24✔
298
                // Create event manager with odpConfig
24✔
299
                eventManager := odpEventPkg.NewBatchEventManager(
24✔
300
                        odpEventPkg.WithAPIManager(
24✔
301
                                odpEventPkg.NewEventAPIManager(
24✔
302
                                        sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "EventAPIManager"), utils.Timeout(clientConf.ODP.EventsRequestTimeout)),
24✔
303
                                ),
24✔
304
                        ),
24✔
305
                        odpEventPkg.WithFlushInterval(clientConf.ODP.EventsFlushInterval),
24✔
306
                )
24✔
307

24✔
308
                // Create odp manager with custom segment and event manager
24✔
309
                odpManager := odp.NewOdpManager(
24✔
310
                        sdkKey,
24✔
311
                        clientConf.ODP.Disable,
24✔
312
                        odp.WithSegmentManager(segmentManager),
24✔
313
                        odp.WithEventManager(eventManager),
24✔
314
                )
24✔
315
                clientOptions = append(clientOptions, client.WithOdpManager(odpManager))
24✔
316

24✔
317
                // Configure CMAB prediction endpoint from environment variable
24✔
318
                // This allows FSC tests to override the endpoint by setting OPTIMIZELY_CMAB_PREDICTIONENDPOINT
24✔
319
                if cmabEndpoint := os.Getenv("OPTIMIZELY_CMAB_PREDICTIONENDPOINT"); cmabEndpoint != "" {
24✔
320
                        // Set the global variable that go-sdk uses (FSC already includes the /%s format)
×
321
                        cmab.CMABPredictionEndpoint = cmabEndpoint
×
322
                        log.Info().Str("endpoint", cmabEndpoint).Msg("Using custom CMAB prediction endpoint")
×
323
                }
×
324

325
                // Parse CMAB cache configuration
326
                cacheSize := clientConf.CMAB.Cache.Size
24✔
327
                if cacheSize == 0 {
43✔
328
                        cacheSize = cmab.DefaultCacheSize
19✔
329
                }
19✔
330

331
                cacheTTL := clientConf.CMAB.Cache.TTL
24✔
332
                if cacheTTL == 0 {
40✔
333
                        cacheTTL = cmab.DefaultCacheTTL
16✔
334
                }
16✔
335

336
                // Create retry config
337
                retryConfig := &cmab.RetryConfig{
24✔
338
                        MaxRetries:        clientConf.CMAB.RetryConfig.MaxRetries,
24✔
339
                        InitialBackoff:    clientConf.CMAB.RetryConfig.InitialBackoff,
24✔
340
                        MaxBackoff:        clientConf.CMAB.RetryConfig.MaxBackoff,
24✔
341
                        BackoffMultiplier: clientConf.CMAB.RetryConfig.BackoffMultiplier,
24✔
342
                }
24✔
343

24✔
344
                // Apply defaults for retry config if not set
24✔
345
                if retryConfig.MaxRetries == 0 {
43✔
346
                        retryConfig.MaxRetries = cmab.DefaultMaxRetries
19✔
347
                }
19✔
348
                if retryConfig.InitialBackoff == 0 {
44✔
349
                        retryConfig.InitialBackoff = cmab.DefaultInitialBackoff
20✔
350
                }
20✔
351
                if retryConfig.MaxBackoff == 0 {
44✔
352
                        retryConfig.MaxBackoff = cmab.DefaultMaxBackoff
20✔
353
                }
20✔
354
                if retryConfig.BackoffMultiplier == 0 {
44✔
355
                        retryConfig.BackoffMultiplier = cmab.DefaultBackoffMultiplier
20✔
356
                }
20✔
357

358
                // Create CMAB config (NO endpoint configuration - not configurable)
359
                cmabConfig := cmab.Config{
24✔
360
                        CacheSize:   cacheSize,
24✔
361
                        CacheTTL:    cacheTTL,
24✔
362
                        HTTPTimeout: clientConf.CMAB.RequestTimeout,
24✔
363
                        RetryConfig: retryConfig,
24✔
364
                }
24✔
365

24✔
366
                // Add to client options
24✔
367
                clientOptions = append(clientOptions, client.WithCmabConfig(&cmabConfig))
24✔
368

24✔
369
                optimizelyClient, err := optimizelyFactory.Client(
24✔
370
                        clientOptions...,
24✔
371
                )
24✔
372
                return &OptlyClient{optimizelyClient, configManager, forcedVariations, clientUserProfileService, clientODPCache}, err
24✔
373
        }
374
}
375

376
func getServiceWithType(serviceType, sdkKey string, serviceMap cmap.ConcurrentMap, serviceConf map[string]interface{}) interface{} {
60✔
377

60✔
378
        intializeServiceWithName := func(serviceName string) interface{} {
84✔
379
                if clientConfigMap, ok := serviceConf["services"].(map[string]interface{}); ok {
46✔
380
                        if serviceConfig, ok := clientConfigMap[serviceName].(map[string]interface{}); ok {
42✔
381
                                // Check if any such service was added using `Add` method
20✔
382
                                var serviceInstance interface{}
20✔
383
                                switch serviceType {
20✔
384
                                case userProfileServicePlugin:
10✔
385
                                        if upsCreator, ok := userprofileservice.Creators[serviceName]; ok {
20✔
386
                                                serviceInstance = upsCreator()
10✔
387
                                        }
10✔
388
                                case odpCachePlugin:
10✔
389
                                        if odpCreator, ok := odpcache.Creators[serviceName]; ok {
20✔
390
                                                serviceInstance = odpCreator()
10✔
391
                                        }
10✔
UNCOV
392
                                default:
×
393
                                }
394

395
                                if serviceInstance != nil {
36✔
396
                                        // Trying to map service from client config to struct
16✔
397
                                        if serviceConfig, err := json.Marshal(serviceConfig); err != nil {
18✔
398
                                                log.Warn().Err(err).Msgf(`Error marshaling %s config: %q`, serviceType, serviceName)
2✔
399
                                        } else if err := json.Unmarshal(serviceConfig, serviceInstance); err != nil {
18✔
400
                                                log.Warn().Err(err).Msgf(`Error unmarshalling %s config: %q`, serviceType, serviceName)
2✔
401
                                        } else {
14✔
402
                                                log.Info().Msgf(`%s of type: %q created for sdkKey: %q`, serviceType, serviceName, sdkKey)
12✔
403
                                                return serviceInstance
12✔
404
                                        }
12✔
405
                                }
406
                                return nil
8✔
407
                        }
408
                }
409
                return nil
4✔
410
        }
411

412
        // Check if service name was provided in the request headers
413
        if service, ok := serviceMap.Get(sdkKey); ok {
74✔
414
                if serviceNameStr, ok := service.(string); ok && serviceNameStr != "" {
28✔
415
                        return intializeServiceWithName(serviceNameStr)
14✔
416
                }
14✔
417
        }
418

419
        // Check if any default service was provided and if it exists in client config
420
        if defaultServiceName, isAvailable := serviceConf["default"].(string); isAvailable && defaultServiceName != "" {
56✔
421
                return intializeServiceWithName(defaultServiceName)
10✔
422
        }
10✔
423
        return nil
36✔
424
}
425

426
// ResetClient removes the optimizely client from cache to ensure clean state for testing
427
// This is primarily used by FSC tests to clear CMAB cache between test scenarios
428
func (c *OptlyCache) ResetClient(sdkKey string) {
2✔
429
        // Remove the client from the cache
2✔
430
        if val, exists := c.optlyMap.Get(sdkKey); exists {
3✔
431
                c.optlyMap.Remove(sdkKey)
1✔
432

1✔
433
                // Close the client to clean up resources
1✔
434
                if client, ok := val.(*OptlyClient); ok {
2✔
435
                        client.Close()
1✔
436
                }
1✔
437

438
                message := "Reset Optimizely client for testing"
1✔
439
                if ShouldIncludeSDKKey {
2✔
440
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
1✔
441
                } else {
1✔
UNCOV
442
                        log.Info().Msg(message)
×
UNCOV
443
                }
×
444
        }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc