• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

achannarasappa / ticker / 15313856726

29 May 2025 01:02AM UTC coverage: 88.382% (-0.06%) from 88.444%
15313856726

push

github

achannarasappa
fix: update to v5 in module path

2868 of 3245 relevant lines covered (88.38%)

8.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.84
/internal/monitor/monitor.go
1
package monitor
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log"
8
        "sync"
9
        "time"
10

11
        c "github.com/achannarasappa/ticker/v5/internal/common"
12
        monitorPriceCoinbase "github.com/achannarasappa/ticker/v5/internal/monitor/coinbase/monitor-price"
13
        monitorCurrencyRate "github.com/achannarasappa/ticker/v5/internal/monitor/yahoo/monitor-currency-rates"
14
        monitorPriceYahoo "github.com/achannarasappa/ticker/v5/internal/monitor/yahoo/monitor-price"
15
        unaryClientYahoo "github.com/achannarasappa/ticker/v5/internal/monitor/yahoo/unary"
16
)
17

18
// Monitor represents an overall monitor which manages API specific monitors
19
type Monitor struct {
20
        monitors                map[c.QuoteSource]c.Monitor
21
        monitorCurrencyRate     c.MonitorCurrencyRate
22
        chanError               chan error
23
        chanUpdateAssetQuote    chan c.MessageUpdate[c.AssetQuote]
24
        chanUpdateCurrencyRates chan c.CurrencyRates
25
        onUpdateAssetQuote      func(symbol string, assetQuote c.AssetQuote, versionVector int)
26
        onUpdateAssetGroupQuote func(assetGroupQuote c.AssetGroupQuote, versionVector int)
27
        assetGroupVersionVector int
28
        assetGroup              c.AssetGroup
29
        mu                      sync.RWMutex
30
        logger                  *log.Logger
31
        ctx                     context.Context
32
        cancel                  context.CancelFunc
33
}
34

35
// ConfigMonitor represents the configuration for the main monitor
36
type ConfigMonitor struct {
37
        RefreshInterval int
38
        TargetCurrency  string
39
        Logger          *log.Logger
40
        ConfigMonitorPriceCoinbase
41
        ConfigMonitorsYahoo
42
}
43

44
// ConfigMonitorPriceCoinbase represents the configuration for the Coinbase monitor
type ConfigMonitorPriceCoinbase struct {
	// BaseURL is passed to the Coinbase price monitor as its unary (request/response) URL.
	BaseURL string
	// StreamingURL is passed to the Coinbase price monitor as its streaming URL.
	StreamingURL string
}
49

50
// ConfigMonitorsYahoo represents the configuration for the Yahoo monitors (price and currency rate)
type ConfigMonitorsYahoo struct {
	// BaseURL is the base URL given to the shared Yahoo API client.
	BaseURL string
	// SessionRootURL, SessionCrumbURL and SessionConsentURL are forwarded
	// unchanged to the shared Yahoo API client's session settings.
	SessionRootURL    string
	SessionCrumbURL   string
	SessionConsentURL string
}
57

58
// ConfigUpdateFns represents the callback functions for when asset quotes are updated
59
type ConfigUpdateFns struct {
60
        OnUpdateAssetQuote      func(symbol string, assetQuote c.AssetQuote, versionVector int)
61
        OnUpdateAssetGroupQuote func(assetGroupQuote c.AssetGroupQuote, versionVector int)
62
}
63

64
// New creates a new instance of the Coinbase monitor
65
func NewMonitor(configMonitor ConfigMonitor) (*Monitor, error) {
7✔
66

7✔
67
        chanError := make(chan error, 5)
7✔
68
        chanUpdateAssetQuote := make(chan c.MessageUpdate[c.AssetQuote], 10)
7✔
69
        chanUpdateCurrencyRate := make(chan c.CurrencyRates, 10)
7✔
70
        chanRequestCurrencyRate := make(chan []string, 10)
7✔
71

7✔
72
        ctx, cancel := context.WithCancel(context.Background())
7✔
73

7✔
74
        coinbase := monitorPriceCoinbase.NewMonitorPriceCoinbase(
7✔
75
                monitorPriceCoinbase.Config{
7✔
76
                        Ctx:                      ctx,
7✔
77
                        UnaryURL:                 configMonitor.ConfigMonitorPriceCoinbase.BaseURL,
7✔
78
                        ChanError:                chanError,
7✔
79
                        ChanUpdateAssetQuote:     chanUpdateAssetQuote,
7✔
80
                        ChanRequestCurrencyRates: chanRequestCurrencyRate,
7✔
81
                },
7✔
82
                monitorPriceCoinbase.WithStreamingURL(configMonitor.ConfigMonitorPriceCoinbase.StreamingURL),
7✔
83
                monitorPriceCoinbase.WithRefreshInterval(time.Duration(configMonitor.RefreshInterval)*time.Second),
7✔
84
        )
7✔
85

7✔
86
        // Create and configure the API client for the Yahoo API shared between monitors
7✔
87
        unaryAPI := unaryClientYahoo.NewUnaryAPI(unaryClientYahoo.Config{
7✔
88
                BaseURL:           configMonitor.ConfigMonitorsYahoo.BaseURL,
7✔
89
                SessionRootURL:    configMonitor.ConfigMonitorsYahoo.SessionRootURL,
7✔
90
                SessionCrumbURL:   configMonitor.ConfigMonitorsYahoo.SessionCrumbURL,
7✔
91
                SessionConsentURL: configMonitor.ConfigMonitorsYahoo.SessionConsentURL,
7✔
92
        })
7✔
93

7✔
94
        yahoo := monitorPriceYahoo.NewMonitorPriceYahoo(
7✔
95
                monitorPriceYahoo.Config{
7✔
96
                        Ctx:                      ctx,
7✔
97
                        UnaryAPI:                 unaryAPI,
7✔
98
                        ChanError:                chanError,
7✔
99
                        ChanUpdateAssetQuote:     chanUpdateAssetQuote,
7✔
100
                        ChanRequestCurrencyRates: chanRequestCurrencyRate,
7✔
101
                },
7✔
102
                monitorPriceYahoo.WithRefreshInterval(time.Duration(configMonitor.RefreshInterval)*time.Second),
7✔
103
        )
7✔
104

7✔
105
        yahooCurrencyRate := monitorCurrencyRate.NewMonitorCurrencyRateYahoo(
7✔
106
                monitorCurrencyRate.Config{
7✔
107
                        Ctx:                      ctx,
7✔
108
                        UnaryAPI:                 unaryAPI,
7✔
109
                        ChanUpdateCurrencyRates:  chanUpdateCurrencyRate,
7✔
110
                        ChanRequestCurrencyRates: chanRequestCurrencyRate,
7✔
111
                        ChanError:                chanError,
7✔
112
                },
7✔
113
        )
7✔
114

7✔
115
        yahooCurrencyRate.SetTargetCurrency(configMonitor.TargetCurrency)
7✔
116

7✔
117
        m := &Monitor{
7✔
118
                monitors: map[c.QuoteSource]c.Monitor{
7✔
119
                        c.QuoteSourceCoinbase: coinbase,
7✔
120
                        c.QuoteSourceYahoo:    yahoo,
7✔
121
                },
7✔
122
                monitorCurrencyRate:     yahooCurrencyRate,
7✔
123
                chanUpdateAssetQuote:    chanUpdateAssetQuote,
7✔
124
                chanUpdateCurrencyRates: chanUpdateCurrencyRate,
7✔
125
                chanError:               chanError,
7✔
126
                onUpdateAssetGroupQuote: func(assetGroupQuote c.AssetGroupQuote, versionVector int) {},
11✔
127
                onUpdateAssetQuote:      func(symbol string, assetQuote c.AssetQuote, versionVector int) {},
×
128
                logger:                  configMonitor.Logger,
129
                ctx:                     ctx,
130
                cancel:                  cancel,
131
        }
132

133
        return m, nil
7✔
134
}
135

136
// SetAssetGroup sets the asset group for the monitor
137
func (m *Monitor) SetAssetGroup(assetGroup c.AssetGroup, versionVector int) error {
5✔
138
        var wg sync.WaitGroup
5✔
139

5✔
140
        // Create a channel for timeout
5✔
141
        done := make(chan bool)
5✔
142
        // Create error channel for collecting errors from each monitor
5✔
143
        chanError := make(chan error, len(assetGroup.SymbolsBySource))
5✔
144
        // Create a slice to collect errors
5✔
145
        var errors []error
5✔
146

5✔
147
        // Concurrently set symbols for each monitor (execute a synchronous call to update quotes for each monitor)
5✔
148
        for _, symbolBySource := range assetGroup.SymbolsBySource {
11✔
149
                if monitor, exists := m.monitors[symbolBySource.Source]; exists {
12✔
150
                        wg.Add(1)
6✔
151
                        go func(mon c.Monitor, symbols []string) {
12✔
152
                                defer wg.Done()
6✔
153
                                err := mon.SetSymbols(symbols, versionVector)
6✔
154
                                if err != nil {
7✔
155
                                        chanError <- err
1✔
156
                                }
1✔
157
                        }(monitor, symbolBySource.Symbols)
158
                }
159
        }
160

161
        // Wait for the waitgroup to finish in the background
162
        go func() {
10✔
163
                wg.Wait()
5✔
164
                close(done)
5✔
165
        }()
5✔
166

167
        // Continue when the waitgroup is finished or a timeout is reached
168
        timeout := time.After(3 * time.Second)
5✔
169
        for {
11✔
170
                select {
6✔
171
                case <-done:
4✔
172
                        // If there are any errors, return them
4✔
173
                        if len(errors) > 0 {
5✔
174

1✔
175
                                return fmt.Errorf("errors setting symbols on monitor(s): %v", errors)
1✔
176
                        }
1✔
177

178
                        goto Continue
3✔
179
                case err := <-chanError:
1✔
180
                        errors = append(errors, err)
1✔
181
                case <-timeout:
1✔
182

1✔
183
                        // If there are any errors, return them along with the timeout error
1✔
184
                        return fmt.Errorf("timeout waiting for monitor(s) to set symbols. Additional non-timeout errors: %v", errors)
1✔
185
                }
186
        }
187
Continue:
188

189
        m.mu.Lock()
3✔
190
        defer m.mu.Unlock()
3✔
191

3✔
192
        // Update the versionVector so that any messages from the previous asset group can be ignored
3✔
193
        m.assetGroupVersionVector = versionVector
3✔
194
        m.assetGroup = assetGroup
3✔
195

3✔
196
        // Get asset quotes for all sources
3✔
197
        assetGroupQuote := m.GetAssetGroupQuote()
3✔
198

3✔
199
        // Run the callback in a goroutine to avoid blocking
3✔
200
        go m.onUpdateAssetGroupQuote(assetGroupQuote, versionVector)
3✔
201

3✔
202
        return nil
3✔
203
}
204

205
// SetOnUpdate sets the callback functions for when asset quotes are updated
206
func (m *Monitor) SetOnUpdate(config ConfigUpdateFns) error {
4✔
207

4✔
208
        if config.OnUpdateAssetQuote == nil || config.OnUpdateAssetGroupQuote == nil {
6✔
209
                return errors.New("onUpdateAssetQuote and onUpdateAssetGroupQuote must be set")
2✔
210
        }
2✔
211

212
        m.onUpdateAssetQuote = config.OnUpdateAssetQuote
2✔
213
        m.onUpdateAssetGroupQuote = config.OnUpdateAssetGroupQuote
2✔
214

2✔
215
        return nil
2✔
216
}
217

218
// Start starts all monitors
219
func (m *Monitor) Start() {
3✔
220

3✔
221
        m.monitorCurrencyRate.Start() //nolint:errcheck
3✔
222

3✔
223
        for _, monitor := range m.monitors {
9✔
224
                monitor.Start() //nolint:errcheck
6✔
225
        }
6✔
226

227
        go m.handleUpdates()
3✔
228
}
229

230
// GetAssetGroupQuote synchronously gets price quotes a group of assets across all sources
231
func (m *Monitor) GetAssetGroupQuote(ignoreCache ...bool) c.AssetGroupQuote {
5✔
232

5✔
233
        assetQuotesFromAllSources := make([]c.AssetQuote, 0)
5✔
234

5✔
235
        for _, symbolBySource := range m.assetGroup.SymbolsBySource {
11✔
236

6✔
237
                assetQuotes, _ := m.monitors[symbolBySource.Source].GetAssetQuotes(ignoreCache...)
6✔
238
                assetQuotesFromAllSources = append(assetQuotesFromAllSources, assetQuotes...)
6✔
239

6✔
240
        }
6✔
241

242
        return c.AssetGroupQuote{
5✔
243
                AssetQuotes: assetQuotesFromAllSources,
5✔
244
                AssetGroup:  m.assetGroup,
5✔
245
        }
5✔
246
}
247

248
// handleUpdates listens for asset quote updates and errors from monitors
249
func (m *Monitor) handleUpdates() {
3✔
250
        for {
10✔
251
                select {
7✔
252
                case <-m.ctx.Done():
3✔
253

3✔
254
                        return
3✔
255
                case update := <-m.chanUpdateAssetQuote:
1✔
256

1✔
257
                        m.mu.RLock()
1✔
258

1✔
259
                        // Skip updates from previous asset groups
1✔
260
                        if update.VersionVector != m.assetGroupVersionVector {
1✔
261
                                m.mu.RUnlock()
×
262

×
263
                                continue
×
264
                        }
265
                        m.mu.RUnlock()
1✔
266

1✔
267
                        // Call the callback function for individual asset quote updates
1✔
268
                        go m.onUpdateAssetQuote(update.Data.Symbol, update.Data, update.VersionVector)
1✔
269

270
                case err := <-m.chanError:
1✔
271
                        // Log errors using the configured logger if one is set
1✔
272
                        if m.logger != nil {
2✔
273
                                m.logger.Printf("%v", err)
1✔
274
                        }
1✔
275

276
                case currencyRates := <-m.chanUpdateCurrencyRates:
2✔
277
                        // Set currency rates on each each monitor
2✔
278
                        for _, monitor := range m.monitors {
6✔
279
                                err := monitor.SetCurrencyRates(currencyRates)
4✔
280
                                if err != nil {
4✔
281
                                        m.chanError <- err
×
282
                                }
×
283
                        }
284

285
                        // Get asset quotes for all sources with new currency rates
286
                        assetGroupQuote := m.GetAssetGroupQuote()
2✔
287

2✔
288
                        // Callback with new asset quotes which include the new currency rates
2✔
289
                        go m.onUpdateAssetGroupQuote(assetGroupQuote, m.assetGroupVersionVector)
2✔
290
                }
291
        }
292
}
293

294
// Stop stops all monitors and cancels the context
295
func (m *Monitor) Stop() {
3✔
296

3✔
297
        for _, monitor := range m.monitors {
9✔
298
                monitor.Stop() //nolint:errcheck
6✔
299
        }
6✔
300

301
        m.cancel()
3✔
302

303
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc