• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

optimizely / agent / 7097742085

05 Dec 2023 07:55AM UTC coverage: 86.037% (-0.9%) from 86.9%
7097742085

push

github

web-flow
[FSSDK-9631] feat: add datafile syncer to synchronize datafile across agent nodes for webhook API (#405)

* feat: add redis syncer for webhook

* Modify syncer

* fix bug

* add various fix

* refactor code

* refactor code

* add unit test

* improve logging

* add unit test for pubsub

* add unit test

* add unit test

* add unit test

* add unit test

* refactor code

* cleanup

* update config

* add review changes

* update config doc

115 of 162 new or added lines in 9 files covered. (70.99%)

1 existing line in 1 file now uncovered.

2662 of 3094 relevant lines covered (86.04%)

9.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.19
/pkg/optimizely/cache.go
1
/****************************************************************************
2
 * Copyright 2019,2022-2023, Optimizely, Inc. and contributors              *
3
 *                                                                          *
4
 * Licensed under the Apache License, Version 2.0 (the "License");          *
5
 * you may not use this file except in compliance with the License.         *
6
 * You may obtain a copy of the License at                                  *
7
 *                                                                          *
8
 *    http://www.apache.org/licenses/LICENSE-2.0                            *
9
 *                                                                          *
10
 * Unless required by applicable law or agreed to in writing, software      *
11
 * distributed under the License is distributed on an "AS IS" BASIS,        *
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
13
 * See the License for the specific language governing permissions and      *
14
 * limitations under the License.                                           *
15
 ***************************************************************************/
16

17
// Package optimizely wraps the Optimizely SDK
18
package optimizely
19

20
import (
21
        "context"
22
        "encoding/json"
23
        "errors"
24
        "regexp"
25
        "strings"
26
        "sync"
27

28
        "github.com/optimizely/agent/config"
29
        "github.com/optimizely/agent/pkg/syncer"
30
        "github.com/optimizely/agent/plugins/odpcache"
31
        "github.com/optimizely/agent/plugins/userprofileservice"
32
        "github.com/optimizely/go-sdk/pkg/client"
33
        sdkconfig "github.com/optimizely/go-sdk/pkg/config"
34
        "github.com/optimizely/go-sdk/pkg/decision"
35
        "github.com/optimizely/go-sdk/pkg/event"
36
        "github.com/optimizely/go-sdk/pkg/logging"
37
        "github.com/optimizely/go-sdk/pkg/odp"
38
        odpEventPkg "github.com/optimizely/go-sdk/pkg/odp/event"
39
        odpSegmentPkg "github.com/optimizely/go-sdk/pkg/odp/segment"
40
        "github.com/optimizely/go-sdk/pkg/utils"
41

42
        odpCachePkg "github.com/optimizely/go-sdk/pkg/odp/cache"
43
        cmap "github.com/orcaman/concurrent-map"
44
        "github.com/rs/zerolog/log"
45
)
46

47
// Plugin type labels: getServiceWithType switches on these to choose a creator registry and includes them in its log messages.
48
const (
49
        userProfileServicePlugin = "UserProfileService"
50
        odpCachePlugin           = "ODP Cache"
51
)
52

53
// OptlyCache implements the Cache interface backed by a concurrent map.
54
// The default OptlyClient lookup is based on supplied configuration via env variables.
55
type OptlyCache struct {
56
        loader                func(string) (*OptlyClient, error) // builds a client for a key that is not yet in optlyMap (called by GetClient)
57
        optlyMap              cmap.ConcurrentMap // client key -> *OptlyClient
58
        userProfileServiceMap cmap.ConcurrentMap // sdkKey -> user profile service name, written by SetUserProfileService
59
        odpCacheMap           cmap.ConcurrentMap // sdkKey -> ODP cache name, written by SetODPCache
60
        ctx                   context.Context // once done, the goroutines started by GetClient close their clients
61
        wg                    sync.WaitGroup // counts the per-client shutdown goroutines; Wait blocks on it
62
}
63

64
// NewCache returns a new implementation of OptlyCache interface backed by a concurrent map.
65
func NewCache(ctx context.Context, conf config.AgentConfig, metricsRegistry *MetricsRegistry) *OptlyCache {
1✔
66

1✔
67
        // TODO is there a cleaner way to handle this translation???
1✔
68
        cmLoader := func(sdkkey string, options ...sdkconfig.OptionFunc) SyncedConfigManager {
1✔
69
                return sdkconfig.NewPollingProjectConfigManager(sdkkey, options...)
×
70
        }
×
71

72
        userProfileServiceMap := cmap.New()
1✔
73
        odpCacheMap := cmap.New()
1✔
74
        cache := &OptlyCache{
1✔
75
                ctx:                   ctx,
1✔
76
                wg:                    sync.WaitGroup{},
1✔
77
                loader:                defaultLoader(conf, metricsRegistry, userProfileServiceMap, odpCacheMap, cmLoader, event.NewBatchEventProcessor),
1✔
78
                optlyMap:              cmap.New(),
1✔
79
                userProfileServiceMap: userProfileServiceMap,
1✔
80
                odpCacheMap:           odpCacheMap,
1✔
81
        }
1✔
82

1✔
83
        return cache
1✔
84
}
85

86
// Init takes a slice of sdkKeys to warm the cache upon startup
87
func (c *OptlyCache) Init(sdkKeys []string) {
1✔
88
        for _, sdkKey := range sdkKeys {
3✔
89
                if _, err := c.GetClient(sdkKey); err != nil {
2✔
90
                        message := "Failed to initialize Optimizely Client."
×
91
                        if ShouldIncludeSDKKey {
×
92
                                log.Warn().Str("sdkKey", sdkKey).Msg(message)
×
93
                                continue
×
94
                        }
95
                        log.Warn().Msg(message)
×
96
                }
97
        }
98
}
99

100
// GetClient is used to fetch an instance of the OptlyClient when the SDK Key is explicitly supplied.
101
func (c *OptlyCache) GetClient(sdkKey string) (*OptlyClient, error) {
10✔
102
        val, ok := c.optlyMap.Get(sdkKey)
10✔
103
        if ok {
11✔
104
                return val.(*OptlyClient), nil
1✔
105
        }
1✔
106

107
        oc, err := c.loader(sdkKey)
9✔
108
        if err != nil {
10✔
109
                return oc, err
1✔
110
        }
1✔
111

112
        set := c.optlyMap.SetIfAbsent(sdkKey, oc)
8✔
113
        if set {
16✔
114
                c.wg.Add(1)
8✔
115
                go func() {
16✔
116
                        defer c.wg.Done()
8✔
117
                        <-c.ctx.Done()
8✔
118
                        oc.Close()
8✔
119
                }()
8✔
120
                return oc, err
8✔
121
        }
122

123
        // Clean-up to not leave any lingering un-unused goroutines
124
        go oc.Close()
×
125

×
126
        // If we didn't "set" the key in this method execution then it was set in another thread.
×
127
        // Recursively lookuping up the SDK key "should" only happen once.
×
128
        return c.GetClient(sdkKey)
×
129
}
130

131
// UpdateConfigs is used to update config for all clients corresponding to a particular SDK key.
132
func (c *OptlyCache) UpdateConfigs(sdkKey string) {
1✔
133
        for clientInfo := range c.optlyMap.IterBuffered() {
4✔
134
                if strings.HasPrefix(clientInfo.Key, sdkKey) {
6✔
135
                        optlyClient, ok := clientInfo.Val.(*OptlyClient)
3✔
136
                        if !ok {
3✔
137
                                log.Error().Msgf("Value not instance of OptlyClient.")
×
138
                        }
×
139
                        optlyClient.UpdateConfig()
3✔
140
                }
141
        }
142
}
143

144
// SetUserProfileService sets userProfileService to be used for the given sdkKey
145
func (c *OptlyCache) SetUserProfileService(sdkKey, userProfileService string) {
7✔
146
        c.userProfileServiceMap.SetIfAbsent(sdkKey, userProfileService)
7✔
147
}
7✔
148

149
// SetODPCache sets odpCache to be used for the given sdkKey
150
func (c *OptlyCache) SetODPCache(sdkKey, odpCache string) {
7✔
151
        c.odpCacheMap.SetIfAbsent(sdkKey, odpCache)
7✔
152
}
7✔
153

154
// Wait for all optimizely clients to gracefully shutdown
155
func (c *OptlyCache) Wait() {
18✔
156
        c.wg.Wait()
18✔
157
}
18✔
158

159
// ErrValidationFailure is returned by the default client loader when the supplied client key does not match the configured SDK key regex.
160
var ErrValidationFailure = errors.New("sdkKey failed validation")
161

162
func regexValidator(sdkKeyRegex string) func(string) bool {
13✔
163
        r, err := regexp.Compile(sdkKeyRegex)
13✔
164
        if err != nil {
13✔
165
                log.Fatal().Err(err).Msgf("invalid sdkKeyRegex configuration")
×
166
        }
×
167

168
        return r.MatchString
13✔
169
}
170

171
func defaultLoader(
172
        agentConf config.AgentConfig,
173
        metricsRegistry *MetricsRegistry,
174
        userProfileServiceMap cmap.ConcurrentMap,
175
        odpCacheMap cmap.ConcurrentMap,
176
        pcFactory func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager,
177
        bpFactory func(options ...event.BPOptionConfig) *event.BatchEventProcessor) func(clientKey string) (*OptlyClient, error) {
12✔
178
        clientConf := agentConf.Client
12✔
179
        validator := regexValidator(clientConf.SdkKeyRegex)
12✔
180

12✔
181
        return func(clientKey string) (*OptlyClient, error) {
24✔
182
                var sdkKey string
12✔
183
                var datafileAccessToken string
12✔
184
                var configManager SyncedConfigManager
12✔
185

12✔
186
                if !validator(clientKey) {
13✔
187
                        message := "failed to validate sdk key"
1✔
188
                        if ShouldIncludeSDKKey {
2✔
189
                                log.Warn().Msgf("%v: %q", message, sdkKey)
1✔
190
                        } else {
1✔
191
                                log.Warn().Msg(message)
×
192
                        }
×
193
                        return &OptlyClient{}, ErrValidationFailure
1✔
194
                }
195

196
                clientKeySplit := strings.Split(clientKey, ":")
11✔
197

11✔
198
                // If there is a : then it is an authenticated datafile.
11✔
199
                // First part is the sdkKey.
11✔
200
                // Second part is the datafileAccessToken
11✔
201
                sdkKey = clientKeySplit[0]
11✔
202
                if len(clientKeySplit) == 2 {
11✔
203
                        datafileAccessToken = clientKeySplit[1]
×
204
                }
×
205

206
                message := "Loading Optimizely instance"
11✔
207
                if ShouldIncludeSDKKey {
22✔
208
                        log.Info().Str("sdkKey", sdkKey).Msg(message)
11✔
209
                } else {
11✔
210
                        log.Info().Msg(message)
×
211
                }
×
212

213
                if datafileAccessToken != "" {
11✔
214
                        configManager = pcFactory(
×
215
                                sdkKey,
×
216
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
×
217
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
×
218
                                sdkconfig.WithDatafileAccessToken(datafileAccessToken),
×
219
                        )
×
220
                } else {
11✔
221
                        configManager = pcFactory(
11✔
222
                                sdkKey,
11✔
223
                                sdkconfig.WithPollingInterval(clientConf.PollingInterval),
11✔
224
                                sdkconfig.WithDatafileURLTemplate(clientConf.DatafileURLTemplate),
11✔
225
                        )
11✔
226
                }
11✔
227

228
                if _, err := configManager.GetConfig(); err != nil {
11✔
229
                        return &OptlyClient{}, err
×
230
                }
×
231

232
                q := event.NewInMemoryQueue(clientConf.QueueSize)
11✔
233
                ep := bpFactory(
11✔
234
                        event.WithSDKKey(sdkKey),
11✔
235
                        event.WithQueueSize(clientConf.QueueSize),
11✔
236
                        event.WithBatchSize(clientConf.BatchSize),
11✔
237
                        event.WithEventEndPoint(clientConf.EventURL),
11✔
238
                        event.WithFlushInterval(clientConf.FlushInterval),
11✔
239
                        event.WithQueue(q),
11✔
240
                        event.WithEventDispatcherMetrics(metricsRegistry),
11✔
241
                )
11✔
242

11✔
243
                forcedVariations := decision.NewMapExperimentOverridesStore()
11✔
244
                optimizelyFactory := &client.OptimizelyFactory{SDKKey: sdkKey}
11✔
245

11✔
246
                clientOptions := []client.OptionFunc{
11✔
247
                        client.WithConfigManager(configManager),
11✔
248
                        client.WithExperimentOverrides(forcedVariations),
11✔
249
                        client.WithEventProcessor(ep),
11✔
250
                        client.WithOdpDisabled(clientConf.ODP.Disable),
11✔
251
                }
11✔
252

11✔
253
                if agentConf.Synchronization.Notification.Enable {
11✔
NEW
254
                        syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization)
×
255
                        if err != nil {
×
NEW
256
                                log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error())
×
NEW
257
                        } else {
×
NEW
258
                                clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC))
×
259
                        }
×
260
                }
261

262
                var clientUserProfileService decision.UserProfileService
11✔
263
                var rawUPS = getServiceWithType(userProfileServicePlugin, sdkKey, userProfileServiceMap, clientConf.UserProfileService)
11✔
264
                // Check if ups was provided by user
11✔
265
                if rawUPS != nil {
16✔
266
                        // convert ups to UserProfileService interface
5✔
267
                        if convertedUPS, ok := rawUPS.(decision.UserProfileService); ok && convertedUPS != nil {
10✔
268
                                clientUserProfileService = convertedUPS
5✔
269
                                clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService))
5✔
270
                        }
5✔
271
                }
272

273
                var clientODPCache odpCachePkg.Cache
11✔
274
                var rawODPCache = getServiceWithType(odpCachePlugin, sdkKey, odpCacheMap, clientConf.ODP.SegmentsCache)
11✔
275
                // Check if odp cache was provided by user
11✔
276
                if rawODPCache != nil {
16✔
277
                        // convert odpCache to Cache interface
5✔
278
                        if convertedODPCache, ok := rawODPCache.(odpCachePkg.Cache); ok && convertedODPCache != nil {
10✔
279
                                clientODPCache = convertedODPCache
5✔
280
                        }
5✔
281
                }
282

283
                // Create segment manager with odpConfig and custom cache
284
                segmentManager := odpSegmentPkg.NewSegmentManager(
11✔
285
                        sdkKey,
11✔
286
                        odpSegmentPkg.WithAPIManager(
11✔
287
                                odpSegmentPkg.NewSegmentAPIManager(sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "SegmentAPIManager"), utils.Timeout(clientConf.ODP.SegmentsRequestTimeout))),
11✔
288
                        ),
11✔
289
                        odpSegmentPkg.WithSegmentsCache(clientODPCache),
11✔
290
                )
11✔
291

11✔
292
                // Create event manager with odpConfig
11✔
293
                eventManager := odpEventPkg.NewBatchEventManager(
11✔
294
                        odpEventPkg.WithAPIManager(
11✔
295
                                odpEventPkg.NewEventAPIManager(
11✔
296
                                        sdkKey, utils.NewHTTPRequester(logging.GetLogger(sdkKey, "EventAPIManager"), utils.Timeout(clientConf.ODP.EventsRequestTimeout)),
11✔
297
                                ),
11✔
298
                        ),
11✔
299
                        odpEventPkg.WithFlushInterval(clientConf.ODP.EventsFlushInterval),
11✔
300
                )
11✔
301

11✔
302
                // Create odp manager with custom segment and event manager
11✔
303
                odpManager := odp.NewOdpManager(
11✔
304
                        sdkKey,
11✔
305
                        clientConf.ODP.Disable,
11✔
306
                        odp.WithSegmentManager(segmentManager),
11✔
307
                        odp.WithEventManager(eventManager),
11✔
308
                )
11✔
309
                clientOptions = append(clientOptions, client.WithOdpManager(odpManager))
11✔
310

11✔
311
                optimizelyClient, err := optimizelyFactory.Client(
11✔
312
                        clientOptions...,
11✔
313
                )
11✔
314
                return &OptlyClient{optimizelyClient, configManager, forcedVariations, clientUserProfileService, clientODPCache}, err
11✔
315
        }
316
}
317

318
func getServiceWithType(serviceType, sdkKey string, serviceMap cmap.ConcurrentMap, serviceConf map[string]interface{}) interface{} {
34✔
319

34✔
320
        intializeServiceWithName := func(serviceName string) interface{} {
56✔
321
                if clientConfigMap, ok := serviceConf["services"].(map[string]interface{}); ok {
42✔
322
                        if serviceConfig, ok := clientConfigMap[serviceName].(map[string]interface{}); ok {
38✔
323
                                // Check if any such service was added using `Add` method
18✔
324
                                var serviceInstance interface{}
18✔
325
                                switch serviceType {
18✔
326
                                case userProfileServicePlugin:
9✔
327
                                        if upsCreator, ok := userprofileservice.Creators[serviceName]; ok {
18✔
328
                                                serviceInstance = upsCreator()
9✔
329
                                        }
9✔
330
                                case odpCachePlugin:
9✔
331
                                        if odpCreator, ok := odpcache.Creators[serviceName]; ok {
18✔
332
                                                serviceInstance = odpCreator()
9✔
333
                                        }
9✔
334
                                default:
×
335
                                }
336

337
                                if serviceInstance != nil {
32✔
338
                                        // Trying to map service from client config to struct
14✔
339
                                        if serviceConfig, err := json.Marshal(serviceConfig); err != nil {
16✔
340
                                                log.Warn().Err(err).Msgf(`Error marshaling %s config: %q`, serviceType, serviceName)
2✔
341
                                        } else if err := json.Unmarshal(serviceConfig, serviceInstance); err != nil {
16✔
342
                                                log.Warn().Err(err).Msgf(`Error unmarshalling %s config: %q`, serviceType, serviceName)
2✔
343
                                        } else {
12✔
344
                                                log.Info().Msgf(`%s of type: %q created for sdkKey: %q`, serviceType, serviceName, sdkKey)
10✔
345
                                                return serviceInstance
10✔
346
                                        }
10✔
347
                                }
348
                                return nil
8✔
349
                        }
350
                }
351
                return nil
4✔
352
        }
353

354
        // Check if service name was provided in the request headers
355
        if service, ok := serviceMap.Get(sdkKey); ok {
48✔
356
                if serviceNameStr, ok := service.(string); ok && serviceNameStr != "" {
28✔
357
                        return intializeServiceWithName(serviceNameStr)
14✔
358
                }
14✔
359
        }
360

361
        // Check if any default service was provided and if it exists in client config
362
        if defaultServiceName, isAvailable := serviceConf["default"].(string); isAvailable && defaultServiceName != "" {
28✔
363
                return intializeServiceWithName(defaultServiceName)
8✔
364
        }
8✔
365
        return nil
12✔
366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc