• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cinode / go / 6142967888

11 Sep 2023 06:54AM UTC coverage: 90.647% (-0.2%) from 90.798%
6142967888

push

github

byo
Add cinode analyzer command

This command spawns a simple web server that can be used
to analyze the content of cinode blobs that build the cinodefs
graph.

6 of 6 new or added lines in 1 file covered. (100.0%)

1793 of 1978 relevant lines covered (90.65%)

1.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.66
/pkg/datastore/multi_source.go
1
/*
2
Copyright © 2022 Bartłomiej Święcki (byo)
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datastore
18

19
import (
20
        "context"
21
        "io"
22
        "sync"
23
        "time"
24

25
        "golang.org/x/exp/slog"
26

27
        "github.com/cinode/go/pkg/common"
28
)
29

30
type multiSourceDatastoreBlobState struct {
31
        lastUpdateTime           time.Time
32
        downloading              bool
33
        downloadingFinishedCChan chan struct{}
34
}
35

36
type multiSourceDatastore struct {
37
        // Main datastore
38
        main DS
39

40
        // Additional sources that will be queried whenever the main source
41
        // does not contain the data or contains outdated content
42
        additional []DS
43

44
        // Average time between dynamic content refreshes
45
        dynamicDataRefreshTime time.Duration
46

47
        // Last update time for blobs, either for dynamic content or last result of not found for
48
        // static ones
49
        blobStates map[string]multiSourceDatastoreBlobState
50

51
        // Guard additional sources and update time map
52
        m sync.Mutex
53

54
        // Logger output
55
        log *slog.Logger
56
}
57

58
func NewMultiSource(main DS, refreshTime time.Duration, additional ...DS) DS {
1✔
59
        return &multiSourceDatastore{
1✔
60
                main:                   main,
1✔
61
                additional:             additional,
1✔
62
                dynamicDataRefreshTime: refreshTime,
1✔
63
                blobStates:             map[string]multiSourceDatastoreBlobState{},
1✔
64
                log:                    slog.Default(),
1✔
65
        }
1✔
66
}
1✔
67

68
var _ DS = (*multiSourceDatastore)(nil)
69

70
func (m *multiSourceDatastore) Kind() string {
1✔
71
        return "MultiSource"
1✔
72
}
1✔
73

74
func (m *multiSourceDatastore) Address() string {
1✔
75
        return "multi-source://"
1✔
76
}
1✔
77

78
func (m *multiSourceDatastore) Open(ctx context.Context, name common.BlobName) (io.ReadCloser, error) {
1✔
79
        m.fetch(ctx, name)
1✔
80
        return m.main.Open(ctx, name)
1✔
81
}
1✔
82

83
func (m *multiSourceDatastore) Update(ctx context.Context, name common.BlobName, r io.Reader) error {
1✔
84
        return m.main.Update(ctx, name, r)
1✔
85
}
1✔
86

87
func (m *multiSourceDatastore) Exists(ctx context.Context, name common.BlobName) (bool, error) {
1✔
88
        m.fetch(ctx, name)
1✔
89
        return m.main.Exists(ctx, name)
1✔
90
}
1✔
91

92
func (m *multiSourceDatastore) Delete(ctx context.Context, name common.BlobName) error {
1✔
93
        return m.main.Delete(ctx, name)
1✔
94
}
1✔
95

96
func (m *multiSourceDatastore) fetch(ctx context.Context, name common.BlobName) {
1✔
97
        // TODO:
1✔
98
        // if not found locally, go over all additional sources and check if exists,
1✔
99
        // for dynamic content, perform merge operation if found in more than one,
1✔
100
        // initially in the background
1✔
101
        // also if dynamic content is queried and it was not updated in enough time,
1✔
102
        // do an update process
1✔
103

1✔
104
        for {
2✔
105
                waitChan, startDownload := func() (chan struct{}, bool) {
2✔
106
                        m.m.Lock()
1✔
107
                        defer m.m.Unlock()
1✔
108

1✔
109
                        needsDownload := false
1✔
110

1✔
111
                        state, found := m.blobStates[name.String()]
1✔
112

1✔
113
                        switch {
1✔
114
                        case !found:
1✔
115
                                // Seen for the first time
1✔
116
                                needsDownload = true
1✔
117

118
                        case state.downloading:
×
119
                                // Blob currently being downloaded
×
120
                                return state.downloadingFinishedCChan, false
×
121

122
                        case time.Since(state.lastUpdateTime) > m.dynamicDataRefreshTime:
1✔
123
                                // We should update the blob
1✔
124
                                needsDownload = true
1✔
125
                        }
126

127
                        if !needsDownload {
2✔
128
                                return nil, false
1✔
129
                        }
1✔
130

131
                        // State not found, request new download
132
                        ch := make(chan struct{})
1✔
133
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
134
                                downloading:              true,
1✔
135
                                downloadingFinishedCChan: ch,
1✔
136
                        }
1✔
137
                        return ch, true
1✔
138
                }()
139

140
                if startDownload {
2✔
141
                        m.log.Info("Starting download",
1✔
142
                                "blob", name.String(),
1✔
143
                        )
1✔
144
                        wasUpdated := false
1✔
145
                        for i, ds := range m.additional {
2✔
146
                                r, err := ds.Open(ctx, name)
1✔
147
                                if err != nil {
2✔
148
                                        m.log.Debug("Failed to fetch blob from additional datastore",
1✔
149
                                                "blob", name.String(),
1✔
150
                                                "datastore", ds.Address(),
1✔
151
                                                "err", err,
1✔
152
                                        )
1✔
153
                                        continue
1✔
154
                                }
155

156
                                m.log.Info("Blob found in additional datastore",
1✔
157
                                        "blob", name.String(),
1✔
158
                                        "datastore-num", i+1,
1✔
159
                                )
1✔
160
                                err = m.main.Update(ctx, name, r)
1✔
161
                                r.Close()
1✔
162
                                if err != nil {
1✔
163
                                        m.log.Error("Failed to store blob in local datastore", err,
×
164
                                                "blob", name.String(),
×
165
                                        )
×
166
                                }
×
167
                                wasUpdated = true
1✔
168
                        }
169
                        if !wasUpdated {
2✔
170
                                m.log.Warn("Did not find blob in any datastore",
1✔
171
                                        "blob", name.String(),
1✔
172
                                )
1✔
173
                        }
1✔
174
                        defer close(waitChan)
1✔
175

1✔
176
                        m.m.Lock()
1✔
177
                        defer m.m.Unlock()
1✔
178

1✔
179
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
180
                                lastUpdateTime: time.Now(),
1✔
181
                        }
1✔
182
                        return
1✔
183
                }
184

185
                if waitChan == nil {
2✔
186
                        return
1✔
187
                }
1✔
188

189
                <-waitChan
×
190
        }
191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc