• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cinode / go / 15811505851

22 Jun 2025 10:34PM UTC coverage: 93.768% (+1.0%) from 92.74%
15811505851

push

github

byo
Merge branch 'test-improvements'

55 of 128 new or added lines in 6 files covered. (42.97%)

3 existing lines in 1 file now uncovered.

3340 of 3562 relevant lines covered (93.77%)

1.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.97
/pkg/datastore/multisource/multi_source.go
1
/*
2
Copyright © 2025 Bartłomiej Święcki (byo)
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package multisource
18

19
import (
20
        "context"
21
        "io"
22
        "log/slog"
23
        "sync"
24
        "time"
25

26
        "github.com/cinode/go/pkg/blobtypes"
27
        "github.com/cinode/go/pkg/common"
28
        "github.com/cinode/go/pkg/datastore"
29
)
30

31
const (
	// defaultDynamicDataRefreshTime is the value New assigns to
	// dynamicDataRefreshTime before any option is applied.
	defaultDynamicDataRefreshTime = time.Minute

	// defaultNotFoundRecheckTime is the value New assigns to
	// notFoundRecheckTime before any option is applied.
	defaultNotFoundRecheckTime = time.Minute
)
35

36
// multiSourceDatastoreBlobState is the per-blob bookkeeping entry stored in
// multiSourceDatastore.blobStates.
//
// An entry is in one of two shapes:
//   - in progress: downloading is true and downloadingFinishedCChan is set,
//   - finished: lastUpdateTime and notFound describe the last fetch attempt.
type multiSourceDatastoreBlobState struct {
	// lastUpdateTime is when the last fetch attempt for the blob finished.
	lastUpdateTime time.Time

	// downloading is true while some goroutine is fetching the blob.
	downloading bool

	// notFound is true when the last fetch attempt did not find the blob in
	// any of the additional datastores.
	notFound bool

	// downloadingFinishedCChan is closed by the fetching goroutine once the
	// in-progress fetch has finished; other goroutines wait on it.
	downloadingFinishedCChan chan struct{}
}
42

43
type multiSourceDatastore struct {
44
        // Main datastore
45
        main datastore.DS
46

47
        // Additional sources that will be queried whenever the main source
48
        // does not contain the data or contains outdated content
49
        additional []datastore.DS
50

51
        // Average time between dynamic content refreshes
52
        dynamicDataRefreshTime time.Duration
53

54
        // Time between re-checking blob existence in additional datastores
55
        notFoundRecheckTime time.Duration
56

57
        // Last update time for blobs, either for dynamic content or last result of not found for
58
        // static ones
59
        blobStates map[string]multiSourceDatastoreBlobState
60

61
        // Guard additional sources and update time map
62
        m sync.Mutex
63

64
        // Logger output
65
        log *slog.Logger
66
}
67

68
func New(main datastore.DS, options ...Option) datastore.DS {
1✔
69
        ds := &multiSourceDatastore{
1✔
70
                main:                   main,
1✔
71
                additional:             nil,
1✔
72
                dynamicDataRefreshTime: defaultDynamicDataRefreshTime,
1✔
73
                notFoundRecheckTime:    defaultNotFoundRecheckTime,
1✔
74
                blobStates:             map[string]multiSourceDatastoreBlobState{},
1✔
75
                log:                    slog.Default(),
1✔
76
        }
1✔
77

1✔
78
        for _, option := range options {
2✔
79
                option(ds)
1✔
80
        }
1✔
81

82
        return ds
1✔
83
}
84

85
// Compile-time assertion that *multiSourceDatastore implements datastore.DS.
var _ datastore.DS = (*multiSourceDatastore)(nil)
86

87
func (m *multiSourceDatastore) Kind() string {
1✔
88
        return "MultiSource"
1✔
89
}
1✔
90

91
func (m *multiSourceDatastore) Address() string {
1✔
92
        return "multi-source://"
1✔
93
}
1✔
94

95
func (m *multiSourceDatastore) Open(ctx context.Context, name *common.BlobName) (io.ReadCloser, error) {
1✔
96
        m.fetch(ctx, name)
1✔
97
        return m.main.Open(ctx, name)
1✔
98
}
1✔
99

100
func (m *multiSourceDatastore) Update(ctx context.Context, name *common.BlobName, r io.Reader) error {
1✔
101
        return m.main.Update(ctx, name, r)
1✔
102
}
1✔
103

104
func (m *multiSourceDatastore) Exists(ctx context.Context, name *common.BlobName) (bool, error) {
1✔
105
        m.fetch(ctx, name)
1✔
106
        return m.main.Exists(ctx, name)
1✔
107
}
1✔
108

109
func (m *multiSourceDatastore) Delete(ctx context.Context, name *common.BlobName) error {
1✔
110
        return m.main.Delete(ctx, name)
1✔
111
}
1✔
112

113
func (m *multiSourceDatastore) fetch(ctx context.Context, name *common.BlobName) {
1✔
114
        // TODO:
1✔
115
        // if not found locally, go over all additional sources and check if exists,
1✔
116
        // for dynamic content, perform merge operation if found in more than one,
1✔
117
        // initially in the background
1✔
118
        // also if dynamic content is queried and it was not updated in enough time,
1✔
119
        // do an update process
1✔
120

1✔
121
        for {
2✔
122
                waitChan, startDownload := func() (chan struct{}, bool) {
2✔
123
                        m.m.Lock()
1✔
124
                        defer m.m.Unlock()
1✔
125

1✔
126
                        needsDownload := false
1✔
127

1✔
128
                        state, found := m.blobStates[name.String()]
1✔
129

1✔
130
                        switch {
1✔
131
                        case !found:
1✔
132
                                // Seen for the first time
1✔
133
                                needsDownload = true
1✔
134

135
                        case state.downloading:
1✔
136
                                // Blob currently being downloaded
1✔
137
                                return state.downloadingFinishedCChan, false
1✔
138

139
                        case m.needsDownload(state, name, time.Now()):
1✔
140
                                // We should update the blob
1✔
141
                                needsDownload = true
1✔
142
                        }
143

144
                        if !needsDownload {
2✔
145
                                return nil, false
1✔
146
                        }
1✔
147

148
                        // State not found, request new download
149
                        ch := make(chan struct{})
1✔
150
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
151
                                downloading:              true,
1✔
152
                                downloadingFinishedCChan: ch,
1✔
153
                        }
1✔
154
                        return ch, true
1✔
155
                }()
156

157
                if startDownload {
2✔
158
                        m.log.Info("Starting download",
1✔
159
                                "blob", name.String(),
1✔
160
                        )
1✔
161
                        wasFound := false
1✔
162
                        for i, ds := range m.additional {
2✔
163
                                r, err := ds.Open(ctx, name)
1✔
164
                                if err != nil {
2✔
165
                                        m.log.Debug("Failed to fetch blob from additional datastore",
1✔
166
                                                "blob", name.String(),
1✔
167
                                                "datastore", ds.Address(),
1✔
168
                                                "err", err,
1✔
169
                                        )
1✔
170
                                        continue
1✔
171
                                }
172

173
                                m.log.Info("Blob found in additional datastore",
1✔
174
                                        "blob", name.String(),
1✔
175
                                        "datastore-num", i+1,
1✔
176
                                )
1✔
177
                                err = m.main.Update(ctx, name, r)
1✔
178
                                r.Close()
1✔
179
                                if err != nil {
1✔
NEW
180
                                        m.log.Error("Failed to store blob in local datastore",
×
NEW
181
                                                slog.Any("err", err),
×
NEW
182
                                                slog.String("blob", name.String()),
×
183
                                        )
×
184
                                }
×
185
                                wasFound = true
1✔
186
                        }
187
                        if !wasFound {
2✔
188
                                m.log.Warn("Did not find blob in any datastore",
1✔
189
                                        "blob", name.String(),
1✔
190
                                )
1✔
191
                        }
1✔
192
                        defer close(waitChan)
1✔
193

1✔
194
                        m.m.Lock()
1✔
195
                        defer m.m.Unlock()
1✔
196

1✔
197
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
198
                                lastUpdateTime: time.Now(),
1✔
199
                                notFound:       !wasFound,
1✔
200
                        }
1✔
201
                        return
1✔
202
                }
203

204
                if waitChan == nil {
2✔
205
                        return
1✔
206
                }
1✔
207

208
                <-waitChan
1✔
209
        }
210
}
211

212
// needsDownload checks if the blob needs to be downloaded based on the state and the current time.
213
func (m *multiSourceDatastore) needsDownload(
214
        state multiSourceDatastoreBlobState,
215
        name *common.BlobName,
216
        now time.Time,
217
) bool {
1✔
218
        switch {
1✔
219
        case state.notFound:
1✔
220
                return now.After(state.lastUpdateTime.Add(m.notFoundRecheckTime))
1✔
221

222
        case name.Type() == blobtypes.Static:
1✔
223
                return false
1✔
224

225
        default:
1✔
226
                return now.After(state.lastUpdateTime.Add(m.dynamicDataRefreshTime))
1✔
227
        }
228
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc