• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cinode / go / 4218690867

19 Feb 2023 11:31PM UTC coverage: 89.663%. Remained the same
4218690867

push

github

Bartłomiej Święcki
utils/httpserver: Fix tests due to lack of signals in windows

1570 of 1751 relevant lines covered (89.66%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.06
/pkg/datastore/multi_source.go
1
/*
2
Copyright © 2022 Bartłomiej Święcki (byo)
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package datastore
18

19
import (
20
        "context"
21
        "io"
22
        "sync"
23
        "time"
24

25
        "github.com/cinode/go/pkg/common"
26
)
27

28
// multiSourceDatastoreBlobState holds the per-blob bookkeeping kept by
// multiSourceDatastore.
type multiSourceDatastoreBlobState struct {
	// lastUpdateTime is the moment the most recent download attempt for the
	// blob finished. It is the zero time while a download is in progress.
	lastUpdateTime time.Time

	// downloading is true while a download of the blob is in progress.
	downloading bool

	// downloadingFinishedCChan is closed when the in-progress download ends,
	// releasing every goroutine waiting for it. It is nil when no download is
	// in progress.
	downloadingFinishedCChan chan struct{}
}
33

34
type multiSourceDatastore struct {
35
        // Main datastore
36
        main DS
37

38
        // Additional sources that will be queried whenever the main source
39
        // does not contain the data or contains outdated content
40
        additional []DS
41

42
        // Average time between dynamic content refreshes
43
        dynamicDataRefreshTime time.Duration
44

45
        // Last update time for blobs, either for dynamic content or last result of not found for
46
        // static ones
47
        blobStates map[string]multiSourceDatastoreBlobState
48

49
        // Guard additional sources and update time map
50
        m sync.Mutex
51
}
52

53
func NewMultiSource(main DS, refreshTime time.Duration, additional ...DS) DS {
1✔
54
        return &multiSourceDatastore{
1✔
55
                main:                   main,
1✔
56
                additional:             additional,
1✔
57
                dynamicDataRefreshTime: refreshTime,
1✔
58
                blobStates:             map[string]multiSourceDatastoreBlobState{},
1✔
59
        }
1✔
60
}
1✔
61

62
// Compile-time check that *multiSourceDatastore satisfies the DS interface.
var _ DS = (*multiSourceDatastore)(nil)
63

64
func (m *multiSourceDatastore) Kind() string {
1✔
65
        return "MultiSource"
1✔
66
}
1✔
67

68
func (m *multiSourceDatastore) Open(ctx context.Context, name common.BlobName) (io.ReadCloser, error) {
1✔
69
        m.fetch(ctx, name)
1✔
70
        return m.main.Open(ctx, name)
1✔
71
}
1✔
72

73
func (m *multiSourceDatastore) Update(ctx context.Context, name common.BlobName, r io.Reader) error {
1✔
74
        return m.main.Update(ctx, name, r)
1✔
75
}
1✔
76

77
func (m *multiSourceDatastore) Exists(ctx context.Context, name common.BlobName) (bool, error) {
1✔
78
        m.fetch(ctx, name)
1✔
79
        return m.main.Exists(ctx, name)
1✔
80
}
1✔
81

82
func (m *multiSourceDatastore) Delete(ctx context.Context, name common.BlobName) error {
1✔
83
        return m.main.Delete(ctx, name)
1✔
84
}
1✔
85

86
func (m *multiSourceDatastore) fetch(ctx context.Context, name common.BlobName) {
1✔
87
        // TODO:
1✔
88
        // if not found locally, go over all additional sources and check if exists,
1✔
89
        // for dynamic content, perform merge operation if found in more than one,
1✔
90
        // initially in the background
1✔
91
        // also if dynamic content is queried and it was not updated in enough time,
1✔
92
        // do an update process
1✔
93

1✔
94
        for {
2✔
95
                waitChan, startDownload := func() (chan struct{}, bool) {
2✔
96
                        m.m.Lock()
1✔
97
                        defer m.m.Unlock()
1✔
98

1✔
99
                        needsDownload := false
1✔
100

1✔
101
                        state, found := m.blobStates[name.String()]
1✔
102

1✔
103
                        switch {
1✔
104
                        case !found:
1✔
105
                                // Seen for the first time
1✔
106
                                needsDownload = true
1✔
107

108
                        case state.downloading:
×
109
                                // Blob currently being downloaded
×
110
                                return state.downloadingFinishedCChan, false
×
111

112
                        case time.Since(state.lastUpdateTime) > m.dynamicDataRefreshTime:
1✔
113
                                // We should update the blob
1✔
114
                                needsDownload = true
1✔
115
                        }
116

117
                        if !needsDownload {
2✔
118
                                return nil, false
1✔
119
                        }
1✔
120

121
                        // State not found, request new download
122
                        ch := make(chan struct{})
1✔
123
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
124
                                downloading:              true,
1✔
125
                                downloadingFinishedCChan: ch,
1✔
126
                        }
1✔
127
                        return ch, true
1✔
128
                }()
129

130
                if startDownload {
2✔
131
                        for _, ds := range m.additional {
2✔
132
                                r, err := ds.Open(ctx, name)
1✔
133
                                if err == nil {
2✔
134
                                        m.main.Update(ctx, name, r)
1✔
135
                                        r.Close()
1✔
136
                                }
1✔
137
                        }
138
                        defer close(waitChan)
1✔
139

1✔
140
                        m.m.Lock()
1✔
141
                        defer m.m.Unlock()
1✔
142

1✔
143
                        m.blobStates[name.String()] = multiSourceDatastoreBlobState{
1✔
144
                                lastUpdateTime: time.Now(),
1✔
145
                        }
1✔
146
                        return
1✔
147
                }
148

149
                if waitChan == nil {
2✔
150
                        return
1✔
151
                }
1✔
152

153
                <-waitChan
×
154
        }
155
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc