• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deadc0de6 / catcli / 5506286099

pending completion
5506286099

push

github

web-flow
Merge pull request #37 from deadc0de6/fix-36

size attr fix for #36

72 of 72 new or added lines in 4 files covered. (100.0%)

1329 of 1751 relevant lines covered (75.9%)

3.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.5
/catcli/noder.py
1
"""
2
author: deadc0de6 (https://github.com/deadc0de6)
3
Copyright (c) 2017, deadc0de6
4

5
Class that process nodes in the catalog tree
6
"""
7

8
import os
5✔
9
import shutil
5✔
10
import time
5✔
11
from typing import List, Union, Tuple, Any, Optional, Dict, cast
5✔
12
import anytree  # type: ignore
5✔
13

14
# local imports
15
from catcli import nodes
5✔
16
from catcli.nodes import NodeAny, NodeStorage, \
5✔
17
    NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
18
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
5✔
19
from catcli.logger import Logger
5✔
20
from catcli.nodeprinter import NodePrinter
5✔
21
from catcli.decomp import Decomp
5✔
22
from catcli.version import __version__ as VERSION
5✔
23
from catcli.exceptions import CatcliException
5✔
24

25

26
class Noder:
5✔
27
    """
28
    handles node in the catalog tree
29
    There are 4 types of node:
30
    * "top" node representing the top node (generic node)
31
    * "storage" node representing a storage
32
    * "dir" node representing a directory
33
    * "file" node representing a file
34
    """
35

36
    CSV_HEADER = ('name,type,path,size,indexed_at,'
5✔
37
                  'maccess,md5,nbfiles,free_space,'
38
                  'total_space,meta')
39

40
    def __init__(self, debug: bool = False,
5✔
41
                 sortsize: bool = False,
42
                 arc: bool = False) -> None:
43
        """
44
        @debug: debug mode
45
        @sortsize: sort nodes by size
46
        @arch: handle archive
47
        """
48
        self.hash = True
5✔
49
        self.debug = debug
5✔
50
        self.sortsize = sortsize
5✔
51
        self.arc = arc
5✔
52
        if self.arc:
5✔
53
            self.decomp = Decomp()
×
54

55
    @staticmethod
5✔
56
    def get_storage_names(top: NodeTop) -> List[str]:
5✔
57
        """return a list of all storage names"""
58
        return [x.name for x in list(top.children)]
5✔
59

60
    def get_storage_node(self, top: NodeTop,
5✔
61
                         name: str,
62
                         newpath: str = '') -> NodeStorage:
63
        """
64
        return the storage node if any
65
        if newpath is submitted, it will update the media info
66
        """
67
        found = None
5✔
68
        for node in top.children:
5✔
69
            if node.type != nodes.TYPE_STORAGE:
5✔
70
                continue
×
71
            if node.name == name:
5✔
72
                found = node
5✔
73
                break
5✔
74
        if found and newpath and os.path.exists(newpath):
5✔
75
            found.free = shutil.disk_usage(newpath).free
5✔
76
            found.total = shutil.disk_usage(newpath).total
5✔
77
            found.ts = int(time.time())
5✔
78
        return cast(NodeStorage, found)
5✔
79

80
    @staticmethod
5✔
81
    def get_node(top: NodeTop,
5✔
82
                 path: str,
83
                 quiet: bool = False) -> Optional[NodeAny]:
84
        """get the node by internal tree path"""
85
        resolv = anytree.resolver.Resolver('name')
5✔
86
        try:
5✔
87
            bpath = os.path.basename(path)
5✔
88
            the_node = resolv.get(top, bpath)
5✔
89
            return cast(NodeAny, the_node)
5✔
90
        except anytree.resolver.ChildResolverError:
5✔
91
            if not quiet:
5✔
92
                Logger.err(f'No node at path \"{bpath}\"')
×
93
            return None
5✔
94

95
    def get_node_if_changed(self,
5✔
96
                            top: NodeTop,
97
                            path: str,
98
                            treepath: str) -> Tuple[Optional[NodeAny], bool]:
99
        """
100
        return the node (if any) and if it has changed
101
        @top: top node (storage)
102
        @path: abs path to file
103
        @treepath: rel path from indexed directory
104
        """
105
        treepath = treepath.lstrip(os.sep)
5✔
106
        node = self.get_node(top, treepath, quiet=True)
5✔
107
        # node does not exist
108
        if not node:
5✔
109
            self._debug('\tchange: node does not exist')
5✔
110
            return None, True
5✔
111
        if os.path.isdir(path):
5✔
112
            return node, False
5✔
113
        # force re-indexing if no maccess
114
        maccess = os.path.getmtime(path)
5✔
115
        if not self._has_attr(node, 'maccess') or \
5✔
116
                not node.maccess:
117
            self._debug('\tchange: no maccess found')
×
118
            return node, True
×
119
        # maccess changed
120
        old_maccess = node.maccess
5✔
121
        if float(maccess) != float(old_maccess):
5✔
122
            self._debug(f'\tchange: maccess changed for \"{path}\"')
5✔
123
            return node, True
5✔
124
        # test hash
125
        if self.hash and node.md5:
5✔
126
            md5 = self._get_hash(path)
5✔
127
            if md5 and md5 != node.md5:
5✔
128
                msg = f'\tchange: checksum changed for \"{path}\"'
5✔
129
                self._debug(msg)
5✔
130
                return node, True
5✔
131
        self._debug(f'\tchange: no change for \"{path}\"')
5✔
132
        return node, False
5✔
133

134
    def rec_size(self, node: Union[NodeDir, NodeStorage],
5✔
135
                 store: bool = True) -> int:
136
        """
137
        recursively traverse tree and return size
138
        @store: store the size in the node
139
        """
140
        if node.type == nodes.TYPE_FILE:
5✔
141
            node.__class__ = NodeFile
5✔
142
            msg = f'size of {node.type} \"{node.name}\": {node.nodesize}'
5✔
143
            self._debug(msg)
5✔
144
            return node.nodesize
5✔
145
        msg = f'getting node size recursively for \"{node.name}\"'
5✔
146
        self._debug(msg)
5✔
147
        fullsize: int = 0
5✔
148
        for i in node.children:
5✔
149
            if node.type == nodes.TYPE_DIR:
5✔
150
                sub_size = self.rec_size(i, store=store)
5✔
151
                if store:
5✔
152
                    i.nodesize = sub_size
5✔
153
                fullsize += sub_size
5✔
154
                continue
5✔
155
            if node.type == nodes.TYPE_STORAGE:
5✔
156
                sub_size = self.rec_size(i, store=store)
5✔
157
                if store:
5✔
158
                    i.nodesize = sub_size
5✔
159
                fullsize += sub_size
5✔
160
                continue
5✔
161
            self._debug(f'skipping {node.name}')
×
162
        if store:
5✔
163
            node.nodesize = fullsize
5✔
164
        self._debug(f'size of {node.type} \"{node.name}\": {fullsize}')
5✔
165
        return fullsize
5✔
166

167
    ###############################################################
168
    # public helpers
169
    ###############################################################
170
    @staticmethod
5✔
171
    def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
5✔
172
        """format the storage attr for saving"""
173
        if not attr:
5✔
174
            return ''
5✔
175
        if isinstance(attr, list):
5✔
176
            return ', '.join(attr)
5✔
177
        if isinstance(attr, dict):
×
178
            ret = []
×
179
            for key, val in attr.items():
×
180
                ret.append(f'{key}={val}')
×
181
            return ', '.join(ret)
×
182
        attr = attr.rstrip()
×
183
        return attr
×
184

185
    def do_hashing(self, val: bool) -> None:
5✔
186
        """hash files when indexing"""
187
        self.hash = val
5✔
188

189
    ###############################################################
190
    # node creation
191
    ###############################################################
192
    def new_top_node(self) -> NodeTop:
5✔
193
        """create a new top node"""
194
        top = NodeTop(nodes.NAME_TOP)
5✔
195
        self._debug(f'new top node: {top}')
5✔
196
        return top
5✔
197

198
    def new_file_node(self, name: str, path: str,
5✔
199
                      parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
200
        """create a new node representing a file"""
201
        if not os.path.exists(path):
5✔
202
            Logger.err(f'File \"{path}\" does not exist')
×
203
            return None
×
204
        path = os.path.abspath(path)
5✔
205
        try:
5✔
206
            stat = os.lstat(path)
5✔
207
        except OSError as exc:
×
208
            Logger.err(f'OSError: {exc}')
×
209
            return None
×
210
        md5 = ''
5✔
211
        if self.hash:
5✔
212
            md5 = self._get_hash(path)
5✔
213
        relpath = os.sep.join([storagepath, name])
5✔
214

215
        maccess = os.path.getmtime(path)
5✔
216
        node = NodeFile(name,
5✔
217
                        relpath,
218
                        stat.st_size,
219
                        md5,
220
                        maccess,
221
                        parent=parent)
222
        if self.arc:
5✔
223
            ext = os.path.splitext(path)[1][1:]
×
224
            if ext.lower() in self.decomp.get_formats():
×
225
                self._debug(f'{path} is an archive')
×
226
                names = self.decomp.get_names(path)
×
227
                self.list_to_tree(node, names)
×
228
            else:
229
                self._debug(f'{path} is NOT an archive')
×
230
        return node
5✔
231

232
    def new_dir_node(self, name: str, path: str,
5✔
233
                     parent: NodeAny, storagepath: str) -> NodeDir:
234
        """create a new node representing a directory"""
235
        path = os.path.abspath(path)
5✔
236
        relpath = os.sep.join([storagepath, name])
5✔
237
        maccess = os.path.getmtime(path)
5✔
238
        return NodeDir(name,
5✔
239
                       relpath,
240
                       0,
241
                       maccess,
242
                       parent=parent)
243

244
    def new_storage_node(self, name: str,
5✔
245
                         path: str,
246
                         parent: str,
247
                         attrs: Dict[str, Any]) \
248
            -> NodeStorage:
249
        """create a new node representing a storage"""
250
        path = os.path.abspath(path)
5✔
251
        free = shutil.disk_usage(path).free
5✔
252
        total = shutil.disk_usage(path).total
5✔
253
        epoch = int(time.time())
5✔
254
        return NodeStorage(name,
5✔
255
                           free,
256
                           total,
257
                           0,
258
                           epoch,
259
                           self.attrs_to_string(attrs),
260
                           parent=parent)
261

262
    def new_archive_node(self, name: str, path: str,
5✔
263
                         parent: str, archive: str) -> NodeArchived:
264
        """create a new node for archive data"""
265
        return NodeArchived(name=name, relpath=path,
×
266
                            parent=parent, nodesize=0, md5='',
267
                            archive=archive)
268

269
    ###############################################################
270
    # node management
271
    ###############################################################
272
    def update_metanode(self, top: NodeTop) -> NodeMeta:
5✔
273
        """create or update meta node information"""
274
        meta = self._get_meta_node(top)
5✔
275
        epoch = int(time.time())
5✔
276
        if not meta:
5✔
277
            attrs: Dict[str, Any] = {}
5✔
278
            attrs['created'] = epoch
5✔
279
            attrs['created_version'] = VERSION
5✔
280
            meta = NodeMeta(name=nodes.NAME_META,
5✔
281
                            attr=attrs)
282
        meta.attr['access'] = epoch
5✔
283
        meta.attr['access_version'] = VERSION
5✔
284
        return meta
5✔
285

286
    def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
5✔
287
        """return the meta node if any"""
288
        try:
5✔
289
            found = next(filter(lambda x: x.type == nodes.TYPE_META,
5✔
290
                         top.children))
291
            return cast(NodeMeta, found)
5✔
292
        except StopIteration:
5✔
293
            return None
5✔
294

295
    def clean_not_flagged(self, top: NodeTop) -> int:
5✔
296
        """remove any node not flagged and clean flags"""
297
        cnt = 0
5✔
298
        for node in anytree.PreOrderIter(top):
5✔
299
            if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
5✔
300
                continue
5✔
301
            if self._clean(node):
5✔
302
                cnt += 1
5✔
303
        return cnt
5✔
304

305
    def _clean(self, node: NodeAny) -> bool:
5✔
306
        """remove node if not flagged"""
307
        if not node.flagged():
5✔
308
            node.parent = None
5✔
309
            return True
5✔
310
        node.unflag()
5✔
311
        return False
5✔
312

313
    ###############################################################
314
    # printing
315
    ###############################################################
316
    def _node_to_csv(self, node: NodeAny,
5✔
317
                     sep: str = ',',
318
                     raw: bool = False) -> None:
319
        """
320
        print a node to csv
321
        @node: the node to consider
322
        @sep: CSV separator character
323
        @raw: print raw size rather than human readable
324
        """
325
        if not node:
5✔
326
            return
×
327
        if node.type == nodes.TYPE_TOP:
5✔
328
            return
5✔
329

330
        out = []
5✔
331
        if node.type == nodes.TYPE_STORAGE:
5✔
332
            # handle storage
333
            out.append(node.name)   # name
5✔
334
            out.append(node.type)   # type
5✔
335
            out.append('')          # fake full path
5✔
336
            size = self.rec_size(node, store=False)
5✔
337
            out.append(size_to_str(size, raw=raw))  # size
5✔
338
            out.append(epoch_to_str(node.ts))  # indexed_at
5✔
339
            out.append('')  # fake maccess
5✔
340
            out.append('')  # fake md5
5✔
341
            out.append(str(len(node.children)))  # nbfiles
5✔
342
            # fake free_space
343
            out.append(size_to_str(node.free, raw=raw))
5✔
344
            # fake total_space
345
            out.append(size_to_str(node.total, raw=raw))
5✔
346
            out.append(node.attr)  # meta
5✔
347
        else:
348
            # handle other nodes
349
            out.append(node.name.replace('"', '""'))  # name
5✔
350
            out.append(node.type)  # type
5✔
351
            parents = self._get_parents(node)
5✔
352
            storage = self._get_storage(node)
5✔
353
            fullpath = os.path.join(storage.name, parents)
5✔
354
            out.append(fullpath.replace('"', '""'))  # full path
5✔
355

356
            out.append(size_to_str(node.nodesize, raw=raw))  # size
5✔
357
            out.append(epoch_to_str(storage.ts))  # indexed_at
5✔
358
            if self._has_attr(node, 'maccess'):
5✔
359
                out.append(epoch_to_str(node.maccess))  # maccess
5✔
360
            else:
361
                out.append('')  # fake maccess
×
362
            if self._has_attr(node, 'md5'):
5✔
363
                out.append(node.md5)  # md5
5✔
364
            else:
365
                out.append('')  # fake md5
5✔
366
            if node.type == nodes.TYPE_DIR:
5✔
367
                out.append(str(len(node.children)))  # nbfiles
5✔
368
            else:
369
                out.append('')  # fake nbfiles
5✔
370
            out.append('')  # fake free_space
5✔
371
            out.append('')  # fake total_space
5✔
372
            out.append('')  # fake meta
5✔
373

374
        line = sep.join(['"' + o + '"' for o in out])
5✔
375
        if len(line) > 0:
5✔
376
            Logger.stdout_nocolor(line)
5✔
377

378
    def _print_node_native(self, node: NodeAny,
5✔
379
                           pre: str = '',
380
                           withpath: bool = False,
381
                           withdepth: bool = False,
382
                           withstorage: bool = False,
383
                           recalcparent: bool = False,
384
                           raw: bool = False) -> None:
385
        """
386
        print a node
387
        @node: the node to print
388
        @pre: string to print before node
389
        @withpath: print the node path
390
        @withdepth: print the node depth info
391
        @withstorage: print the node storage it belongs to
392
        @recalcparent: get relpath from tree instead of relpath field
393
        @raw: print raw size rather than human readable
394
        """
395
        if node.type == nodes.TYPE_TOP:
5✔
396
            # top node
397
            node.__class__ = NodeTop
5✔
398
            Logger.stdout_nocolor(f'{pre}{node.name}')
5✔
399
        elif node.type == nodes.TYPE_FILE:
5✔
400
            # node of type file
401
            node.__class__ = NodeFile
5✔
402
            name = node.name
5✔
403
            if withpath:
5✔
404
                if recalcparent:
5✔
405
                    name = os.sep.join([self._get_parents(node.parent), name])
×
406
                else:
407
                    name = node.relpath
5✔
408
            name = name.lstrip(os.sep)
5✔
409
            if withstorage:
5✔
410
                storage = self._get_storage(node)
5✔
411
            attr_str = ''
5✔
412
            if node.md5:
5✔
413
                attr_str = f', md5:{node.md5}'
5✔
414
            size = size_to_str(node.nodesize, raw=raw)
5✔
415
            compl = f'size:{size}{attr_str}'
5✔
416
            if withstorage:
5✔
417
                content = Logger.get_bold_text(storage.name)
5✔
418
                compl += f', storage:{content}'
5✔
419
            NodePrinter.print_file_native(pre, name, compl)
5✔
420
        elif node.type == nodes.TYPE_DIR:
5✔
421
            # node of type directory
422
            node.__class__ = NodeDir
5✔
423
            name = node.name
5✔
424
            if withpath:
5✔
425
                if recalcparent:
×
426
                    name = os.sep.join([self._get_parents(node.parent), name])
×
427
                else:
428
                    name = node.relpath
×
429
            name = name.lstrip(os.sep)
5✔
430
            depth = 0
5✔
431
            if withdepth:
5✔
432
                depth = len(node.children)
5✔
433
            if withstorage:
5✔
434
                storage = self._get_storage(node)
×
435
            attr: List[Tuple[str, str]] = []
5✔
436
            if node.nodesize:
5✔
437
                attr.append(('totsize', size_to_str(node.nodesize, raw=raw)))
5✔
438
            if withstorage:
5✔
439
                attr.append(('storage', Logger.get_bold_text(storage.name)))
×
440
            NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
5✔
441
        elif node.type == nodes.TYPE_STORAGE:
5✔
442
            # node of type storage
443
            node.__class__ = NodeStorage
5✔
444
            sztotal = size_to_str(node.total, raw=raw)
5✔
445
            szused = size_to_str(node.total - node.free, raw=raw)
5✔
446
            nbchildren = len(node.children)
5✔
447
            pcent = 0
5✔
448
            if node.total > 0:
5✔
449
                pcent = node.free * 100 / node.total
5✔
450
            freepercent = f'{pcent:.1f}%'
5✔
451
            # get the date
452
            timestamp = ''
5✔
453
            if self._has_attr(node, 'ts'):
5✔
454
                timestamp = 'date:'
5✔
455
                timestamp += epoch_to_str(node.ts)
5✔
456
            disksize = ''
5✔
457
            # the children size
458
            recsize = self.rec_size(node, store=False)
5✔
459
            sizestr = size_to_str(recsize, raw=raw)
5✔
460
            disksize = 'totsize:' + f'{sizestr}'
5✔
461
            # format the output
462
            name = node.name
5✔
463
            args = [
5✔
464
                'nbfiles:' + f'{nbchildren}',
465
                disksize,
466
                f'free:{freepercent}',
467
                'du:' + f'{szused}/{sztotal}',
468
                timestamp]
469
            argsstring = ' | '.join(args)
5✔
470
            NodePrinter.print_storage_native(pre,
5✔
471
                                             name,
472
                                             argsstring,
473
                                             node.attr)
474
        elif node.type == nodes.TYPE_ARCHIVED:
×
475
            # archive node
476
            node.__class__ = NodeArchived
×
477
            if self.arc:
×
478
                NodePrinter.print_archive_native(pre, node.name, node.archive)
×
479
        else:
480
            Logger.err(f'bad node encountered: {node}')
×
481

482
    def print_tree(self, node: NodeAny,
5✔
483
                   fmt: str = 'native',
484
                   raw: bool = False) -> None:
485
        """
486
        print the tree in different format
487
        @node: start node
488
        @style: when fmt=native, defines the tree style
489
        @fmt: output format
490
        @raw: print the raw size rather than human readable
491
        """
492
        if fmt == 'native':
5✔
493
            # "tree" style
494
            rend = anytree.RenderTree(node, childiter=self._sort_tree)
5✔
495
            for pre, _, thenode in rend:
5✔
496
                self._print_node_native(thenode, pre=pre,
5✔
497
                                        withdepth=True, raw=raw)
498
        elif fmt == 'csv':
5✔
499
            # csv output
500
            self._to_csv(node, raw=raw)
5✔
501
        elif fmt == 'csv-with-header':
×
502
            # csv output
503
            Logger.stdout_nocolor(self.CSV_HEADER)
×
504
            self._to_csv(node, raw=raw)
×
505

506
    def _to_csv(self, node: NodeAny,
5✔
507
                raw: bool = False) -> None:
508
        """print the tree to csv"""
509
        rend = anytree.RenderTree(node, childiter=self._sort_tree)
5✔
510
        for _, _, item in rend:
5✔
511
            self._node_to_csv(item, raw=raw)
5✔
512

513
    @staticmethod
5✔
514
    def _fzf_prompt(strings: Any) -> Any:
5✔
515
        """prompt with fzf"""
516
        try:
×
517
            from pyfzf.pyfzf import FzfPrompt  # type: ignore # pylint: disable=C0415 # noqa
×
518
            fzf = FzfPrompt()
×
519
            selected = fzf.prompt(strings)
×
520
            return selected
×
521
        except ModuleNotFoundError:
×
522
            Logger.err('install pyfzf to use fzf')
×
523
            return None
×
524

525
    def _to_fzf(self, node: NodeAny, fmt: str) -> None:
5✔
526
        """
527
        fzf prompt with list and print selected node(s)
528
        @node: node to start with
529
        @fmt: output format for selected nodes
530
        """
531
        rendered = anytree.RenderTree(node, childiter=self._sort_tree)
×
532
        the_nodes = {}
×
533
        # construct node names list
534
        for _, _, rend in rendered:
×
535
            if not rend:
×
536
                continue
×
537
            parents = self._get_parents(rend)
×
538
            storage = self._get_storage(rend)
×
539
            fullpath = os.path.join(storage.name, parents)
×
540
            the_nodes[fullpath] = rend
×
541
        # prompt with fzf
542
        paths = self._fzf_prompt(the_nodes.keys())
×
543
        # print the resulting tree
544
        subfmt = fmt.replace('fzf-', '')
×
545
        for path in paths:
×
546
            if not path:
×
547
                continue
×
548
            if path not in the_nodes:
×
549
                continue
×
550
            rend = the_nodes[path]
×
551
            self.print_tree(rend, fmt=subfmt)
×
552

553
    @staticmethod
5✔
554
    def to_dot(top: NodeTop,
5✔
555
               path: str = 'tree.dot') -> str:
556
        """export to dot for graphing"""
557
        anytree.exporter.DotExporter(top).to_dotfile(path)
5✔
558
        Logger.info(f'dot file created under \"{path}\"')
5✔
559
        return f'dot {path} -T png -o /tmp/tree.png'
5✔
560

561
    ###############################################################
562
    # searching
563
    ###############################################################
564
    def find_name(self, top: NodeTop,
5✔
565
                  key: str,
566
                  script: bool = False,
567
                  only_dir: bool = False,
568
                  startnode: Optional[NodeAny] = None,
569
                  parentfromtree: bool = False,
570
                  fmt: str = 'native',
571
                  raw: bool = False) -> List[NodeAny]:
572
        """
573
        find files based on their names
574
        @top: top node
575
        @key: term to search for
576
        @script: output script
577
        @directory: only search for directories
578
        @startpath: node to start with
579
        @parentfromtree: get path from parent instead of stored relpath
580
        @fmt: output format
581
        @raw: raw size output
582
        returns the found nodes
583
        """
584
        self._debug(f'searching for \"{key}\"')
5✔
585

586
        # search for nodes based on path
587
        start: Optional[NodeAny] = top
5✔
588
        if startnode:
5✔
589
            start = self.get_node(top, startnode)
×
590
        filterfunc = self._callback_find_name(key, only_dir)
5✔
591
        found = anytree.findall(start, filter_=filterfunc)
5✔
592
        self._debug(f'found {len(found)} node(s)')
5✔
593

594
        # compile found nodes
595
        paths = {}
5✔
596
        for item in found:
5✔
597
            item.name = fix_badchars(item.name)
5✔
598
            if hasattr(item, 'relpath'):
5✔
599
                item.relpath = fix_badchars(item.relpath)
5✔
600
            storage = self._get_storage(item)
5✔
601
            if parentfromtree:
5✔
602
                parent = self._get_parents(item)
×
603
                key = f'{storage}/{parent}/{item.relpath}'
×
604
                paths[parent] = item
×
605
            else:
606
                key = f'{storage}/{item.path}'
5✔
607
                paths[key] = item
5✔
608

609
        # handle fzf mode
610
        if fmt.startswith('fzf'):
5✔
611
            selected = self._fzf_prompt(paths.keys())
×
612
            newpaths = {}
×
613
            subfmt = fmt.replace('fzf-', '')
×
614
            for item in selected:
×
615
                if item not in paths:
×
616
                    continue
×
617
                newpaths[item] = paths[item]
×
618
                self.print_tree(newpaths[item], fmt=subfmt)
×
619
            paths = newpaths
×
620
        else:
621
            if fmt == 'native':
5✔
622
                for _, item in paths.items():
5✔
623
                    self._print_node_native(item, withpath=True,
5✔
624
                                            withdepth=True,
625
                                            withstorage=True,
626
                                            recalcparent=parentfromtree,
627
                                            raw=raw)
628
            elif fmt.startswith('csv'):
×
629
                if fmt == 'csv-with-header':
×
630
                    Logger.stdout_nocolor(self.CSV_HEADER)
×
631
                for _, item in paths.items():
×
632
                    self._node_to_csv(item, raw=raw)
×
633

634
        # execute script if any
635
        if script:
5✔
636
            tmp = ['${source}/' + x for x in paths]
5✔
637
            tmpstr = ' '.join(tmp)
5✔
638
            cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
5✔
639
            Logger.info(cmd)
5✔
640

641
        return list(paths.values())
5✔
642

643
    def _callback_find_name(self, term: str, only_dir: bool) -> Any:
5✔
644
        """callback for finding files"""
645
        def find_name(node: NodeAny) -> bool:
5✔
646
            if node.type == nodes.TYPE_STORAGE:
5✔
647
                # ignore storage nodes
648
                return False
5✔
649
            if node.type == nodes.TYPE_TOP:
5✔
650
                # ignore top nodes
651
                return False
5✔
652
            if node.type == nodes.TYPE_META:
5✔
653
                # ignore meta nodes
654
                return False
×
655
            if only_dir and node.type == nodes.TYPE_DIR:
5✔
656
                # ignore non directory
657
                return False
×
658

659
            # filter
660
            if not term:
5✔
661
                return True
×
662
            if term.lower() in node.name.lower():
5✔
663
                return True
5✔
664

665
            # ignore
666
            return False
5✔
667
        return find_name
5✔
668

669
    ###############################################################
670
    # ls
671
    ###############################################################
672
    def list(self, top: NodeTop,
5✔
673
             path: str,
674
             rec: bool = False,
675
             fmt: str = 'native',
676
             raw: bool = False) -> List[NodeAny]:
677
        """
678
        list nodes for "ls"
679
        @top: top node
680
        @path: path to search for
681
        @rec: recursive walk
682
        @fmt: output format
683
        @raw: print raw size
684
        """
685
        self._debug(f'walking path: \"{path}\" from {top}')
5✔
686

687
        resolv = anytree.resolver.Resolver('name')
5✔
688
        found = []
5✔
689
        try:
5✔
690
            # resolve the path in the tree
691
            found = resolv.glob(top, path)
5✔
692
            if len(found) < 1:
5✔
693
                # nothing found
694
                self._debug('nothing found')
×
695
                return []
×
696

697
            if rec:
5✔
698
                # print the entire tree
699
                self.print_tree(found[0].parent, fmt=fmt, raw=raw)
5✔
700
                return found
5✔
701

702
            # sort found nodes
703
            found = sorted(found, key=self._sort, reverse=self.sortsize)
5✔
704

705
            # print the parent
706
            if fmt == 'native':
5✔
707
                self._print_node_native(found[0].parent,
5✔
708
                                        withpath=False,
709
                                        withdepth=True,
710
                                        raw=raw)
711
            elif fmt.startswith('csv'):
×
712
                self._node_to_csv(found[0].parent, raw=raw)
×
713
            elif fmt.startswith('fzf'):
×
714
                pass
715

716
            # print all found nodes
717
            if fmt == 'csv-with-header':
5✔
718
                Logger.stdout_nocolor(self.CSV_HEADER)
×
719
            for item in found:
5✔
720
                if fmt == 'native':
5✔
721
                    self._print_node_native(item, withpath=False,
5✔
722
                                            pre='- ',
723
                                            withdepth=True,
724
                                            raw=raw)
725
                elif fmt.startswith('csv'):
×
726
                    self._node_to_csv(item, raw=raw)
×
727
                elif fmt.startswith('fzf'):
×
728
                    self._to_fzf(item, fmt)
×
729

730
        except anytree.resolver.ChildResolverError:
5✔
731
            pass
5✔
732
        return found
5✔
733

734
    ###############################################################
735
    # tree creation
736
    ###############################################################
737
    def _add_entry(self, name: str,
5✔
738
                   top: NodeTop,
739
                   resolv: Any) -> None:
740
        """add an entry to the tree"""
741
        entries = name.rstrip(os.sep).split(os.sep)
×
742
        if len(entries) == 1:
×
743
            self.new_archive_node(name, name, top, top.name)
×
744
            return
×
745
        sub = os.sep.join(entries[:-1])
×
746
        nodename = entries[-1]
×
747
        try:
×
748
            parent = resolv.get(top, sub)
×
749
            parent = self.new_archive_node(nodename, name, parent, top.name)
×
750
        except anytree.resolver.ChildResolverError:
×
751
            self.new_archive_node(nodename, name, top, top.name)
×
752

753
    def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
5✔
754
        """convert list of files to a tree"""
755
        if not names:
×
756
            return
×
757
        resolv = anytree.resolver.Resolver('name')
×
758
        for name in names:
×
759
            name = name.rstrip(os.sep)
×
760
            self._add_entry(name, parent, resolv)
×
761

762
    ###############################################################
763
    # diverse
764
    ###############################################################
765
    def _sort_tree(self,
5✔
766
                   items: List[NodeAny]) -> List[NodeAny]:
767
        """sorting a list of items"""
768
        return sorted(items, key=self._sort, reverse=self.sortsize)
5✔
769

770
    def _sort(self, lst: NodeAny) -> Any:
5✔
771
        """sort a list"""
772
        if self.sortsize:
5✔
773
            return self._sort_size(lst)
×
774
        return self._sort_fs(lst)
5✔
775

776
    @staticmethod
5✔
777
    def _sort_fs(node: NodeAny) -> Tuple[str, str]:
5✔
778
        """sorting nodes dir first and alpha"""
779
        return (node.type, node.name.lstrip('.').lower())
5✔
780

781
    @staticmethod
5✔
782
    def _sort_size(node: NodeAny) -> float:
5✔
783
        """sorting nodes by size"""
784
        try:
×
785
            if not node.nodesize:
×
786
                return 0
×
787
            return float(node.nodesize)
×
788
        except AttributeError:
×
789
            return 0
×
790

791
    def _get_storage(self, node: NodeAny) -> NodeStorage:
5✔
792
        """recursively traverse up to find storage"""
793
        if node.type == nodes.TYPE_STORAGE:
5✔
794
            return node
×
795
        return cast(NodeStorage, node.ancestors[1])
5✔
796

797
    @staticmethod
5✔
798
    def _has_attr(node: NodeAny, attr: str) -> bool:
5✔
799
        """return True if node has attr as attribute"""
800
        return attr in node.__dict__.keys()
5✔
801

802
    def _get_parents(self, node: NodeAny) -> str:
5✔
803
        """get all parents recursively"""
804
        if node.type == nodes.TYPE_STORAGE:
5✔
805
            return ''
5✔
806
        if node.type == nodes.TYPE_TOP:
5✔
807
            return ''
×
808
        parent = self._get_parents(node.parent)
5✔
809
        if parent:
5✔
810
            return os.sep.join([parent, node.name])
5✔
811
        return str(node.name)
5✔
812

813
    @staticmethod
5✔
814
    def _get_hash(path: str) -> str:
5✔
815
        """return md5 hash of node"""
816
        try:
5✔
817
            return md5sum(path)
5✔
818
        except CatcliException as exc:
×
819
            Logger.err(str(exc))
×
820
            return ''
×
821

822
    def _debug(self, string: str) -> None:
5✔
823
        """print debug"""
824
        if not self.debug:
5✔
825
            return
5✔
826
        Logger.debug(string)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc