• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

optimizely / agent / 18291298483

06 Oct 2025 06:53PM UTC coverage: 86.309% (+0.1%) from 86.177%
18291298483

Pull #444

github

Mat001
Complete fix for goroutine lifecycle race condition in Subscribe

Address comprehensive goroutine lifecycle management issue:

Problem:
The previous fix with buffered channel only prevented blocking, but didn't
prevent goroutine leaks. When the main function returns with an error or
due to context cancellation BEFORE the goroutine enters its main loop, the
goroutine would continue running indefinitely until the original context
expires (could be much later).

Complete Solution:
1. Added stop channel (chan struct{}) to signal goroutine termination
2. Goroutine checks stop channel during initialization (both error and success paths)
3. Goroutine checks stop channel as first case in main select loop
4. Main function closes stop channel when returning with error or ctx.Done()
5. Main function does NOT close stop on success - goroutine continues normally

This ensures:
- No goroutine leaks when Subscribe returns early with error
- No goroutine leaks when context is cancelled during initialization
- Clean resource cleanup via defer statements
- Goroutine runs normally when initialization succeeds

Verified with race detector running 3 consecutive times.
Pull Request #444: [FSSDK-] Add redis-streams

440 of 521 new or added lines in 6 files covered. (84.45%)

3 existing lines in 1 file now uncovered.

3316 of 3842 relevant lines covered (86.31%)

2491.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.42
/pkg/optimizely/cache.go
1
/****************************************************************************
2
 * Copyright 2019,2022-2024 Optimizely, Inc. and contributors               *
3
 *                                                                          *
4
 * Licensed under the Apache License, Version 2.0 (the "License");          *
5
 * you may not use this file except in compliance with the License.         *
6
 * You may obtain a copy of the License at                                  *
7
 *                                                                          *
8
 *    http://www.apache.org/licenses/LICENSE-2.0                            *
9
 *                                                                          *
10
 * Unless required by applicable law or agreed to in writing, software      *
11
 * distributed under the License is distributed on an "AS IS" BASIS,        *
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13
 * See the License for the specific language governing permissions and      *
14
 * limitations under the License.                                           *
15
 ***************************************************************************/
16

17
// Package optimizely wraps the Optimizely SDK
18
package optimizely
19

20
import (
21
        "context"
22
        "encoding/json"
23
        "errors"
24
        "os"
25
        "regexp"
26
        "strings"
27
        "sync"
28

29
        cmap "github.com/orcaman/concurrent-map"
30
        "github.com/rs/zerolog/log"
31
        "go.opentelemetry.io/otel/trace"
32

33
        "github.com/optimizely/agent/config"
34
        "github.com/optimizely/agent/pkg/syncer"
35
        "github.com/optimizely/agent/plugins/odpcache"
36
        "github.com/optimizely/agent/plugins/userprofileservice"
37
        odpCachePkg "github.com/optimizely/go-sdk/v2/pkg/cache"
38
        "github.com/optimizely/go-sdk/v2/pkg/client"
39
        "github.com/optimizely/go-sdk/v2/pkg/cmab"
40
        sdkconfig "github.com/optimizely/go-sdk/v2/pkg/config"
41
        "github.com/optimizely/go-sdk/v2/pkg/decision"
42
        "github.com/optimizely/go-sdk/v2/pkg/event"
43
        "github.com/optimizely/go-sdk/v2/pkg/logging"
44
        "github.com/optimizely/go-sdk/v2/pkg/odp"
45
        odpEventPkg "github.com/optimizely/go-sdk/v2/pkg/odp/event"
46
        odpSegmentPkg "github.com/optimizely/go-sdk/v2/pkg/odp/segment"
47
        "github.com/optimizely/go-sdk/v2/pkg/tracing"
48
        "github.com/optimizely/go-sdk/v2/pkg/utils"
49
)
50

51
// Plugin category labels used internally to select a creator registry and to
// tag log messages when resolving user-configured services.
const (
	// userProfileServicePlugin labels the user profile service plugin category.
	userProfileServicePlugin = "UserProfileService"

	// odpCachePlugin labels the ODP segments cache plugin category.
	odpCachePlugin = "ODP Cache"
)
56

57
// OptlyCache implements the Cache interface backed by a concurrent map.
58
// The default OptlyClient lookup is based on supplied configuration via env variables.
59
type OptlyCache struct {
60
        loader                func(string) (*OptlyClient, error)
61
        optlyMap              cmap.ConcurrentMap
62
        userProfileServiceMap cmap.ConcurrentMap
63
        odpCacheMap           cmap.ConcurrentMap
64
        ctx                   context.Context
65
        wg                    sync.WaitGroup
66
}
67

68
// NewCache returns a new implementation of OptlyCache interface backed by a concurrent map.
69
func NewCache(ctx context.Context, conf config.AgentConfig, metricsRegistry *MetricsRegistry, tracer trace.Tracer) *OptlyCache {
1✔
70

1✔
71
        // TODO is there a cleaner way to handle this translation???
1✔
72
        cmLoader := func(sdkkey string, options ...sdkconfig.OptionFunc) SyncedConfigManager {
1✔
73
                return sdkconfig.NewPollingProjectConfigManager(sdkkey, options...)
×
74
        }
×
75

76
        userProfileServiceMap := cmap.New()
1✔
77
        odpCacheMap := cmap.New()
1✔
78
        cache := &OptlyCache{
1✔
79
                ctx:                   ctx,
1✔
80
                wg:                    sync.WaitGroup{},
1✔
81
                loader:                defaultLoader(conf, metricsRegistry, tracer, userProfileServiceMap, odpCacheMap, cmLoader, event.NewBatchEventProcessor),
1✔
82
                optlyMap:              cmap.New(),
1✔
83
                userProfileServiceMap: userProfileServiceMap,
1✔
84
                odpCacheMap:           odpCacheMap,
1✔
85
        }
1✔
86

1✔
87
        return cache
1✔
88
}
89

90
// Init takes a slice of sdkKeys to warm the cache upon startup
91
func (c *OptlyCache) Init(sdkKeys []string) {
1✔
92
        for _, sdkKey := range sdkKeys {
3✔
93
                if _, err := c.GetClient(sdkKey); err != nil {
2✔
94
                        message := "Failed to initialize Optimizely Client."
×
95
                        if ShouldIncludeSDKKey {
×
96
                                log.Warn().Str("sdkKey", sdkKey).Msg(message)
×
97
                                continue
×
98
                        }
99
                        log.Warn().Msg(message)
×
100
                }
101
        }
102
}
103

104
// GetClient is used to fetch an instance of the OptlyClient when the SDK Key is explicitly supplied.
105
func (c *OptlyCache) GetClient(sdkKey string) (*OptlyClient, error) {
11✔
106
        val, ok := c.optlyMap.Get(sdkKey)
11✔
107
        if ok {
12✔
108
                return val.(*OptlyClient), nil
1✔
109
        }
1✔
110

111
        oc, err := c.loader(sdkKey)
10✔
112
        if err != nil {
11✔
113
                return oc, err
1✔
114
        }
1✔
115

116
        set := c.optlyMap.SetIfAbsent(sdkKey, oc)
9✔
117
        if set {
18✔
118
                c.wg.Add(1)
9✔
119
                go func() {
18✔
120
                        defer c.wg.Done()
9✔
121
                        <-c.ctx.Done()
9✔
122
                        oc.Close()
9✔
123
                }()
9✔
124
                return oc, err
9✔
125
        }
126

127
        // Clean-up to not leave any lingering un-unused goroutines
128
        go oc.Close()
×
129

×
130
        // If we didn't "set" the key in this method execution then it was set in another thread.
×
131
        // Recursively lookuping up the SDK key "should" only happen once.
×
132
        return c.GetClient(sdkKey)
×
133
}
134

135
// UpdateConfigs is used to update config for all clients corresponding to a particular SDK key.
136
func (c *OptlyCache) UpdateConfigs(sdkKey string) {
1✔
137
        for clientInfo := range c.optlyMap.IterBuffered() {
4✔
138
                if strings.HasPrefix(clientInfo.Key, sdkKey) {
6✔
139
                        optlyClient, ok := clientInfo.Val.(*OptlyClient)
3✔
140
                        if !ok {
3✔
141
                                log.Error().Msgf("Value not instance of OptlyClient.")
×
142
                        }
×
143
                        optlyClient.UpdateConfig()
3✔
144
                }
145
        }
146
}
147

148
// SetUserProfileService sets userProfileService to be used for the given sdkKey
149
func (c *OptlyCache) SetUserProfileService(sdkKey, userProfileService string) {
7✔
150
        c.userProfileServiceMap.SetIfAbsent(sdkKey, userProfileService)
7✔
151
}
7✔
152

153
// SetODPCache sets odpCache to be used for the given sdkKey
154
func (c *OptlyCache) SetODPCache(sdkKey, odpCache string) {
7✔
155
        c.odpCacheMap.SetIfAbsent(sdkKey, odpCache)
7✔
156
}
7✔
157

158
// Wait for all optimizely clients to gracefully shutdown
159
func (c *OptlyCache) Wait() {
20✔
160
        c.wg.Wait()
20✔
161
}
20✔
162

163
// ErrValidationFailure is the sentinel error returned by the client loader
// when the provided SDK key fails initial validation.
var ErrValidationFailure = errors.New("sdkKey failed validation")
165

166
func regexValidator(sdkKeyRegex string) func(string) bool {
26✔
167
        r, err := regexp.Compile(sdkKeyRegex)
26✔
168
        if err != nil {
26✔
169
                log.Fatal().Err(err).Msgf("invalid sdkKeyRegex configuration")
×
170
        }
×
171

172
        return r.MatchString
26✔
173
}
174

175
func defaultLoader(
176
        agentConf config.AgentConfig,
177
        metricsRegistry *MetricsRegistry,
178
        tracer trace.Tracer,
179
        userProfileServiceMap cmap.ConcurrentMap,
180
        odpCacheMap cmap.ConcurrentMap,
181
        pcFactory func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager,
182
        bpFactory func(options ...event.BPOptionConfig) *event.BatchEventProcessor) func(clientKey string) (*OptlyClient, error) {
25✔
183
        clientConf := agentConf.Client
25✔
184
        validator := regexValidator(clientConf.SdkKeyRegex)
25✔
185

25✔
186
        return func(clientKey string) (*OptlyClient, error) {
50✔
187
                var sdkKey string
25✔
188
                var datafileAccessToken string
25✔
189
                var configManager SyncedConfigManager
25✔
190

25✔
191
                if !validator(clientKey) {
26✔
192
                        message := "failed to validate sdk key"
1✔
193
                        if ShouldIncludeSDKKey {
2✔
194
                                log.Warn().Msgf("%v: %q", message, sdkKey)
1✔
195
                        } else {
1✔
196
                                log.Warn().Msg(message)
×
197
                        }
×
198
                        return &OptlyClient{}, ErrValidationFailure
1✔
199
                }
200

201
                clientKeySplit := strings.Split(clientKey, ":")
24✔
202

24✔
203
                // If there is a : then it is an authenticated datafile.
24✔
204
                // First part is the sdkKey.
24✔
205
                // Second part is the datafileAccessToken
24✔
206
                sdkKey = clientKeySplit[0]
24✔
207
                if len(clientKeySplit) == 2 {
24✔
208
                        datafileAccessToken = clientKeySplit[1]
×
209
                }
×
210

211
                message := "Loading Optimizely instance"
24✔
212
                if ShouldIncludeSDKKey {
48✔
213
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
24✔
214
                } else {
24✔
215
                        log.Info().Msg(message)
×
216
                }
×
217

218
                if datafileAccessToken != "" {
24✔
219
                        configManager = pcFactory(
×
220
                                sdkKey,
×
221
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
×
222
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
×
223
                                sdkconfig.WithDatafileAccessToken(datafileAccessToken),
×
224
                        )
×
225
                } else {
24✔
226
                        configManager = pcFactory(
24✔
227
                                sdkKey,
24✔
228
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
24✔
229
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
24✔
230
                        )
24✔
231
                }
24✔
232

233
                if _, err := configManager.GetConfig(); err != nil {
24✔
234
                        return &OptlyClient{}, err
×
235
                }
×
236

237
                q := event.NewInMemoryQueue(clientConf.QueueSize)
24✔
238
                ep := bpFactory(
24✔
239
                        event.WithSDKKey(sdkKey),
24✔
240
                        event.WithQueueSize(clientConf.QueueSize),
24✔
241
                        event.WithBatchSize(clientConf.BatchSize),
24✔
242
                        event.WithEventEndPoint(clientConf.EventURL),
24✔
243
                        event.WithFlushInterval(clientConf.FlushInterval),
24✔
244
                        event.WithQueue(q),
24✔
245
                        event.WithEventDispatcherMetrics(metricsRegistry),
24✔
246
                )
24✔
247

24✔
248
                forcedVariations := decision.NewMapExperimentOverridesStore()
24✔
249
                optimizelyFactory := &client.OptimizelyFactory{SDKKey: sdkKey}
24✔
250

24✔
251
                clientOptions := []client.OptionFunc{
24✔
252
                        client.WithConfigManager(configManager),
24✔
253
                        client.WithExperimentOverrides(forcedVariations),
24✔
254
                        client.WithEventProcessor(ep),
24✔
255
                        client.WithOdpDisabled(clientConf.ODP.Disable),
24✔
256
                        client.WithTracer(tracing.NewOtelTracer(tracer)),
24✔
257
                }
24✔
258

24✔
259
                if agentConf.Synchronization.Notification.Enable {
24✔
260
                        syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization)
×
261
                        if err != nil {
×
262
                                log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error())
×
263
                        } else {
×
264
                                clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC))
×
265
                        }
×
266
                }
267

268
                var clientUserProfileService decision.UserProfileService
24✔
269
                var rawUPS = getServiceWithType(userProfileServicePlugin, sdkKey, userProfileServiceMap, clientConf.UserProfileService)
24✔
270
                // Check if ups was provided by user
24✔
271
                if rawUPS != nil {
30✔
272
                        // convert ups to UserProfileService interface
6✔
273
                        if convertedUPS, ok := rawUPS.(decision.UserProfileService); ok && convertedUPS != nil {
12✔
274
                                clientUserProfileService = convertedUPS
6✔
275
                                clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService))
6✔
276
                        }
6✔
277
                }
278

279
                var clientODPCache odpCachePkg.Cache
24✔
280
                var rawODPCache = getServiceWithType(odpCachePlugin, sdkKey, odpCacheMap, clientConf.ODP.SegmentsCache)
24✔
281
                // Check if odp cache was provided by user
24✔
282
                if rawODPCache != nil {
30✔
283
                        // convert odpCache to Cache interface
6✔
284
                        if convertedODPCache, ok := rawODPCache.(odpCachePkg.Cache); ok && convertedODPCache != nil {
12✔
285
                                clientODPCache = convertedODPCache
6✔
286
                        }
6✔
287
                }
288

289
                // Create segment manager with odpConfig and custom cache
290
                segmentManager := odpSegmentPkg.NewSegmentManager(
24✔
291
                        sdkKey,
24✔
292
                        odpSegmentPkg.WithAPIManager(
24✔
293
                                odpSegmentPkg.NewSegmentAPIManager(sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "SegmentAPIManager"), utils.Timeout(clientConf.ODP.SegmentsRequestTimeout))),
24✔
294
                        ),
24✔
295
                        odpSegmentPkg.WithSegmentsCache(clientODPCache),
24✔
296
                )
24✔
297

24✔
298
                // Create event manager with odpConfig
24✔
299
                eventManager := odpEventPkg.NewBatchEventManager(
24✔
300
                        odpEventPkg.WithAPIManager(
24✔
301
                                odpEventPkg.NewEventAPIManager(
24✔
302
                                        sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "EventAPIManager"), utils.Timeout(clientConf.ODP.EventsRequestTimeout)),
24✔
303
                                ),
24✔
304
                        ),
24✔
305
                        odpEventPkg.WithFlushInterval(clientConf.ODP.EventsFlushInterval),
24✔
306
                )
24✔
307

24✔
308
                // Create odp manager with custom segment and event manager
24✔
309
                odpManager := odp.NewOdpManager(
24✔
310
                        sdkKey,
24✔
311
                        clientConf.ODP.Disable,
24✔
312
                        odp.WithSegmentManager(segmentManager),
24✔
313
                        odp.WithEventManager(eventManager),
24✔
314
                )
24✔
315
                clientOptions = append(clientOptions, client.WithOdpManager(odpManager))
24✔
316

24✔
317
                // Configure CMAB prediction endpoint from environment variable
24✔
318
                // This allows FSC tests to override the endpoint by setting OPTIMIZELY_CMAB_PREDICTIONENDPOINT
24✔
319
                if cmabEndpoint := os.Getenv("OPTIMIZELY_CMAB_PREDICTIONENDPOINT"); cmabEndpoint != "" {
24✔
320
                        // Set the global variable that go-sdk uses (FSC already includes the /%s format)
×
321
                        cmab.CMABPredictionEndpoint = cmabEndpoint
×
322
                        log.Info().Str("endpoint", cmabEndpoint).Msg("Using custom CMAB prediction endpoint")
×
323
                }
×
324

325
                // Parse CMAB cache configuration
326
                cacheSize := clientConf.CMAB.Cache.Size
24✔
327
                if cacheSize == 0 {
43✔
328
                        cacheSize = cmab.DefaultCacheSize
19✔
329
                }
19✔
330

331
                cacheTTL := clientConf.CMAB.Cache.TTL
24✔
332
                if cacheTTL == 0 {
40✔
333
                        cacheTTL = cmab.DefaultCacheTTL
16✔
334
                }
16✔
335

336
                // Create retry config
337
                retryConfig := &cmab.RetryConfig{
24✔
338
                        MaxRetries:        clientConf.CMAB.RetryConfig.MaxRetries,
24✔
339
                        InitialBackoff:    clientConf.CMAB.RetryConfig.InitialBackoff,
24✔
340
                        MaxBackoff:        clientConf.CMAB.RetryConfig.MaxBackoff,
24✔
341
                        BackoffMultiplier: clientConf.CMAB.RetryConfig.BackoffMultiplier,
24✔
342
                }
24✔
343

24✔
344
                // Apply defaults for retry config if not set
24✔
345
                if retryConfig.MaxRetries == 0 {
43✔
346
                        retryConfig.MaxRetries = cmab.DefaultMaxRetries
19✔
347
                }
19✔
348
                if retryConfig.InitialBackoff == 0 {
44✔
349
                        retryConfig.InitialBackoff = cmab.DefaultInitialBackoff
20✔
350
                }
20✔
351
                if retryConfig.MaxBackoff == 0 {
44✔
352
                        retryConfig.MaxBackoff = cmab.DefaultMaxBackoff
20✔
353
                }
20✔
354
                if retryConfig.BackoffMultiplier == 0 {
44✔
355
                        retryConfig.BackoffMultiplier = cmab.DefaultBackoffMultiplier
20✔
356
                }
20✔
357

358
                // Create CMAB config (NO endpoint configuration - not configurable)
359
                cmabConfig := cmab.Config{
24✔
360
                        CacheSize:   cacheSize,
24✔
361
                        CacheTTL:    cacheTTL,
24✔
362
                        HTTPTimeout: clientConf.CMAB.RequestTimeout,
24✔
363
                        RetryConfig: retryConfig,
24✔
364
                }
24✔
365

24✔
366
                // Add to client options
24✔
367
                clientOptions = append(clientOptions, client.WithCmabConfig(&cmabConfig))
24✔
368

24✔
369
                optimizelyClient, err := optimizelyFactory.Client(
24✔
370
                        clientOptions...,
24✔
371
                )
24✔
372
                return &OptlyClient{optimizelyClient, configManager, forcedVariations, clientUserProfileService, clientODPCache}, err
24✔
373
        }
374
}
375

376
func getServiceWithType(serviceType, sdkKey string, serviceMap cmap.ConcurrentMap, serviceConf map[string]interface{}) interface{} {
60✔
377

60✔
378
        intializeServiceWithName := func(serviceName string) interface{} {
84✔
379
                if clientConfigMap, ok := serviceConf["services"].(map[string]interface{}); ok {
46✔
380
                        if serviceConfig, ok := clientConfigMap[serviceName].(map[string]interface{}); ok {
42✔
381
                                // Check if any such service was added using `Add` method
20✔
382
                                var serviceInstance interface{}
20✔
383
                                switch serviceType {
20✔
384
                                case userProfileServicePlugin:
10✔
385
                                        if upsCreator, ok := userprofileservice.Creators[serviceName]; ok {
20✔
386
                                                serviceInstance = upsCreator()
10✔
387
                                        }
10✔
388
                                case odpCachePlugin:
10✔
389
                                        if odpCreator, ok := odpcache.Creators[serviceName]; ok {
20✔
390
                                                serviceInstance = odpCreator()
10✔
391
                                        }
10✔
UNCOV
392
                                default:
×
393
                                }
394

395
                                if serviceInstance != nil {
36✔
396
                                        // Trying to map service from client config to struct
16✔
397
                                        if serviceConfig, err := json.Marshal(serviceConfig); err != nil {
18✔
398
                                                log.Warn().Err(err).Msgf(`Error marshaling %s config: %q`, serviceType, serviceName)
2✔
399
                                        } else if err := json.Unmarshal(serviceConfig, serviceInstance); err != nil {
18✔
400
                                                log.Warn().Err(err).Msgf(`Error unmarshalling %s config: %q`, serviceType, serviceName)
2✔
401
                                        } else {
14✔
402
                                                log.Info().Msgf(`%s of type: %q created for sdkKey: %q`, serviceType, serviceName, sdkKey)
12✔
403
                                                return serviceInstance
12✔
404
                                        }
12✔
405
                                }
406
                                return nil
8✔
407
                        }
408
                }
409
                return nil
4✔
410
        }
411

412
        // Check if service name was provided in the request headers
413
        if service, ok := serviceMap.Get(sdkKey); ok {
74✔
414
                if serviceNameStr, ok := service.(string); ok && serviceNameStr != "" {
28✔
415
                        return intializeServiceWithName(serviceNameStr)
14✔
416
                }
14✔
417
        }
418

419
        // Check if any default service was provided and if it exists in client config
420
        if defaultServiceName, isAvailable := serviceConf["default"].(string); isAvailable && defaultServiceName != "" {
56✔
421
                return intializeServiceWithName(defaultServiceName)
10✔
422
        }
10✔
423
        return nil
36✔
424
}
425

426
// ResetClient removes the optimizely client from cache to ensure clean state for testing
427
// This is primarily used by FSC tests to clear CMAB cache between test scenarios
428
func (c *OptlyCache) ResetClient(sdkKey string) {
2✔
429
        // Remove the client from the cache
2✔
430
        if val, exists := c.optlyMap.Get(sdkKey); exists {
3✔
431
                c.optlyMap.Remove(sdkKey)
1✔
432

1✔
433
                // Close the client to clean up resources
1✔
434
                if client, ok := val.(*OptlyClient); ok {
2✔
435
                        client.Close()
1✔
436
                }
1✔
437

438
                message := "Reset Optimizely client for testing"
1✔
439
                if ShouldIncludeSDKKey {
2✔
440
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
1✔
441
                } else {
1✔
UNCOV
442
                        log.Info().Msg(message)
×
UNCOV
443
                }
×
444
        }
445
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc