• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deadc0de6 / catcli / 4399176145

pending completion
4399176145

Pull #30

github

GitHub
Merge ee2cf80d9 into 7590ad02c
Pull Request #30: Fuse

474 of 474 new or added lines in 13 files covered. (100.0%)

1304 of 1691 relevant lines covered (77.11%)

3.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.41
/catcli/noder.py
1
"""
2
author: deadc0de6 (https://github.com/deadc0de6)
3
Copyright (c) 2017, deadc0de6
4

5
Class that process nodes in the catalog tree
6
"""
7

8
import os
5✔
9
import shutil
5✔
10
import time
5✔
11
from typing import List, Union, Tuple, Any, Optional, Dict, cast
5✔
12
import anytree  # type: ignore
5✔
13
from pyfzf.pyfzf import FzfPrompt  # type: ignore
5✔
14

15
# local imports
16
from catcli import nodes
5✔
17
from catcli.nodes import NodeAny, NodeStorage, \
5✔
18
    NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
19
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
5✔
20
from catcli.logger import Logger
5✔
21
from catcli.nodeprinter import NodePrinter
5✔
22
from catcli.decomp import Decomp
5✔
23
from catcli.version import __version__ as VERSION
5✔
24
from catcli.exceptions import CatcliException
5✔
25

26

27
class Noder:
5✔
28
    """
29
    handles node in the catalog tree
30
    There are 4 types of node:
31
    * "top" node representing the top node (generic node)
32
    * "storage" node representing a storage
33
    * "dir" node representing a directory
34
    * "file" node representing a file
35
    """
36

37
    CSV_HEADER = ('name,type,path,size,indexed_at,'
5✔
38
                  'maccess,md5,nbfiles,free_space,'
39
                  'total_space,meta')
40

41
    def __init__(self, debug: bool = False,
5✔
42
                 sortsize: bool = False,
43
                 arc: bool = False) -> None:
44
        """
45
        @debug: debug mode
46
        @sortsize: sort nodes by size
47
        @arch: handle archive
48
        """
49
        self.hash = True
5✔
50
        self.debug = debug
5✔
51
        self.sortsize = sortsize
5✔
52
        self.arc = arc
5✔
53
        if self.arc:
5✔
54
            self.decomp = Decomp()
×
55

56
    @staticmethod
    def get_storage_names(top: NodeTop) -> List[str]:
        """
        list the name of every direct child of the top node
        @top: the top node
        """
        return [child.name for child in top.children]
60

61
    def get_storage_node(self, top: NodeTop,
                         name: str,
                         newpath: str = '') -> NodeStorage:
        """
        return the storage node named @name
        @top: top node whose children are searched
        @name: name of the storage to look for
        @newpath: if set and existing on disk, the storage free space,
                  total space and timestamp are refreshed from it
        returns None when no storage has that name (the annotation
        is left unchanged for existing callers)
        """
        found = next((child for child in top.children
                      if child.type == nodes.TYPE_STORAGE
                      and child.name == name), None)
        if found and newpath and os.path.exists(newpath):
            # a single call so free and total come from one snapshot
            usage = shutil.disk_usage(newpath)
            found.free = usage.free
            found.total = usage.total
            found.ts = int(time.time())
        return cast(NodeStorage, found)
80

81
    @staticmethod
    def get_node(top: NodeTop,
                 path: str,
                 quiet: bool = False) -> Optional[NodeAny]:
        """
        resolve a node below @top
        @top: node the lookup starts from
        @path: tree path; only its last component is looked up
        @quiet: do not log an error when nothing is found
        returns the node or None when it cannot be resolved
        """
        basename = os.path.basename(path)
        resolver = anytree.resolver.Resolver('name')
        try:
            return cast(NodeAny, resolver.get(top, basename))
        except anytree.resolver.ChildResolverError:
            if not quiet:
                Logger.err(f'No node at path \"{basename}\"')
        return None
95

96
    def get_node_if_changed(self,
                            top: NodeTop,
                            path: str,
                            treepath: str) -> Tuple[Optional[NodeAny], bool]:
        """
        look up the node of a path and tell if it must be re-indexed
        @top: top node (storage)
        @path: abs path to file
        @treepath: rel path from indexed directory
        returns (node, changed); node is None if it is not in the tree
        """
        node = self.get_node(top, treepath.lstrip(os.sep), quiet=True)
        if not node:
            self._debug('\tchange: node does not exist')
            return None, True
        # directories are never reported as changed here
        if os.path.isdir(path):
            return node, False

        current = os.path.getmtime(path)
        # without a recorded modification time, force re-indexing
        stored = node.maccess if self._has_attr(node, 'maccess') else None
        if not stored:
            self._debug('\tchange: no maccess found')
            return node, True
        if float(current) != float(stored):
            self._debug(f'\tchange: maccess changed for \"{path}\"')
            return node, True

        # same modification time: compare checksums when hashing is on
        # and the node has one recorded
        if self.hash and node.md5:
            digest = self._get_hash(path)
            if digest and digest != node.md5:
                self._debug(f'\tchange: checksum changed for \"{path}\"')
                return node, True
        self._debug(f'\tchange: no change for \"{path}\"')
        return node, False
134

135
    def rec_size(self, node: Union[NodeDir, NodeStorage],
                 store: bool = True) -> int:
        """
        compute the size of a node by summing up its subtree
        @node: node to start from
        @store: write each computed size back into its node
        returns the size of a file node as is, otherwise the summed
        size of the children
        """
        if node.type == nodes.TYPE_FILE:
            self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
            return node.size
        self._debug(f'getting node size recursively for \"{node.name}\"')

        # only directories and storages have their children summed up
        is_container = node.type in (nodes.TYPE_DIR, nodes.TYPE_STORAGE)
        total: int = 0
        for child in node.children:
            if not is_container:
                self._debug(f'skipping {node.name}')
                continue
            child_size = self.rec_size(child, store=store)
            if store:
                child.size = child_size
            total += child_size
        if store:
            node.size = total
        self._debug(f'size of {node.type} \"{node.name}\": {total}')
        return total
165

166
    ###############################################################
167
    # public helpers
168
    ###############################################################
169
    @staticmethod
5✔
170
    def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
5✔
171
        """format the storage attr for saving"""
172
        if not attr:
5✔
173
            return ''
5✔
174
        if isinstance(attr, list):
5✔
175
            return ', '.join(attr)
5✔
176
        if isinstance(attr, dict):
×
177
            ret = []
×
178
            for key, val in attr.items():
×
179
                ret.append(f'{key}={val}')
×
180
            return ', '.join(ret)
×
181
        attr = attr.rstrip()
×
182
        return attr
×
183

184
    def do_hashing(self, val: bool) -> None:
5✔
185
        """hash files when indexing"""
186
        self.hash = val
5✔
187

188
    ###############################################################
189
    # node creation
190
    ###############################################################
191
    def new_top_node(self) -> NodeTop:
        """create the top node of a new catalog tree"""
        node = NodeTop(nodes.NAME_TOP)
        self._debug(f'new top node: {node}')
        return node
196

197
    def new_file_node(self, name: str, path: str,
                      parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
        """
        create a new node representing a file
        @name: name of the node
        @path: path of the file on disk
        @parent: node the new node is attached to
        @storagepath: prefix joined with @name to build the relpath
        returns the new node, or None if the file does not exist or
        its metadata cannot be read
        """
        if not os.path.exists(path):
            Logger.err(f'File \"{path}\" does not exist')
            return None
        path = os.path.abspath(path)
        try:
            stat = os.lstat(path)
            # the file may vanish or become unreadable between the two
            # calls, so the mtime lookup is guarded as well
            maccess = os.path.getmtime(path)
        except OSError as exc:
            Logger.err(f'OSError: {exc}')
            return None
        md5 = self._get_hash(path) if self.hash else ''
        relpath = os.sep.join([storagepath, name])

        node = NodeFile(name,
                        relpath,
                        stat.st_size,
                        md5,
                        maccess,
                        parent=parent)
        if self.arc:
            # index the archive content below the file node
            ext = os.path.splitext(path)[1][1:]
            if ext.lower() in self.decomp.get_formats():
                self._debug(f'{path} is an archive')
                names = self.decomp.get_names(path)
                self.list_to_tree(node, names)
            else:
                self._debug(f'{path} is NOT an archive')
        return node
230

231
    def new_dir_node(self, name: str, path: str,
                     parent: NodeAny, storagepath: str) -> NodeDir:
        """
        create a new node representing a directory
        @name: name of the node
        @path: path of the directory on disk
        @parent: node the new node is attached to
        @storagepath: prefix joined with @name to build the relpath
        """
        abspath = os.path.abspath(path)
        relpath = os.sep.join([storagepath, name])
        mtime = os.path.getmtime(abspath)
        # the size starts at 0
        return NodeDir(name, relpath, 0, mtime, parent=parent)
242

243
    def new_storage_node(self, name: str,
                         path: str,
                         parent: str,
                         attrs: Dict[str, Any]) \
            -> NodeStorage:
        """
        create a new node representing a storage
        @name: name of the storage
        @path: path on disk used to read the free and total space
        @parent: handed to NodeStorage as its parent
        @attrs: attributes, stored in their attrs_to_string() form
        NOTE(review): @parent is annotated as str but is used as the
        node parent - confirm the intended type
        """
        path = os.path.abspath(path)
        # a single call so free and total come from one snapshot
        usage = shutil.disk_usage(path)
        epoch = int(time.time())
        return NodeStorage(name,
                           usage.free,
                           usage.total,
                           0,
                           epoch,
                           self.attrs_to_string(attrs),
                           parent=parent)
260

261
    def new_archive_node(self, name: str, path: str,
                         parent: NodeAny, archive: str) -> NodeArchived:
        """
        create a new node for an entry found inside an archive
        @name: name of the entry
        @path: path of the entry, stored as the node relpath
        @parent: node the new node is attached to
        @archive: name of the archive the entry belongs to
        the node is created with a size of 0 and an empty md5
        """
        return NodeArchived(name=name, relpath=path,
                            parent=parent, size=0, md5='',
                            archive=archive)
267

268
    ###############################################################
269
    # node management
270
    ###############################################################
271
    def update_metanode(self, top: NodeTop) -> NodeMeta:
        """
        return the meta node with refreshed access information
        @top: top node searched for an existing meta node
        when there is no meta node a new one is created (without a
        parent) recording the creation time and version
        """
        now = int(time.time())
        meta = self._get_meta_node(top)
        if not meta:
            created: Dict[str, Any] = {
                'created': now,
                'created_version': VERSION,
            }
            meta = NodeMeta(name=nodes.NAME_META,
                            attr=created)
        meta.attr['access'] = now
        meta.attr['access_version'] = VERSION
        return meta
284

285
    def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
        """
        return the first meta node among the children of @top,
        or None if there is none
        """
        metas = (child for child in top.children
                 if child.type == nodes.TYPE_META)
        return cast(Optional[NodeMeta], next(metas, None))
293

294
    def clean_not_flagged(self, top: NodeTop) -> int:
        """
        detach every file and directory node that is not flagged and
        reset the flag of the others
        @top: node the traversal starts from
        returns the number of detached nodes
        """
        handled = (nodes.TYPE_DIR, nodes.TYPE_FILE)
        # _clean() is only called for file and directory nodes
        return sum(1 for node in anytree.PreOrderIter(top)
                   if node.type in handled and self._clean(node))
303

304
    def _clean(self, node: NodeAny) -> bool:
        """
        detach the node if it is not flagged, unflag it otherwise
        returns True if the node was detached
        """
        if node.flagged():
            node.unflag()
            return False
        # detach from the tree
        node.parent = None
        return True
311

312
    ###############################################################
313
    # printing
314
    ###############################################################
315
    def _node_to_csv(self, node: NodeAny,
                     sep: str = ',',
                     raw: bool = False) -> None:
        """
        print a node as one CSV line
        @node: the node to consider
        @sep: CSV separator character
        @raw: print raw size rather than human readable
        nothing is printed for an empty node or for the top node;
        the columns follow CSV_HEADER and every field is quoted
        """
        if not node or node.type == nodes.TYPE_TOP:
            return

        if node.type == nodes.TYPE_STORAGE:
            size = self.rec_size(node, store=False)
            fields = [
                node.name,                         # name
                node.type,                         # type
                '',                                # no full path
                size_to_str(size, raw=raw),        # size
                epoch_to_str(node.ts),             # indexed_at
                '',                                # no maccess
                '',                                # no md5
                str(len(node.children)),           # nbfiles
                size_to_str(node.free, raw=raw),   # free_space
                size_to_str(node.total, raw=raw),  # total_space
                node.attr,                         # meta
            ]
        else:
            parents = self._get_parents(node)
            storage = self._get_storage(node)
            maccess = ''
            if self._has_attr(node, 'maccess'):
                maccess = epoch_to_str(node.maccess)
            md5 = node.md5 if self._has_attr(node, 'md5') else ''
            nbfiles = ''
            if node.type == nodes.TYPE_DIR:
                nbfiles = str(len(node.children))
            fields = [
                node.name,                            # name
                node.type,                            # type
                os.path.join(storage.name, parents),  # full path
                size_to_str(node.size, raw=raw),      # size
                epoch_to_str(storage.ts),             # indexed_at
                maccess,                              # maccess
                md5,                                  # md5
                nbfiles,                              # nbfiles
                '',                                   # no free_space
                '',                                   # no total_space
                '',                                   # no meta
            ]

        # embedded quotes are doubled in every field, not only in the
        # name and path, so a storage name or attr cannot break the line
        line = sep.join('"' + field.replace('"', '""') + '"'
                        for field in fields)
        if line:
            Logger.stdout_nocolor(line)
376

377
    def _print_node_native(self, node: NodeAny,
                           pre: str = '',
                           withpath: bool = False,
                           withdepth: bool = False,
                           withstorage: bool = False,
                           recalcparent: bool = False,
                           raw: bool = False) -> None:
        """
        print a node in the native format
        @node: the node to print
        @pre: string to print before node
        @withpath: print the node path instead of its name
        @withdepth: print the number of children of a directory
        @withstorage: print the name of the storage the node is in
        @recalcparent: get relpath from tree instead of relpath field
        @raw: print raw size rather than human readable
        """
        def display_name() -> Any:
            """name shown for a file or a directory node"""
            if not withpath:
                shown = node.name
            elif recalcparent:
                shown = os.sep.join([self._get_parents(node.parent),
                                     node.name])
            else:
                shown = node.relpath
            return shown.lstrip(os.sep)

        kind = node.type
        if kind == nodes.TYPE_TOP:
            # top node
            Logger.stdout_nocolor(f'{pre}{node.name}')
        elif kind == nodes.TYPE_FILE:
            # node of type file
            name = display_name()
            if withstorage:
                storage = self._get_storage(node)
            attr_str = f', md5:{node.md5}' if node.md5 else ''
            size = size_to_str(node.size, raw=raw)
            compl = f'size:{size}{attr_str}'
            if withstorage:
                content = Logger.get_bold_text(storage.name)
                compl += f', storage:{content}'
            NodePrinter.print_file_native(pre, name, compl)
        elif kind == nodes.TYPE_DIR:
            # node of type directory
            name = display_name()
            depth = len(node.children) if withdepth else 0
            if withstorage:
                storage = self._get_storage(node)
            attr: List[Tuple[str, str]] = []
            if node.size:
                attr.append(('totsize', size_to_str(node.size, raw=raw)))
            if withstorage:
                attr.append(('storage', Logger.get_bold_text(storage.name)))
            NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
        elif kind == nodes.TYPE_STORAGE:
            # node of type storage
            sztotal = size_to_str(node.total, raw=raw)
            szused = size_to_str(node.total - node.free, raw=raw)
            nbchildren = len(node.children)
            # percentage of free space, 0 when the total is unknown
            pcent = node.free * 100 / node.total if node.total > 0 else 0
            freepercent = f'{pcent:.1f}%'
            # the indexing date, if any
            timestamp = ''
            if self._has_attr(node, 'ts'):
                timestamp = 'date:' + epoch_to_str(node.ts)
            # the size of the indexed children
            sizestr = size_to_str(self.rec_size(node, store=False), raw=raw)
            argsstring = ' | '.join([
                'nbfiles:' + f'{nbchildren}',
                'totsize:' + f'{sizestr}',
                f'free:{freepercent}',
                'du:' + f'{szused}/{sztotal}',
                timestamp])
            NodePrinter.print_storage_native(pre,
                                             node.name,
                                             argsstring,
                                             node.attr)
        elif kind == nodes.TYPE_ARCHIVED:
            # archive entries are only shown in archive mode
            if self.arc:
                NodePrinter.print_archive_native(pre, node.name, node.archive)
        else:
            Logger.err(f'bad node encountered: {node}')
475

476
    def print_tree(self, node: NodeAny,
                   fmt: str = 'native',
                   raw: bool = False) -> None:
        """
        print the tree below a node
        @node: start node
        @fmt: output format: 'native', 'csv' or 'csv-with-header';
              nothing is printed for any other value
        @raw: print the raw size rather than human readable
        """
        if fmt == 'native':
            # "tree" style
            rend = anytree.RenderTree(node, childiter=self._sort_tree)
            for pre, _, thenode in rend:
                self._print_node_native(thenode, pre=pre,
                                        withdepth=True, raw=raw)
            return
        if fmt == 'csv-with-header':
            Logger.stdout_nocolor(self.CSV_HEADER)
        if fmt in ('csv', 'csv-with-header'):
            self._to_csv(node, raw=raw)
499

500
    def _to_csv(self, node: NodeAny,
                raw: bool = False) -> None:
        """
        print every node of the tree below @node as a CSV line
        @node: start node
        @raw: print the raw size rather than human readable
        """
        rendered = anytree.RenderTree(node, childiter=self._sort_tree)
        for _, _, entry in rendered:
            self._node_to_csv(entry, raw=raw)
506

507
    @staticmethod
    def _fzf_prompt(strings: Any) -> Any:
        """
        let the user pick entries with fzf
        @strings: the entries offered for selection
        returns what FzfPrompt.prompt() returns for the selection
        """
        return FzfPrompt().prompt(strings)
513

514
    def _to_fzf(self, node: NodeAny, fmt: str) -> None:
        """
        prompt with fzf over the tree below @node and print the
        selected node(s)
        @node: node to start with
        @fmt: output format, its 'fzf-' part is removed to get the
              format used to print the selected nodes
        """
        rendered = anytree.RenderTree(node, childiter=self._sort_tree)
        # map each full path (storage name + tree path) to its node
        by_path = {}
        for _, _, entry in rendered:
            if not entry:
                continue
            parents = self._get_parents(entry)
            storage = self._get_storage(entry)
            by_path[os.path.join(storage.name, parents)] = entry
        chosen = self._fzf_prompt(by_path.keys())
        subfmt = fmt.replace('fzf-', '')
        for path in chosen:
            # ignore empty or unknown selections
            if path and path in by_path:
                self.print_tree(by_path[path], fmt=subfmt)
541

542
    @staticmethod
    def to_dot(top: NodeTop,
               path: str = 'tree.dot') -> str:
        """
        export the tree to a dot file for graphing
        @top: node to export from
        @path: path of the dot file to write
        returns a dot command line rendering that file to
        /tmp/tree.png
        """
        exporter = anytree.exporter.DotExporter(top)
        exporter.to_dotfile(path)
        Logger.info(f'dot file created under \"{path}\"')
        return f'dot {path} -T png -o /tmp/tree.png'
549

550
    ###############################################################
551
    # searching
552
    ###############################################################
553
    def find_name(self, top: NodeTop,
                  key: str,
                  script: bool = False,
                  only_dir: bool = False,
                  startnode: Optional[NodeAny] = None,
                  parentfromtree: bool = False,
                  fmt: str = 'native',
                  raw: bool = False) -> List[NodeAny]:
        """
        find nodes based on their names and print them
        @top: top node
        @key: term to search for
        @script: also log a shell command listing the found paths
        @only_dir: only search for directories
        @startnode: where to start the search: a path resolved with
                    get_node() or directly a node; defaults to @top
        @parentfromtree: get path from parent instead of stored relpath
        @fmt: output format
        @raw: raw size output
        returns the found nodes, [] if @startnode cannot be resolved
        note: the name and relpath of the found nodes are passed
        through fix_badchars() and stored back in the nodes
        """
        self._debug(f'searching for \"{key}\"')

        # where to start the search from
        start: Optional[NodeAny] = top
        if startnode:
            if isinstance(startnode, str):
                start = self.get_node(top, startnode)
            else:
                start = startnode
        if start is None:
            # the path did not resolve, get_node() logged the error
            return []
        filterfunc = self._callback_find_name(key, only_dir)
        found = anytree.findall(start, filter_=filterfunc)
        self._debug(f'found {len(found)} node(s)')

        # index the found nodes by path
        paths = {}
        for item in found:
            item.name = fix_badchars(item.name)
            has_relpath = hasattr(item, 'relpath')
            if has_relpath:
                item.relpath = fix_badchars(item.relpath)
            if parentfromtree or not has_relpath:
                # no stored relpath: fall back to the tree position
                paths[self._get_parents(item)] = item
            else:
                paths[item.relpath] = item

        if fmt.startswith('fzf'):
            # fzf mode: only keep and print what the user selected
            selected = self._fzf_prompt(paths.keys())
            newpaths = {}
            subfmt = fmt.replace('fzf-', '')
            for item in selected:
                if item not in paths:
                    continue
                newpaths[item] = paths[item]
                self.print_tree(newpaths[item], fmt=subfmt, raw=raw)
            paths = newpaths
        elif fmt == 'native':
            for item in paths.values():
                self._print_node_native(item, withpath=True,
                                        withdepth=True,
                                        withstorage=True,
                                        recalcparent=parentfromtree,
                                        raw=raw)
        elif fmt.startswith('csv'):
            if fmt == 'csv-with-header':
                Logger.stdout_nocolor(self.CSV_HEADER)
            for item in paths.values():
                self._node_to_csv(item, raw=raw)

        # log the script command if requested
        if script:
            tmp = ['${source}/' + x for x in paths]
            tmpstr = ' '.join(tmp)
            cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
            Logger.info(cmd)

        return list(paths.values())
628

629
    def _callback_find_name(self, term: str, only_dir: bool) -> Any:
        """
        build the filter used to search nodes by name
        @term: case insensitive substring to look for in the node
               name; an empty term matches every eligible node
        @only_dir: only accept directory nodes
        returns a callable taking a node and returning a bool;
        storage, top and meta nodes never match
        """
        ignored = (nodes.TYPE_STORAGE, nodes.TYPE_TOP, nodes.TYPE_META)
        needle = term.lower() if term else ''

        def find_name(node: NodeAny) -> bool:
            if node.type in ignored:
                return False
            if only_dir and node.type != nodes.TYPE_DIR:
                # ignore non directory
                return False
            if not needle:
                return True
            return needle in node.name.lower()
        return find_name
654

655
    ###############################################################
656
    # ls
657
    ###############################################################
658
    def list(self, top: NodeTop,
             path: str,
             rec: bool = False,
             fmt: str = 'native',
             raw: bool = False) -> List[NodeAny]:
        """
        list nodes for "ls"
        @top: top node
        @path: path to search for (glob resolved in the tree)
        @rec: recursive walk: print the whole tree below the parent
              of the first match
        @fmt: output format
        @raw: print raw size
        returns the matching nodes, [] if nothing matched
        """
        self._debug(f'walking path: \"{path}\" from {top}')

        resolv = anytree.resolver.Resolver('name')
        found = []
        try:
            # resolve the path in the tree
            found = resolv.glob(top, path)
            if len(found) < 1:
                # nothing found
                self._debug('nothing found')
                return []

            if rec:
                # print the entire tree
                self.print_tree(found[0].parent, fmt=fmt, raw=raw)
                return found

            # sort found nodes
            found = sorted(found, key=self._sort, reverse=self.sortsize)
            parent = found[0].parent
            is_native = fmt == 'native'
            is_csv = fmt.startswith('csv')

            # the header goes first so it is the first CSV line
            if fmt == 'csv-with-header':
                Logger.stdout_nocolor(self.CSV_HEADER)

            # print the parent
            if is_native:
                # a match without a parent has nothing to print here
                if parent is not None:
                    self._print_node_native(parent,
                                            withpath=False,
                                            withdepth=True,
                                            raw=raw)
            elif is_csv:
                self._node_to_csv(parent, raw=raw)

            # print all found nodes
            for item in found:
                if is_native:
                    self._print_node_native(item, withpath=False,
                                            pre='- ',
                                            withdepth=True,
                                            raw=raw)
                elif is_csv:
                    self._node_to_csv(item, raw=raw)
                elif fmt.startswith('fzf'):
                    self._to_fzf(item, fmt)

        except anytree.resolver.ChildResolverError:
            # the path does not exist in the tree
            pass
        return found
719

720
    ###############################################################
721
    # tree creation
722
    ###############################################################
723
    def _add_entry(self, name: str,
                   top: NodeTop,
                   resolv: Any) -> None:
        """
        add an archive entry to the tree below @top
        @name: path of the entry inside the archive
        @top: node the entries are attached below
        @resolv: resolver used to look up the parent of the entry
        the entry is attached to @top when it has no parent directory
        or when that parent cannot be resolved
        """
        parts = name.rstrip(os.sep).split(os.sep)
        if len(parts) == 1:
            self.new_archive_node(name, name, top, top.name)
            return
        *dirs, leaf = parts
        try:
            holder = resolv.get(top, os.sep.join(dirs))
            self.new_archive_node(leaf, name, holder, top.name)
        except anytree.resolver.ChildResolverError:
            self.new_archive_node(leaf, name, top, top.name)
738

739
    def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
        """
        add a list of paths as a tree below @parent
        @parent: node the entries are attached below
        @names: the paths, trailing separators are ignored
        """
        if not names:
            return
        resolver = anytree.resolver.Resolver('name')
        for entry in names:
            self._add_entry(entry.rstrip(os.sep), parent, resolver)
747

748
    ###############################################################
749
    # diverse
750
    ###############################################################
751
    def _sort_tree(self,
                   items: List[NodeAny]) -> List[NodeAny]:
        """
        return the nodes sorted with the _sort() key, in reverse
        order when sorting by size
        """
        return sorted(items, reverse=self.sortsize, key=self._sort)
755

756
    def _sort(self, lst: NodeAny) -> Any:
        """
        sort key of a node: its size when sorting by size, its type
        and name otherwise
        """
        keyfunc = self._sort_size if self.sortsize else self._sort_fs
        return keyfunc(lst)
761

762
    @staticmethod
    def _sort_fs(node: NodeAny) -> Tuple[str, str]:
        """
        sort key ordering nodes by type, then by name ignoring
        leading dots and case
        """
        name_key = node.name.lstrip('.').lower()
        return (node.type, name_key)
766

767
    @staticmethod
    def _sort_size(node: NodeAny) -> float:
        """
        sort key ordering nodes by size, 0 for a node without size
        """
        size = getattr(node, 'size', None)
        return float(size) if size else 0
776

777
    def _get_storage(self, node: NodeAny) -> NodeStorage:
        """
        return the storage a node belongs to, the node itself if it
        is a storage
        """
        if node.type != nodes.TYPE_STORAGE:
            # second ancestor from the root; assumes the storage sits
            # right below the tree root
            return cast(NodeStorage, node.ancestors[1])
        return node
782

783
    @staticmethod
    def _has_attr(node: NodeAny, attr: str) -> bool:
        """
        tell if @attr is set on the node instance itself (class
        level attributes are not considered)
        """
        return attr in node.__dict__
787

788
    def _get_parents(self, node: NodeAny) -> str:
        """
        build the path of a node from the names of its ancestors
        walks up until a storage or top node is reached (excluded)
        returns '' for a storage or top node
        """
        boundary = (nodes.TYPE_STORAGE, nodes.TYPE_TOP)
        names = []
        current = node
        while current.type not in boundary:
            names.append(current.name)
            current = current.parent
        # join from the highest ancestor down to the node
        path = ''
        for name in reversed(names):
            path = os.sep.join([path, name]) if path else str(name)
        return path
798

799
    @staticmethod
    def _get_hash(path: str) -> str:
        """
        return the md5sum() of the file at @path, '' (after logging
        the error) if it raises a CatcliException
        """
        try:
            digest = md5sum(path)
        except CatcliException as exc:
            Logger.err(str(exc))
            digest = ''
        return digest
807

808
    def _debug(self, string: str) -> None:
5✔
809
        """print debug"""
810
        if not self.debug:
5✔
811
            return
5✔
812
        Logger.debug(string)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc