• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vocdoni / vocdoni-node / 6863543637

14 Nov 2023 12:18PM UTC coverage: 61.774% (+0.1%) from 61.668%
6863543637

Pull #1137

github

altergui
ipfs: refactor removing very old clutter and abstractions

* data.Storage.Init now needs no args
* move DataMockTest into its own package datamock
* assert at compile time that datamock.DataMockTest and ipfs.Handler satisfy data.Storage
* move fsrepo.IsInitialized check into startNode
* remove ancient abstraction types.DataStore
* fixup "ipfs: add EnableLocalDiscovery option": move field EnableLocalDiscovery into ipfs.Handler
Pull Request #1137: feat/ipfs refactor

42 of 45 new or added lines in 7 files covered. (93.33%)

4 existing lines in 2 files now uncovered.

14625 of 23675 relevant lines covered (61.77%)

31034.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.7
/vochain/indexer/archive.go
1
package indexer
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "fmt"
7
        "time"
8

9
        "go.vocdoni.io/dvote/data"
10
        "go.vocdoni.io/dvote/log"
11
        "go.vocdoni.io/dvote/types"
12
        indexerdb "go.vocdoni.io/dvote/vochain/indexer/db"
13
        "go.vocdoni.io/dvote/vochain/indexer/indexertypes"
14
        "go.vocdoni.io/dvote/vochain/results"
15
        "go.vocdoni.io/proto/build/go/models"
16
)
17

18
const (
19
        marxArchiveFileSize     = 1024 * 100 // 100KB
20
        timeoutArchiveRetrieval = 120 * time.Second
21
        archiveFetchInterval    = 60 * time.Minute
22
        archiveFileNameSize     = types.ProcessIDsize * 2 // 64 hex chars
23
)
24

25
// ArchiveProcess is the struct used to store the process data in the archive.
26
type ArchiveProcess struct {
27
        ChainID     string                `json:"chainId,omitempty"`
28
        ProcessInfo *indexertypes.Process `json:"process"`
29
        Results     *results.Results      `json:"results"`
30
        StartDate   *time.Time            `json:"startDate,omitempty"`
31
        EndDate     *time.Time            `json:"endDate,omitempty"`
32
}
33

34
// ImportArchive imports an archive list of processes into the indexer database.
35
// It checks if the process already exists in the database and if not, it creates it.
36
// Returns those processes that have been added to the database.
37
func (idx *Indexer) ImportArchive(archive []*ArchiveProcess) ([]*ArchiveProcess, error) {
2✔
38
        tx, err := idx.readWriteDB.Begin()
2✔
39
        if err != nil {
2✔
40
                return nil, err
×
41
        }
×
42
        defer tx.Rollback()
2✔
43
        queries := indexerdb.New(tx)
2✔
44
        added := []*ArchiveProcess{}
2✔
45
        for _, p := range archive {
269✔
46
                if idx.App.ChainID() == p.ChainID {
267✔
47
                        log.Debugw("skipping import of archive process from current chain", "chainID", p.ChainID, "processID", p.ProcessInfo.ID.String())
×
48
                        continue
×
49
                }
50
                if p.ProcessInfo == nil {
267✔
51
                        log.Debugw("skipping import of archive process with nil process info")
×
52
                        continue
×
53
                }
54

55
                // Check if election already exists
56
                if _, err := idx.ProcessInfo(p.ProcessInfo.ID); err != nil {
534✔
57
                        if err != ErrProcessNotFound {
267✔
58
                                return nil, fmt.Errorf("process info: %w", err)
×
59
                        }
×
60
                } else {
×
61
                        continue
×
62
                }
63

64
                // For backward compatibility, we try to fetch the start/end date from multiple sources.
65
                // If not found, we calculate them from the block count and the default block time.
66
                startDate := p.ProcessInfo.StartDate
267✔
67
                if startDate.IsZero() {
534✔
68
                        if p.StartDate != nil {
534✔
69
                                startDate = *p.StartDate
267✔
70
                        } else {
267✔
71
                                // Calculate startDate equal to time.Now() minus defaultBlockTime*p.ProcessInfo.BlockCount
×
72
                                startDate = time.Now().Add(-types.DefaultBlockTime * time.Duration(p.ProcessInfo.BlockCount))
×
73
                        }
×
74
                }
75
                endDate := p.ProcessInfo.EndDate
267✔
76
                if endDate.IsZero() {
534✔
77
                        // Calculate endDate equal to startDate plus defaultBlockTime*p.ProcessInfo.BlockCount
267✔
78
                        endDate = startDate.Add(types.DefaultBlockTime * time.Duration(p.ProcessInfo.BlockCount))
267✔
79
                }
267✔
80

81
                // Create and store process in the indexer database
82
                procParams := indexerdb.CreateProcessParams{
267✔
83
                        ID:                nonNullBytes(p.ProcessInfo.ID),
267✔
84
                        EntityID:          nonNullBytes(p.ProcessInfo.EntityID),
267✔
85
                        StartBlock:        int64(p.ProcessInfo.StartBlock),
267✔
86
                        StartDate:         startDate,
267✔
87
                        EndBlock:          int64(p.ProcessInfo.EndBlock),
267✔
88
                        EndDate:           endDate,
267✔
89
                        BlockCount:        int64(p.ProcessInfo.BlockCount),
267✔
90
                        HaveResults:       p.ProcessInfo.HaveResults,
267✔
91
                        FinalResults:      p.ProcessInfo.FinalResults,
267✔
92
                        CensusRoot:        nonNullBytes(p.ProcessInfo.CensusRoot),
267✔
93
                        MaxCensusSize:     int64(p.ProcessInfo.MaxCensusSize),
267✔
94
                        CensusUri:         p.ProcessInfo.CensusURI,
267✔
95
                        CensusOrigin:      int64(p.ProcessInfo.CensusOrigin),
267✔
96
                        Status:            int64(p.ProcessInfo.Status),
267✔
97
                        Namespace:         int64(p.ProcessInfo.Namespace),
267✔
98
                        Envelope:          indexertypes.EncodeProtoJSON(p.ProcessInfo.Envelope),
267✔
99
                        Mode:              indexertypes.EncodeProtoJSON(p.ProcessInfo.Mode),
267✔
100
                        VoteOpts:          indexertypes.EncodeProtoJSON(p.ProcessInfo.VoteOpts),
267✔
101
                        PrivateKeys:       indexertypes.EncodeJSON(p.ProcessInfo.PrivateKeys),
267✔
102
                        PublicKeys:        indexertypes.EncodeJSON(p.ProcessInfo.PublicKeys),
267✔
103
                        CreationTime:      time.Now(),
267✔
104
                        SourceBlockHeight: int64(p.ProcessInfo.SourceBlockHeight),
267✔
105
                        SourceNetworkID:   int64(models.SourceNetworkId_value[p.ProcessInfo.SourceNetworkId]),
267✔
106
                        Metadata:          p.ProcessInfo.Metadata,
267✔
107
                        ResultsVotes:      indexertypes.EncodeJSON(p.Results.Votes),
267✔
108
                        VoteCount:         int64(p.ProcessInfo.VoteCount),
267✔
109
                        ChainID:           p.ChainID,
267✔
110
                        FromArchive:       true,
267✔
111
                }
267✔
112

267✔
113
                if _, err := queries.CreateProcess(context.TODO(), procParams); err != nil {
267✔
114
                        return nil, fmt.Errorf("create archive process: %w", err)
×
115
                }
×
116
                added = append(added, p)
267✔
117
        }
118
        return added, tx.Commit()
2✔
119
}
120

121
// StartArchiveRetrieval starts the archive retrieval process. It is a blocking function that runs continuously.
122
// Retrieves the archive directory from the storage and imports the processes into the indexer database.
123
func (idx *Indexer) StartArchiveRetrieval(storage data.Storage, archiveURL string) {
1✔
124
        for {
2✔
125
                ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
1✔
126
                dirMap, err := storage.RetrieveDir(ctx, archiveURL, marxArchiveFileSize)
1✔
127
                cancel()
1✔
128
                if err != nil {
1✔
UNCOV
129
                        log.Warnw("cannot retrieve archive directory", "url", archiveURL, "err", err)
×
UNCOV
130
                        continue
×
131
                }
132
                archive := []*ArchiveProcess{}
1✔
133
                for name, data := range dirMap {
268✔
134
                        if len(data) == 0 {
267✔
135
                                continue
×
136
                        }
137
                        if len(name) != archiveFileNameSize {
268✔
138
                                continue
1✔
139
                        }
140
                        var p ArchiveProcess
266✔
141
                        if err := json.Unmarshal(data, &p); err != nil {
266✔
142
                                log.Warnw("cannot unmarshal archive process", "name", name, "err", err)
×
143
                                continue
×
144
                        }
145
                        archive = append(archive, &p)
266✔
146
                }
147

148
                log.Debugw("archive processes unmarshaled", "processes", len(archive))
1✔
149
                added, err := idx.ImportArchive(archive)
1✔
150
                if err != nil {
1✔
151
                        log.Warnw("cannot import archive", "err", err)
×
152
                }
×
153
                if len(added) > 0 {
2✔
154
                        log.Infow("new archive imported", "count", len(added))
1✔
155
                        for _, p := range added {
267✔
156
                                ctx, cancel := context.WithTimeout(context.Background(), timeoutArchiveRetrieval)
266✔
157
                                if err := storage.Pin(ctx, p.ProcessInfo.Metadata); err != nil {
266✔
158
                                        log.Warnw("cannot pin metadata", "err", err.Error())
×
159
                                }
×
160
                                cancel()
266✔
161
                        }
162
                }
163
                time.Sleep(archiveFetchInterval)
1✔
164
        }
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc