• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

optimizely / agent / 19177705593

07 Nov 2025 06:32PM UTC coverage: 86.321% (+0.1%) from 86.21%
19177705593

push

github

web-flow
[FSSDK-11990] Add Redis cache support for CMAB following ODP cache pattern (#447)

* Add Redis cache support for CMAB following ODP cache pattern

Implement Redis caching for CMAB (Contextual Multi-Armed Bandit)
decisions using the same plugin-based architecture as ODP cache.

Changes:
- Add cmabcache plugin with registry and service implementations
- Implement in-memory LRU cache (size: 10000, TTL: 30m)
- Implement Redis cache with JSON serialization
- Update CMABCacheConfig from struct to service-based map config
- Add comprehensive unit and integration tests (all passing)
- Update config.yaml with new service-based CMAB cache format
- Add cmabcache plugin import to main.go
- Fix cache.go to initialize CMAB cache via plugin system
- Add tests to improve CMAB config parsing coverage

Test coverage:
- In-memory cache tests: 8/8 passing
- Redis cache tests: 8/8 passing
- Integration tests: 8/8 passing
- Config parsing coverage improved for lines 149-167
- All package tests: passing

* Apply linter formatting fixes

* move redis cmab config into existing config

* expose cmab prediction endpoint in config

145 of 164 new or added lines in 6 files covered. (88.41%)

1 existing line in 1 file now uncovered.

2947 of 3414 relevant lines covered (86.32%)

3135.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/pkg/optimizely/cache.go
1
/****************************************************************************
2
 * Copyright 2019,2022-2024 Optimizely, Inc. and contributors               *
3
 *                                                                          *
4
 * Licensed under the Apache License, Version 2.0 (the "License");          *
5
 * you may not use this file except in compliance with the License.         *
6
 * You may obtain a copy of the License at                                  *
7
 *                                                                          *
8
 *    http://www.apache.org/licenses/LICENSE-2.0                            *
9
 *                                                                          *
10
 * Unless required by applicable law or agreed to in writing, software      *
11
 * distributed under the License is distributed on an "AS IS" BASIS,        *
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13
 * See the License for the specific language governing permissions and      *
14
 * limitations under the License.                                           *
15
 ***************************************************************************/
16

17
// Package optimizely wraps the Optimizely SDK
18
package optimizely
19

20
import (
21
        "context"
22
        "encoding/json"
23
        "errors"
24
        "os"
25
        "regexp"
26
        "strings"
27
        "sync"
28

29
        cmap "github.com/orcaman/concurrent-map"
30
        "github.com/rs/zerolog/log"
31
        "go.opentelemetry.io/otel/trace"
32

33
        "github.com/optimizely/agent/config"
34
        "github.com/optimizely/agent/pkg/syncer"
35
        "github.com/optimizely/agent/plugins/cmabcache"
36
        "github.com/optimizely/agent/plugins/odpcache"
37
        "github.com/optimizely/agent/plugins/userprofileservice"
38
        cachePkg "github.com/optimizely/go-sdk/v2/pkg/cache"
39
        "github.com/optimizely/go-sdk/v2/pkg/client"
40
        "github.com/optimizely/go-sdk/v2/pkg/cmab"
41
        sdkconfig "github.com/optimizely/go-sdk/v2/pkg/config"
42
        "github.com/optimizely/go-sdk/v2/pkg/decision"
43
        "github.com/optimizely/go-sdk/v2/pkg/event"
44
        "github.com/optimizely/go-sdk/v2/pkg/logging"
45
        "github.com/optimizely/go-sdk/v2/pkg/odp"
46
        odpEventPkg "github.com/optimizely/go-sdk/v2/pkg/odp/event"
47
        odpSegmentPkg "github.com/optimizely/go-sdk/v2/pkg/odp/segment"
48
        "github.com/optimizely/go-sdk/v2/pkg/tracing"
49
        "github.com/optimizely/go-sdk/v2/pkg/utils"
50
)
51

52
// Plugin type labels used internally. getServiceWithType switches on them to
// pick the matching plugin registry, and they also appear in log messages.
const (
	// userProfileServicePlugin labels user profile service plugins.
	userProfileServicePlugin = "UserProfileService"
	// odpCachePlugin labels ODP cache plugins.
	odpCachePlugin = "ODP Cache"
	// cmabCachePlugin labels CMAB cache plugins.
	cmabCachePlugin = "CMAB Cache"
)
58

59
// OptlyCache implements the Cache interface backed by a concurrent map.
60
// The default OptlyClient lookup is based on supplied configuration via env variables.
61
type OptlyCache struct {
62
        loader                func(string) (*OptlyClient, error)
63
        optlyMap              cmap.ConcurrentMap
64
        userProfileServiceMap cmap.ConcurrentMap
65
        odpCacheMap           cmap.ConcurrentMap
66
        cmabCacheMap          cmap.ConcurrentMap
67
        ctx                   context.Context
68
        wg                    sync.WaitGroup
69
}
70

71
// NewCache returns a new implementation of OptlyCache interface backed by a concurrent map.
72
func NewCache(ctx context.Context, conf config.AgentConfig, metricsRegistry *MetricsRegistry, tracer trace.Tracer) *OptlyCache {
1✔
73

1✔
74
        // TODO is there a cleaner way to handle this translation???
1✔
75
        cmLoader := func(sdkkey string, options ...sdkconfig.OptionFunc) SyncedConfigManager {
1✔
76
                return sdkconfig.NewPollingProjectConfigManager(sdkkey, options...)
×
77
        }
×
78

79
        userProfileServiceMap := cmap.New()
1✔
80
        odpCacheMap := cmap.New()
1✔
81
        cmabCacheMap := cmap.New()
1✔
82
        cache := &OptlyCache{
1✔
83
                ctx:                   ctx,
1✔
84
                wg:                    sync.WaitGroup{},
1✔
85
                loader:                defaultLoader(conf, metricsRegistry, tracer, userProfileServiceMap, odpCacheMap, cmabCacheMap, cmLoader, event.NewBatchEventProcessor),
1✔
86
                optlyMap:              cmap.New(),
1✔
87
                userProfileServiceMap: userProfileServiceMap,
1✔
88
                odpCacheMap:           odpCacheMap,
1✔
89
                cmabCacheMap:          cmabCacheMap,
1✔
90
        }
1✔
91

1✔
92
        return cache
1✔
93
}
94

95
// Init takes a slice of sdkKeys to warm the cache upon startup
96
func (c *OptlyCache) Init(sdkKeys []string) {
1✔
97
        for _, sdkKey := range sdkKeys {
3✔
98
                if _, err := c.GetClient(sdkKey); err != nil {
2✔
99
                        message := "Failed to initialize Optimizely Client."
×
100
                        if ShouldIncludeSDKKey {
×
101
                                log.Warn().Str("sdkKey", sdkKey).Msg(message)
×
102
                                continue
×
103
                        }
104
                        log.Warn().Msg(message)
×
105
                }
106
        }
107
}
108

109
// GetClient is used to fetch an instance of the OptlyClient when the SDK Key is explicitly supplied.
110
func (c *OptlyCache) GetClient(sdkKey string) (*OptlyClient, error) {
11✔
111
        val, ok := c.optlyMap.Get(sdkKey)
11✔
112
        if ok {
12✔
113
                return val.(*OptlyClient), nil
1✔
114
        }
1✔
115

116
        oc, err := c.loader(sdkKey)
10✔
117
        if err != nil {
11✔
118
                return oc, err
1✔
119
        }
1✔
120

121
        set := c.optlyMap.SetIfAbsent(sdkKey, oc)
9✔
122
        if set {
18✔
123
                c.wg.Add(1)
9✔
124
                go func() {
18✔
125
                        defer c.wg.Done()
9✔
126
                        <-c.ctx.Done()
9✔
127
                        oc.Close()
9✔
128
                }()
9✔
129
                return oc, err
9✔
130
        }
131

132
        // Clean-up to not leave any lingering un-unused goroutines
133
        go oc.Close()
×
134

×
135
        // If we didn't "set" the key in this method execution then it was set in another thread.
×
136
        // Recursively lookuping up the SDK key "should" only happen once.
×
137
        return c.GetClient(sdkKey)
×
138
}
139

140
// UpdateConfigs is used to update config for all clients corresponding to a particular SDK key.
141
func (c *OptlyCache) UpdateConfigs(sdkKey string) {
1✔
142
        for clientInfo := range c.optlyMap.IterBuffered() {
4✔
143
                if strings.HasPrefix(clientInfo.Key, sdkKey) {
6✔
144
                        optlyClient, ok := clientInfo.Val.(*OptlyClient)
3✔
145
                        if !ok {
3✔
146
                                log.Error().Msgf("Value not instance of OptlyClient.")
×
147
                        }
×
148
                        optlyClient.UpdateConfig()
3✔
149
                }
150
        }
151
}
152

153
// SetUserProfileService sets userProfileService to be used for the given sdkKey
154
func (c *OptlyCache) SetUserProfileService(sdkKey, userProfileService string) {
7✔
155
        c.userProfileServiceMap.SetIfAbsent(sdkKey, userProfileService)
7✔
156
}
7✔
157

158
// SetODPCache sets odpCache to be used for the given sdkKey
159
func (c *OptlyCache) SetODPCache(sdkKey, odpCache string) {
7✔
160
        c.odpCacheMap.SetIfAbsent(sdkKey, odpCache)
7✔
161
}
7✔
162

163
// SetCMABCache sets CMAB cache for the given sdkKey
164
func (c *OptlyCache) SetCMABCache(sdkKey, cmabCache string) {
9✔
165
        c.cmabCacheMap.SetIfAbsent(sdkKey, cmabCache)
9✔
166
}
9✔
167

168
// Wait for all optimizely clients to gracefully shutdown
169
func (c *OptlyCache) Wait() {
28✔
170
        c.wg.Wait()
28✔
171
}
28✔
172

173
// ErrValidationFailure is the sentinel error the client loader returns when a
// client key does not match the configured SDK key regular expression.
var ErrValidationFailure = errors.New("sdkKey failed validation")
175

176
func regexValidator(sdkKeyRegex string) func(string) bool {
16✔
177
        r, err := regexp.Compile(sdkKeyRegex)
16✔
178
        if err != nil {
16✔
179
                log.Fatal().Err(err).Msgf("invalid sdkKeyRegex configuration")
×
180
        }
×
181

182
        return r.MatchString
16✔
183
}
184

185
func defaultLoader(
186
        agentConf config.AgentConfig,
187
        metricsRegistry *MetricsRegistry,
188
        tracer trace.Tracer,
189
        userProfileServiceMap cmap.ConcurrentMap,
190
        odpCacheMap cmap.ConcurrentMap,
191
        cmabCacheMap cmap.ConcurrentMap,
192
        pcFactory func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager,
193
        bpFactory func(options ...event.BPOptionConfig) *event.BatchEventProcessor) func(clientKey string) (*OptlyClient, error) {
15✔
194
        clientConf := agentConf.Client
15✔
195
        validator := regexValidator(clientConf.SdkKeyRegex)
15✔
196

15✔
197
        return func(clientKey string) (*OptlyClient, error) {
30✔
198
                var sdkKey string
15✔
199
                var datafileAccessToken string
15✔
200
                var configManager SyncedConfigManager
15✔
201

15✔
202
                if !validator(clientKey) {
16✔
203
                        message := "failed to validate sdk key"
1✔
204
                        if ShouldIncludeSDKKey {
2✔
205
                                log.Warn().Msgf("%v: %q", message, sdkKey)
1✔
206
                        } else {
1✔
207
                                log.Warn().Msg(message)
×
208
                        }
×
209
                        return &OptlyClient{}, ErrValidationFailure
1✔
210
                }
211

212
                clientKeySplit := strings.Split(clientKey, ":")
14✔
213

14✔
214
                // If there is a : then it is an authenticated datafile.
14✔
215
                // First part is the sdkKey.
14✔
216
                // Second part is the datafileAccessToken
14✔
217
                sdkKey = clientKeySplit[0]
14✔
218
                if len(clientKeySplit) == 2 {
14✔
219
                        datafileAccessToken = clientKeySplit[1]
×
220
                }
×
221

222
                message := "Loading Optimizely instance"
14✔
223
                if ShouldIncludeSDKKey {
28✔
224
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
14✔
225
                } else {
14✔
226
                        log.Info().Msg(message)
×
227
                }
×
228

229
                if datafileAccessToken != "" {
14✔
230
                        configManager = pcFactory(
×
231
                                sdkKey,
×
232
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
×
233
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
×
234
                                sdkconfig.WithDatafileAccessToken(datafileAccessToken),
×
235
                        )
×
236
                } else {
14✔
237
                        configManager = pcFactory(
14✔
238
                                sdkKey,
14✔
239
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
14✔
240
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
14✔
241
                        )
14✔
242
                }
14✔
243

244
                if _, err := configManager.GetConfig(); err != nil {
14✔
245
                        return &OptlyClient{}, err
×
246
                }
×
247

248
                q := event.NewInMemoryQueue(clientConf.QueueSize)
14✔
249
                ep := bpFactory(
14✔
250
                        event.WithSDKKey(sdkKey),
14✔
251
                        event.WithQueueSize(clientConf.QueueSize),
14✔
252
                        event.WithBatchSize(clientConf.BatchSize),
14✔
253
                        event.WithEventEndPoint(clientConf.EventURL),
14✔
254
                        event.WithFlushInterval(clientConf.FlushInterval),
14✔
255
                        event.WithQueue(q),
14✔
256
                        event.WithEventDispatcherMetrics(metricsRegistry),
14✔
257
                )
14✔
258

14✔
259
                forcedVariations := decision.NewMapExperimentOverridesStore()
14✔
260
                optimizelyFactory := &client.OptimizelyFactory{SDKKey: sdkKey}
14✔
261

14✔
262
                clientOptions := []client.OptionFunc{
14✔
263
                        client.WithConfigManager(configManager),
14✔
264
                        client.WithExperimentOverrides(forcedVariations),
14✔
265
                        client.WithEventProcessor(ep),
14✔
266
                        client.WithOdpDisabled(clientConf.ODP.Disable),
14✔
267
                        client.WithTracer(tracing.NewOtelTracer(tracer)),
14✔
268
                }
14✔
269

14✔
270
                if agentConf.Synchronization.Notification.Enable {
14✔
271
                        syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization)
×
272
                        if err != nil {
×
273
                                log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error())
×
274
                        } else {
×
275
                                clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC))
×
276
                        }
×
277
                }
278

279
                var clientUserProfileService decision.UserProfileService
14✔
280
                var rawUPS = getServiceWithType(userProfileServicePlugin, sdkKey, userProfileServiceMap, clientConf.UserProfileService)
14✔
281
                // Check if ups was provided by user
14✔
282
                if rawUPS != nil {
19✔
283
                        // convert ups to UserProfileService interface
5✔
284
                        if convertedUPS, ok := rawUPS.(decision.UserProfileService); ok && convertedUPS != nil {
10✔
285
                                clientUserProfileService = convertedUPS
5✔
286
                                clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService))
5✔
287
                        }
5✔
288
                }
289

290
                var clientODPCache cachePkg.Cache
14✔
291
                var rawODPCache = getServiceWithType(odpCachePlugin, sdkKey, odpCacheMap, clientConf.ODP.SegmentsCache)
14✔
292
                // Check if odp cache was provided by user
14✔
293
                if rawODPCache != nil {
19✔
294
                        // convert odpCache to Cache interface
5✔
295
                        if convertedODPCache, ok := rawODPCache.(cachePkg.Cache); ok && convertedODPCache != nil {
10✔
296
                                clientODPCache = convertedODPCache
5✔
297
                        }
5✔
298
                }
299

300
                // Create segment manager with odpConfig and custom cache
301
                segmentManager := odpSegmentPkg.NewSegmentManager(
14✔
302
                        sdkKey,
14✔
303
                        odpSegmentPkg.WithAPIManager(
14✔
304
                                odpSegmentPkg.NewSegmentAPIManager(sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "SegmentAPIManager"), utils.Timeout(clientConf.ODP.SegmentsRequestTimeout))),
14✔
305
                        ),
14✔
306
                        odpSegmentPkg.WithSegmentsCache(clientODPCache),
14✔
307
                )
14✔
308

14✔
309
                // Create event manager with odpConfig
14✔
310
                eventManager := odpEventPkg.NewBatchEventManager(
14✔
311
                        odpEventPkg.WithAPIManager(
14✔
312
                                odpEventPkg.NewEventAPIManager(
14✔
313
                                        sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "EventAPIManager"), utils.Timeout(clientConf.ODP.EventsRequestTimeout)),
14✔
314
                                ),
14✔
315
                        ),
14✔
316
                        odpEventPkg.WithFlushInterval(clientConf.ODP.EventsFlushInterval),
14✔
317
                )
14✔
318

14✔
319
                // Create odp manager with custom segment and event manager
14✔
320
                odpManager := odp.NewOdpManager(
14✔
321
                        sdkKey,
14✔
322
                        clientConf.ODP.Disable,
14✔
323
                        odp.WithSegmentManager(segmentManager),
14✔
324
                        odp.WithEventManager(eventManager),
14✔
325
                )
14✔
326
                clientOptions = append(clientOptions, client.WithOdpManager(odpManager))
14✔
327

14✔
328
                // Configure CMAB prediction endpoint with priority: env var > config > default
14✔
329
                // Environment variable allows test/runtime overrides
14✔
330
                if cmabEndpoint := os.Getenv("OPTIMIZELY_CMAB_PREDICTIONENDPOINT"); cmabEndpoint != "" {
16✔
331
                        // Environment variable takes highest priority
2✔
332
                        cmab.CMABPredictionEndpoint = cmabEndpoint
2✔
333
                        log.Info().Str("endpoint", cmabEndpoint).Str("source", "environment").Msg("Using CMAB prediction endpoint")
2✔
334
                } else if clientConf.CMAB.PredictionEndpoint != "" {
15✔
335
                        // Use config value if environment variable not set
1✔
336
                        cmab.CMABPredictionEndpoint = clientConf.CMAB.PredictionEndpoint
1✔
337
                        log.Info().Str("endpoint", clientConf.CMAB.PredictionEndpoint).Str("source", "config").Msg("Using CMAB prediction endpoint")
1✔
338
                }
1✔
339

340
                // Get CMAB cache from service configuration
341
                var clientCMABCache cachePkg.CacheWithRemove
14✔
342
                var rawCMABCache = getServiceWithType(cmabCachePlugin, sdkKey, cmabCacheMap, clientConf.CMAB.Cache)
14✔
343
                // Check if CMAB cache was provided by user
14✔
344
                if rawCMABCache != nil {
17✔
345
                        // convert cmabCache to CacheWithRemove interface
3✔
346
                        if convertedCMABCache, ok := rawCMABCache.(cachePkg.CacheWithRemove); ok && convertedCMABCache != nil {
6✔
347
                                clientCMABCache = convertedCMABCache
3✔
348
                        }
3✔
349
                }
350

351
                // Create CMAB config using client API with custom cache
352
                cmabConfig := client.CmabConfig{
14✔
353
                        Cache:       clientCMABCache,
14✔
354
                        HTTPTimeout: clientConf.CMAB.RequestTimeout,
14✔
355
                }
14✔
356

14✔
357
                // Add to client options
14✔
358
                clientOptions = append(clientOptions, client.WithCmabConfig(&cmabConfig))
14✔
359

14✔
360
                optimizelyClient, err := optimizelyFactory.Client(
14✔
361
                        clientOptions...,
14✔
362
                )
14✔
363
                return &OptlyClient{optimizelyClient, configManager, forcedVariations, clientUserProfileService, clientODPCache}, err
14✔
364
        }
365
}
366

367
func getServiceWithType(serviceType, sdkKey string, serviceMap cmap.ConcurrentMap, serviceConf map[string]interface{}) interface{} {
62✔
368

62✔
369
        intializeServiceWithName := func(serviceName string) interface{} {
95✔
370
                if clientConfigMap, ok := serviceConf["services"].(map[string]interface{}); ok {
63✔
371
                        if serviceConfig, ok := clientConfigMap[serviceName].(map[string]interface{}); ok {
57✔
372
                                // Check if any such service was added using `Add` method
27✔
373
                                var serviceInstance interface{}
27✔
374
                                switch serviceType {
27✔
375
                                case userProfileServicePlugin:
9✔
376
                                        if upsCreator, ok := userprofileservice.Creators[serviceName]; ok {
18✔
377
                                                serviceInstance = upsCreator()
9✔
378
                                        }
9✔
379
                                case odpCachePlugin:
9✔
380
                                        if odpCreator, ok := odpcache.Creators[serviceName]; ok {
18✔
381
                                                serviceInstance = odpCreator()
9✔
382
                                        }
9✔
383
                                case cmabCachePlugin:
9✔
384
                                        if cmabCreator, ok := cmabcache.Creators[serviceName]; ok && cmabCreator != nil {
18✔
385
                                                serviceInstance = cmabCreator()
9✔
386
                                        }
9✔
UNCOV
387
                                default:
×
388
                                }
389

390
                                if serviceInstance != nil {
48✔
391
                                        // Trying to map service from client config to struct
21✔
392
                                        if serviceConfig, err := json.Marshal(serviceConfig); err != nil {
24✔
393
                                                log.Warn().Err(err).Msgf(`Error marshaling %s config: %q`, serviceType, serviceName)
3✔
394
                                        } else if err := json.Unmarshal(serviceConfig, serviceInstance); err != nil {
24✔
395
                                                log.Warn().Err(err).Msgf(`Error unmarshalling %s config: %q`, serviceType, serviceName)
3✔
396
                                        } else {
18✔
397
                                                log.Info().Msgf(`%s of type: %q created for sdkKey: %q`, serviceType, serviceName, sdkKey)
15✔
398
                                                return serviceInstance
15✔
399
                                        }
15✔
400
                                }
401
                                return nil
12✔
402
                        }
403
                }
404
                return nil
6✔
405
        }
406

407
        // Check if service name was provided in the request headers
408
        if service, ok := serviceMap.Get(sdkKey); ok {
84✔
409
                if serviceNameStr, ok := service.(string); ok && serviceNameStr != "" {
44✔
410
                        return intializeServiceWithName(serviceNameStr)
22✔
411
                }
22✔
412
        }
413

414
        // Check if any default service was provided and if it exists in client config
415
        if defaultServiceName, isAvailable := serviceConf["default"].(string); isAvailable && defaultServiceName != "" {
51✔
416
                return intializeServiceWithName(defaultServiceName)
11✔
417
        }
11✔
418
        return nil
29✔
419
}
420

421
// ResetClient removes the optimizely client from cache to ensure clean state for testing
422
// This is primarily used by FSC tests to clear CMAB cache between test scenarios
423
func (c *OptlyCache) ResetClient(sdkKey string) {
2✔
424
        // Remove the client from the cache
2✔
425
        if val, exists := c.optlyMap.Get(sdkKey); exists {
3✔
426
                c.optlyMap.Remove(sdkKey)
1✔
427

1✔
428
                // Close the client to clean up resources
1✔
429
                if client, ok := val.(*OptlyClient); ok {
2✔
430
                        client.Close()
1✔
431
                }
1✔
432

433
                message := "Reset Optimizely client for testing"
1✔
434
                if ShouldIncludeSDKKey {
2✔
435
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
1✔
436
                } else {
1✔
437
                        log.Info().Msg(message)
×
438
                }
×
439
        }
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc