• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vocdoni / vocdoni-node / 6863543637

14 Nov 2023 12:18PM UTC coverage: 61.774% (+0.1%) from 61.668%
6863543637

Pull #1137

github

altergui
ipfs: refactor removing very old clutter and abstractions

* data.Storage.Init now needs no args
* move DataMockTest into its own package datamock
* assert at compile time that datamock.DataMockTest and ipfs.Handler satisfy data.Storage
* move fsrepo.IsInitialized check into startNode
* remove ancient abstraction types.DataStore
* fixup "ipfs: add EnableLocalDiscovery option": move field EnableLocalDiscovery into ipfs.Handler
Pull Request #1137: feat/ipfs refactor

42 of 45 new or added lines in 7 files covered. (93.33%)

4 existing lines in 2 files now uncovered.

14625 of 23675 relevant lines covered (61.77%)

31034.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.18
/data/ipfs/ipfs.go
1
package ipfs
2

3
import (
4
        "bytes"
5
        "context"
6
        "fmt"
7
        "io"
8
        "os"
9
        "strings"
10
        "time"
11

12
        lru "github.com/hashicorp/golang-lru/v2"
13
        coreiface "github.com/ipfs/boxo/coreiface"
14
        "github.com/ipfs/boxo/files"
15
        ipfscid "github.com/ipfs/go-cid"
16
        keystore "github.com/ipfs/go-ipfs-keystore"
17
        ipfslog "github.com/ipfs/go-log/v2"
18

19
        "github.com/ipfs/boxo/coreiface/options"
20
        corepath "github.com/ipfs/boxo/coreiface/path"
21
        "github.com/ipfs/boxo/ipns"
22
        ipfscmds "github.com/ipfs/kubo/commands"
23
        ipfscore "github.com/ipfs/kubo/core"
24
        "github.com/ipfs/kubo/core/corehttp"
25
        "github.com/ipfs/kubo/core/corerepo"
26
        "github.com/ipfs/kubo/core/coreunix"
27
        ipfscrypto "github.com/libp2p/go-libp2p/core/crypto"
28
        ma "github.com/multiformats/go-multiaddr"
29
        manet "github.com/multiformats/go-multiaddr/net"
30
        "github.com/multiformats/go-multicodec"
31
        "github.com/multiformats/go-multihash"
32
        "go.vocdoni.io/dvote/data"
33
        "go.vocdoni.io/dvote/log"
34
)
35

36
// Compile-time check that *Handler implements data.Storage. A typed nil
// pointer is used so that no throwaway Handler value is allocated.
var _ data.Storage = (*Handler)(nil)
37

38
const (
	// MaxFileSizeBytes is the maximum size of a file to be published to IPFS
	// (100 MB, counted in units of 1024*1024 bytes).
	MaxFileSizeBytes = 100 << 20
	// RetrievedFileCacheSize is the maximum number of files kept in the
	// in-memory cache of retrieved files.
	RetrievedFileCacheSize = 128
)
44

45
// Handler is the IPFS data storage node handler.
46
type Handler struct {
47
        Node     *ipfscore.IpfsNode
48
        CoreAPI  coreiface.CoreAPI
49
        DataDir  string
50
        LogLevel string
51

52
        EnableLocalDiscovery bool
53
        // TODO: Replace DataDir, LogLevel and EnableLocalDiscovery with a config.IPFSCfg?
54

55
        retrieveCache *lru.Cache[string, []byte]
56

57
        // cancel helps us stop extra goroutines and listeners which complement
58
        // the IpfsNode above.
59
        cancel func()
60
        maddr  ma.Multiaddr
61
}
62

63
// New returns a Handler
64
func New() *Handler {
2✔
65
        return &Handler{}
2✔
66
}
2✔
67

68
// Init initializes the IPFS node handler and repository.
69
func (i *Handler) Init() error {
2✔
70
        if i.LogLevel == "" {
4✔
71
                i.LogLevel = "ERROR"
2✔
72
        }
2✔
73
        ipfslog.SetLogLevel("*", i.LogLevel)
2✔
74
        if err := installDatabasePlugins(); err != nil {
2✔
75
                return err
×
76
        }
×
77
        ConfigRoot = i.DataDir
2✔
78
        os.Setenv("IPFS_FD_MAX", "4096")
2✔
79

2✔
80
        node, coreAPI, err := i.startNode()
2✔
81
        if err != nil {
2✔
82
                return err
×
83
        }
×
84
        ctx, cancel := context.WithCancel(context.Background())
2✔
85
        i.cancel = cancel
2✔
86

2✔
87
        // Start garbage collector, with our cancellable context.
2✔
88
        go func() {
4✔
89
                if err := corerepo.PeriodicGC(ctx, node); err != nil {
2✔
90
                        log.Errorw(err, "error running ipfs garbage collector")
×
91
                }
×
92
        }()
93
        log.Infow("IPFS initialization",
2✔
94
                "peerID", node.Identity.String(),
2✔
95
                "addresses", node.PeerHost.Addrs(),
2✔
96
                "pubKey", node.PrivateKey.GetPublic(),
2✔
97
        )
2✔
98
        // start http
2✔
99
        cctx := cmdCtx(node, i.DataDir)
2✔
100
        cctx.ReqLog = &ipfscmds.ReqLog{}
2✔
101

2✔
102
        gatewayOpt := corehttp.GatewayOption(corehttp.WebUIPaths...)
2✔
103
        opts := []corehttp.ServeOption{
2✔
104
                corehttp.CommandsOption(cctx),
2✔
105
                corehttp.WebUIOption,
2✔
106
                gatewayOpt,
2✔
107
        }
2✔
108

2✔
109
        if i.maddr == nil {
4✔
110
                if err := i.SetMultiAddress("/ip4/0.0.0.0/tcp/5001"); err != nil {
2✔
111
                        return err
×
112
                }
×
113
        }
114

115
        list, err := manet.Listen(i.maddr)
2✔
116
        if err != nil {
2✔
117
                return err
×
118
        }
×
119
        go func() {
4✔
120
                <-ctx.Done()
2✔
121
                list.Close()
2✔
122
        }()
2✔
123
        // The address might have changed, if the port was 0; use list.Multiaddr
124
        // to fetch the final one.
125

126
        // Avoid corehttp.ListenAndServe, since it doesn't provide the final
127
        // address, and always prints to stdout.
128
        go corehttp.Serve(node, manet.NetListener(list), opts...)
2✔
129

2✔
130
        i.Node = node
2✔
131
        i.CoreAPI = coreAPI
2✔
132
        i.retrieveCache, err = lru.New[string, []byte](RetrievedFileCacheSize)
2✔
133
        if err != nil {
2✔
134
                return err
×
135
        }
×
136

137
        go i.updateStats(time.Minute)
2✔
138

2✔
139
        return nil
2✔
140
}
141

142
// Stop stops the IPFS node handler.
143
func (i *Handler) Stop() error {
×
144
        i.cancel()
×
145
        return i.Node.Close()
×
146
}
×
147

148
// SetMultiAddress sets the multiaddress of the IPFS node.
149
func (i *Handler) SetMultiAddress(addr string) (err error) {
2✔
150
        i.maddr, err = ma.NewMultiaddr(addr)
2✔
151
        return err
2✔
152
}
2✔
153

154
// URIprefix returns the URI prefix which identifies the protocol
155
func (*Handler) URIprefix() string {
228✔
156
        return "ipfs://"
228✔
157
}
228✔
158

159
// PublishReader publishes a reader buffer to ipfs and returns the resulting CID v1.
160
func (i *Handler) PublishReader(ctx context.Context, buf io.Reader) (cid string, err error) {
64✔
161
        adder, err := coreunix.NewAdder(ctx, i.Node.Pinning, i.Node.Blockstore, i.Node.DAG)
64✔
162
        if err != nil {
64✔
163
                return "", err
×
164
        }
×
165
        adder.Chunker = ChunkerTypeSize
64✔
166
        adder.CidBuilder = ipfscid.V1Builder{
64✔
167
                Codec:  uint64(multicodec.DagJson),
64✔
168
                MhType: uint64(multihash.SHA2_256),
64✔
169
        }
64✔
170
        msgFile := files.NewReaderFile(buf)
64✔
171
        format, err := adder.AddAllAndPin(ctx, msgFile)
64✔
172
        if err != nil {
64✔
173
                return "", err
×
174
        }
×
175
        size, err := msgFile.Size()
64✔
176
        if err != nil {
128✔
177
                size = 0
64✔
178
        }
64✔
179
        cid = format.Cid().String()
64✔
180
        log.Infow("published file", "protocol", "ipfs", "cid", cid, "size", size)
64✔
181
        return cid, nil
64✔
182
}
183

184
// Publish publishes a file or message to ipfs and returns the resulting CID v1.
185
func (i *Handler) Publish(ctx context.Context, msg []byte) (cid string, err error) {
64✔
186
        return i.PublishReader(ctx, bytes.NewBuffer(msg))
64✔
187
}
64✔
188

189
// Pin adds a file to ipfs and returns the resulting CID v1.
190
func (i *Handler) Pin(ctx context.Context, path string) error {
414✔
191
        path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
414✔
192
        return i.CoreAPI.Pin().Add(ctx, corepath.New(path))
414✔
193
}
414✔
194

×
195
func (i *Handler) addAndPin(ctx context.Context, path string) (corepath.Resolved, error) {
×
196
        f, err := unixfsFilesNode(path)
414✔
197
        if err != nil {
198
                return nil, err
199
        }
×
200
        defer f.Close()
×
201

×
202
        rpath, err := i.CoreAPI.Unixfs().Add(ctx, f,
×
203
                options.Unixfs.CidVersion(1),
×
204
                options.Unixfs.Pin(true))
×
205
        if err != nil {
×
206
                return nil, err
×
207
        }
×
208

×
209
        return rpath, nil
×
210
}
×
211

×
212
// Unpin removes a file pin from ipfs.
213
func (i *Handler) Unpin(ctx context.Context, path string) error {
×
214
        path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
215
        cpath := corepath.New(path)
216
        if err := cpath.IsValid(); err != nil {
217
                return fmt.Errorf("invalid path %s: %w", path, err)
×
218
        }
×
219
        log.Debugf("removing pin %s", cpath)
×
220
        return i.CoreAPI.Pin().Rm(ctx, cpath, options.Pin.RmRecursive(true))
×
221
}
×
222

×
223
// Stats returns stats about the IPFS node.
×
224
func (i *Handler) Stats() map[string]any {
×
225
        return map[string]any{"peers": stats.Peers.Get(), "addresses": stats.KnownAddrs.Get(), "pins": stats.Pins.Get()}
226
}
227

228
func (i *Handler) countPins(ctx context.Context) (int, error) {
2✔
229
        // Note that pins is a channel that gets closed when finished.
2✔
230
        // We MUST range over the entire channel to not leak goroutines.
2✔
231
        // Maybe there is a way to get the total number of pins without
232
        // iterating over them?
7✔
233
        pins, err := i.CoreAPI.Pin().Ls(ctx)
7✔
234
        if err != nil {
7✔
235
                return 0, err
7✔
236
        }
7✔
237
        count := 0
7✔
238
        for pin := range pins {
7✔
239
                if err := pin.Err(); err != nil {
×
240
                        return 0, err
×
241
                }
7✔
242
                count++
1,129✔
243
        }
1,122✔
244
        return count, nil
×
245
}
×
246

1,122✔
247
// ListPins returns a map of all pinned CIDs and their types
248
func (i *Handler) ListPins(ctx context.Context) (map[string]string, error) {
7✔
249
        // Note that pins is a channel that gets closed when finished.
250
        // We MUST range over the entire channel to not leak goroutines.
251
        pins, err := i.CoreAPI.Pin().Ls(ctx)
252
        if err != nil {
×
253
                return nil, err
×
254
        }
×
255
        pinMap := make(map[string]string)
×
256
        for pin := range pins {
×
257
                if err := pin.Err(); err != nil {
×
258
                        return nil, err
×
259
                }
×
260
                pinMap[pin.Path().String()] = pin.Type()
×
261
        }
×
262
        return pinMap, nil
×
263
}
×
264

×
265
// RetrieveDir gets an IPFS directory and returns a map of all files and their content.
266
// It only supports 1 level of directory depth, so subdirectories are ignored.
×
267
func (i *Handler) RetrieveDir(ctx context.Context, path string, maxSize int64) (map[string][]byte, error) {
268
        path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
269

270
        // first resolve the path
271
        cpath, err := i.CoreAPI.ResolvePath(ctx, corepath.New(path))
1✔
272
        if err != nil {
1✔
273
                return nil, fmt.Errorf("could not resolve path %s", path)
1✔
274
        }
1✔
275
        // then get the file
1✔
276
        f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
1✔
277
        if err != nil {
×
278
                return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
×
279
        }
1✔
280

1✔
281
        dirMap := make(map[string][]byte)
×
282
        if dir := files.ToDir(f); dir != nil {
×
283
                if err := files.Walk(dir, func(path string, node files.Node) error {
284
                        if file := files.ToFile(node); file != nil {
1✔
285
                                content, err := fetchFileContent(file)
1✔
UNCOV
286
                                if err != nil {
×
UNCOV
287
                                        log.Warnw("could not retrieve file from directory", "path", path, "error", err)
×
288
                                        return nil
289
                                }
1✔
290
                                dirMap[path] = content
2✔
291
                        }
269✔
292
                        return nil
535✔
293
                }); err != nil {
267✔
294
                        return nil, err
267✔
295
                }
×
296
        }
×
297
        return dirMap, nil
×
298
}
267✔
299

300
// Retrieve gets an IPFS file (either from the p2p network or from the local cache).
268✔
301
// If maxSize is 0, it is set to the hardcoded maximum of MaxFileSizeBytes.
×
302
func (i *Handler) Retrieve(ctx context.Context, path string, maxSize int64) ([]byte, error) {
×
303
        path = strings.Replace(path, "ipfs://", "/ipfs/", 1)
×
304

305
        // check if we have the file in the local cache
1✔
306
        ccontent, _ := i.retrieveCache.Get(path)
307
        if ccontent != nil {
308
                log.Debugf("retrieved file %s from cache", path)
309
                return ccontent, nil
310
        }
278✔
311

278✔
312
        // first resolve the path
278✔
313
        cpath, err := i.CoreAPI.ResolvePath(ctx, corepath.New(path))
278✔
314
        if err != nil {
278✔
315
                return nil, fmt.Errorf("could not resolve path %s", path)
461✔
316
        }
183✔
317
        // then get the file
183✔
318
        f, err := i.CoreAPI.Unixfs().Get(ctx, cpath)
183✔
319
        if err != nil {
320
                return nil, fmt.Errorf("could not retrieve unixfs file: %w", err)
321
        }
95✔
322

112✔
323
        content, err := fetchFileContent(f)
17✔
324
        if err != nil {
17✔
325
                return nil, err
78✔
326
        }
78✔
327

×
328
        if len(content) == 0 {
×
329
                return nil, fmt.Errorf("retrieved file is empty")
330
        }
78✔
331

78✔
332
        if log.Level() >= log.LogLevelDebug {
×
333
                toLog := string(content)
×
334
                if len(toLog) > 1024 {
335
                        toLog = toLog[:1024] + "..."
78✔
336
                }
78✔
337
                log.Debugf("rawdata received: %s", toLog)
×
338
        }
×
339

340
        // Save file to cache for future attempts
78✔
341
        i.retrieveCache.Add(path, content)
×
342

×
343
        log.Infow("retrieved file", "path", path, "size", len(content))
344
        return content, nil
156✔
345
}
78✔
346

79✔
347
func fetchFileContent(node files.Node) ([]byte, error) {
1✔
348
        file := files.ToFile(node)
1✔
349
        if file == nil {
78✔
350
                return nil, fmt.Errorf("object is not a file")
351
        }
352
        defer file.Close()
353

78✔
354
        fsize, err := file.Size()
78✔
355
        if err != nil {
78✔
356
                return nil, err
78✔
357
        }
358

359
        if fsize > MaxFileSizeBytes {
345✔
360
                return nil, fmt.Errorf("file too big: %d", fsize)
345✔
361
        }
345✔
362
        return io.ReadAll(io.LimitReader(file, MaxFileSizeBytes))
×
363
}
×
364

345✔
365
// PublishIPNSpath creates or updates an IPNS record with the content of a
345✔
366
// filesystem path (a single file or a directory).
345✔
367
//
345✔
368
// The IPNS record is published under the scope of the private key identified
×
369
// by the keyalias parameter. New keys can be created using method AddKeyToKeystore
×
370
// and function NewIPFSkey() both available on this package.
371
//
345✔
372
// The execution of this method might take a while (some minutes),
×
373
// so the caller must handle properly the logic by using goroutines, channels or other
×
374
// mechanisms in order to not block the whole program execution.
345✔
375
func (i *Handler) PublishIPNSpath(ctx context.Context, path string, keyalias string) (ipns.Name, corepath.Resolved, error) {
376
        rpath, err := i.addAndPin(ctx, path)
377
        if err != nil {
378
                return ipns.Name{}, nil, err
379
        }
380
        if keyalias == "" {
381
                ck, err := i.CoreAPI.Key().Self(ctx)
382
                if err != nil {
383
                        return ipns.Name{}, nil, err
384
                }
385
                keyalias = ck.Name()
386
        }
387
        name, err := i.CoreAPI.Name().Publish(
×
388
                ctx,
×
389
                rpath,
×
390
                options.Name.TTL(time.Minute*10),
×
391
                options.Name.Key(keyalias),
×
392
        )
×
393
        if err != nil {
×
394
                return ipns.Name{}, nil, err
×
395
        }
×
396
        return name, rpath, nil
×
397
}
×
398

399
// AddKeyToKeystore adds a marshaled IPFS private key to the IPFS keystore.
×
400
// The key is identified by a unique alias name which can be used for referncing
×
401
// that key when using some other IPFS methods.
×
402
// Compatible Keys can be generated with NewIPFSkey() function.
×
403
func (i *Handler) AddKeyToKeystore(keyalias string, privkey []byte) error {
×
404
        pk, err := ipfscrypto.UnmarshalPrivateKey(privkey)
×
405
        if err != nil {
×
406
                return err
×
407
        }
×
408
        _ = i.Node.Repo.Keystore().Delete(keyalias)
×
409
        if err := i.Node.Repo.Keystore().Put(keyalias, pk); err != nil {
410
                if err != keystore.ErrKeyExists {
411
                        return err
412
                }
413
        }
414
        return nil
415
}
×
416

×
417
// NewIPFSkey generates a new IPFS private key (ECDSA/256bit) and returns its
×
418
// marshaled bytes representation.
×
419
func NewIPFSkey() []byte {
×
420
        // These functions must not return error since all input parameters
×
421
        // are predefined, so we panic if an error returned.
×
422
        privKey, _, err := ipfscrypto.GenerateKeyPair(ipfscrypto.ECDSA, 256)
×
423
        if err != nil {
×
424
                panic(err)
×
425
        }
426
        encPrivKey, err := ipfscrypto.MarshalPrivateKey(privKey)
×
427
        if err != nil {
428
                panic(err)
429
        }
430
        return encPrivKey
431
}
×
432

×
433
// unixfsFilesNode returns a go-ipfs files.Node given a unix path
×
434
func unixfsFilesNode(path string) (files.Node, error) {
×
435
        stat, err := os.Lstat(path)
×
436
        if err != nil {
×
437
                return nil, err
438
        }
×
439
        f, err := files.NewSerialFile(path, false, stat)
×
440
        if err != nil {
×
441
                return nil, err
442
        }
×
443
        return f, nil
444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc