• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deadc0de6 / catcli / 4601309577

pending completion
4601309577

push

github

deadc0de6
bump version

1 of 1 new or added line in 1 file covered. (100.0%)

1286 of 1707 relevant lines covered (75.34%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.44
/catcli/noder.py
1
"""
2
author: deadc0de6 (https://github.com/deadc0de6)
3
Copyright (c) 2017, deadc0de6
4

5
Class that process nodes in the catalog tree
6
"""
7

8
import os
1✔
9
import shutil
1✔
10
import time
1✔
11
from typing import List, Union, Tuple, Any, Optional, Dict, cast
1✔
12
import anytree  # type: ignore
1✔
13

14
# local imports
15
from catcli import nodes
1✔
16
from catcli.nodes import NodeAny, NodeStorage, \
1✔
17
    NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
18
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
1✔
19
from catcli.logger import Logger
1✔
20
from catcli.nodeprinter import NodePrinter
1✔
21
from catcli.decomp import Decomp
1✔
22
from catcli.version import __version__ as VERSION
1✔
23
from catcli.exceptions import CatcliException
1✔
24

25

26
class Noder:
1✔
27
    """
28
    handles node in the catalog tree
29
    There are 4 types of node:
30
    * "top" node representing the top node (generic node)
31
    * "storage" node representing a storage
32
    * "dir" node representing a directory
33
    * "file" node representing a file
34
    """
35

36
    CSV_HEADER = ('name,type,path,size,indexed_at,'
1✔
37
                  'maccess,md5,nbfiles,free_space,'
38
                  'total_space,meta')
39

40
    def __init__(self, debug: bool = False,
1✔
41
                 sortsize: bool = False,
42
                 arc: bool = False) -> None:
43
        """
44
        @debug: debug mode
45
        @sortsize: sort nodes by size
46
        @arch: handle archive
47
        """
48
        self.hash = True
1✔
49
        self.debug = debug
1✔
50
        self.sortsize = sortsize
1✔
51
        self.arc = arc
1✔
52
        if self.arc:
1✔
53
            self.decomp = Decomp()
×
54

55
    @staticmethod
1✔
56
    def get_storage_names(top: NodeTop) -> List[str]:
1✔
57
        """return a list of all storage names"""
58
        return [x.name for x in list(top.children)]
1✔
59

60
    def get_storage_node(self, top: NodeTop,
1✔
61
                         name: str,
62
                         newpath: str = '') -> NodeStorage:
63
        """
64
        return the storage node if any
65
        if newpath is submitted, it will update the media info
66
        """
67
        found = None
1✔
68
        for node in top.children:
1✔
69
            if node.type != nodes.TYPE_STORAGE:
1✔
70
                continue
×
71
            if node.name == name:
1✔
72
                found = node
1✔
73
                break
1✔
74
        if found and newpath and os.path.exists(newpath):
1✔
75
            found.free = shutil.disk_usage(newpath).free
1✔
76
            found.total = shutil.disk_usage(newpath).total
1✔
77
            found.ts = int(time.time())
1✔
78
        return cast(NodeStorage, found)
1✔
79

80
    @staticmethod
1✔
81
    def get_node(top: NodeTop,
1✔
82
                 path: str,
83
                 quiet: bool = False) -> Optional[NodeAny]:
84
        """get the node by internal tree path"""
85
        resolv = anytree.resolver.Resolver('name')
1✔
86
        try:
1✔
87
            bpath = os.path.basename(path)
1✔
88
            the_node = resolv.get(top, bpath)
1✔
89
            return cast(NodeAny, the_node)
1✔
90
        except anytree.resolver.ChildResolverError:
1✔
91
            if not quiet:
1✔
92
                Logger.err(f'No node at path \"{bpath}\"')
×
93
            return None
1✔
94

95
    def get_node_if_changed(self,
1✔
96
                            top: NodeTop,
97
                            path: str,
98
                            treepath: str) -> Tuple[Optional[NodeAny], bool]:
99
        """
100
        return the node (if any) and if it has changed
101
        @top: top node (storage)
102
        @path: abs path to file
103
        @treepath: rel path from indexed directory
104
        """
105
        treepath = treepath.lstrip(os.sep)
1✔
106
        node = self.get_node(top, treepath, quiet=True)
1✔
107
        # node does not exist
108
        if not node:
1✔
109
            self._debug('\tchange: node does not exist')
1✔
110
            return None, True
1✔
111
        if os.path.isdir(path):
1✔
112
            return node, False
1✔
113
        # force re-indexing if no maccess
114
        maccess = os.path.getmtime(path)
1✔
115
        if not self._has_attr(node, 'maccess') or \
1✔
116
                not node.maccess:
117
            self._debug('\tchange: no maccess found')
×
118
            return node, True
×
119
        # maccess changed
120
        old_maccess = node.maccess
1✔
121
        if float(maccess) != float(old_maccess):
1✔
122
            self._debug(f'\tchange: maccess changed for \"{path}\"')
1✔
123
            return node, True
1✔
124
        # test hash
125
        if self.hash and node.md5:
1✔
126
            md5 = self._get_hash(path)
1✔
127
            if md5 and md5 != node.md5:
1✔
128
                msg = f'\tchange: checksum changed for \"{path}\"'
1✔
129
                self._debug(msg)
1✔
130
                return node, True
1✔
131
        self._debug(f'\tchange: no change for \"{path}\"')
1✔
132
        return node, False
1✔
133

134
    def rec_size(self, node: Union[NodeDir, NodeStorage],
1✔
135
                 store: bool = True) -> int:
136
        """
137
        recursively traverse tree and return size
138
        @store: store the size in the node
139
        """
140
        if node.type == nodes.TYPE_FILE:
1✔
141
            self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
1✔
142
            return node.size
1✔
143
        msg = f'getting node size recursively for \"{node.name}\"'
1✔
144
        self._debug(msg)
1✔
145
        size: int = 0
1✔
146
        for i in node.children:
1✔
147
            if node.type == nodes.TYPE_DIR:
1✔
148
                sub_size = self.rec_size(i, store=store)
1✔
149
                if store:
1✔
150
                    i.size = sub_size
1✔
151
                size += sub_size
1✔
152
                continue
1✔
153
            if node.type == nodes.TYPE_STORAGE:
1✔
154
                sub_size = self.rec_size(i, store=store)
1✔
155
                if store:
1✔
156
                    i.size = sub_size
1✔
157
                size += sub_size
1✔
158
                continue
1✔
159
            self._debug(f'skipping {node.name}')
×
160
        if store:
1✔
161
            node.size = size
1✔
162
        self._debug(f'size of {node.type} \"{node.name}\": {size}')
1✔
163
        return size
1✔
164

165
    ###############################################################
166
    # public helpers
167
    ###############################################################
168
    @staticmethod
1✔
169
    def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
1✔
170
        """format the storage attr for saving"""
171
        if not attr:
1✔
172
            return ''
1✔
173
        if isinstance(attr, list):
1✔
174
            return ', '.join(attr)
1✔
175
        if isinstance(attr, dict):
×
176
            ret = []
×
177
            for key, val in attr.items():
×
178
                ret.append(f'{key}={val}')
×
179
            return ', '.join(ret)
×
180
        attr = attr.rstrip()
×
181
        return attr
×
182

183
    def do_hashing(self, val: bool) -> None:
1✔
184
        """hash files when indexing"""
185
        self.hash = val
1✔
186

187
    ###############################################################
188
    # node creation
189
    ###############################################################
190
    def new_top_node(self) -> NodeTop:
1✔
191
        """create a new top node"""
192
        top = NodeTop(nodes.NAME_TOP)
1✔
193
        self._debug(f'new top node: {top}')
1✔
194
        return top
1✔
195

196
    def new_file_node(self, name: str, path: str,
1✔
197
                      parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
198
        """create a new node representing a file"""
199
        if not os.path.exists(path):
1✔
200
            Logger.err(f'File \"{path}\" does not exist')
×
201
            return None
×
202
        path = os.path.abspath(path)
1✔
203
        try:
1✔
204
            stat = os.lstat(path)
1✔
205
        except OSError as exc:
×
206
            Logger.err(f'OSError: {exc}')
×
207
            return None
×
208
        md5 = ''
1✔
209
        if self.hash:
1✔
210
            md5 = self._get_hash(path)
1✔
211
        relpath = os.sep.join([storagepath, name])
1✔
212

213
        maccess = os.path.getmtime(path)
1✔
214
        node = NodeFile(name,
1✔
215
                        relpath,
216
                        stat.st_size,
217
                        md5,
218
                        maccess,
219
                        parent=parent)
220
        if self.arc:
1✔
221
            ext = os.path.splitext(path)[1][1:]
×
222
            if ext.lower() in self.decomp.get_formats():
×
223
                self._debug(f'{path} is an archive')
×
224
                names = self.decomp.get_names(path)
×
225
                self.list_to_tree(node, names)
×
226
            else:
227
                self._debug(f'{path} is NOT an archive')
×
228
        return node
1✔
229

230
    def new_dir_node(self, name: str, path: str,
1✔
231
                     parent: NodeAny, storagepath: str) -> NodeDir:
232
        """create a new node representing a directory"""
233
        path = os.path.abspath(path)
1✔
234
        relpath = os.sep.join([storagepath, name])
1✔
235
        maccess = os.path.getmtime(path)
1✔
236
        return NodeDir(name,
1✔
237
                       relpath,
238
                       0,
239
                       maccess,
240
                       parent=parent)
241

242
    def new_storage_node(self, name: str,
1✔
243
                         path: str,
244
                         parent: str,
245
                         attrs: Dict[str, Any]) \
246
            -> NodeStorage:
247
        """create a new node representing a storage"""
248
        path = os.path.abspath(path)
1✔
249
        free = shutil.disk_usage(path).free
1✔
250
        total = shutil.disk_usage(path).total
1✔
251
        epoch = int(time.time())
1✔
252
        return NodeStorage(name,
1✔
253
                           free,
254
                           total,
255
                           0,
256
                           epoch,
257
                           self.attrs_to_string(attrs),
258
                           parent=parent)
259

260
    def new_archive_node(self, name: str, path: str,
1✔
261
                         parent: str, archive: str) -> NodeArchived:
262
        """create a new node for archive data"""
263
        return NodeArchived(name=name, relpath=path,
×
264
                            parent=parent, size=0, md5='',
265
                            archive=archive)
266

267
    ###############################################################
268
    # node management
269
    ###############################################################
270
    def update_metanode(self, top: NodeTop) -> NodeMeta:
1✔
271
        """create or update meta node information"""
272
        meta = self._get_meta_node(top)
1✔
273
        epoch = int(time.time())
1✔
274
        if not meta:
1✔
275
            attrs: Dict[str, Any] = {}
1✔
276
            attrs['created'] = epoch
1✔
277
            attrs['created_version'] = VERSION
1✔
278
            meta = NodeMeta(name=nodes.NAME_META,
1✔
279
                            attr=attrs)
280
        meta.attr['access'] = epoch
1✔
281
        meta.attr['access_version'] = VERSION
1✔
282
        return meta
1✔
283

284
    def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
1✔
285
        """return the meta node if any"""
286
        try:
1✔
287
            found = next(filter(lambda x: x.type == nodes.TYPE_META,
1✔
288
                         top.children))
289
            return cast(NodeMeta, found)
1✔
290
        except StopIteration:
1✔
291
            return None
1✔
292

293
    def clean_not_flagged(self, top: NodeTop) -> int:
1✔
294
        """remove any node not flagged and clean flags"""
295
        cnt = 0
1✔
296
        for node in anytree.PreOrderIter(top):
1✔
297
            if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
1✔
298
                continue
1✔
299
            if self._clean(node):
1✔
300
                cnt += 1
1✔
301
        return cnt
1✔
302

303
    def _clean(self, node: NodeAny) -> bool:
1✔
304
        """remove node if not flagged"""
305
        if not node.flagged():
1✔
306
            node.parent = None
1✔
307
            return True
1✔
308
        node.unflag()
1✔
309
        return False
1✔
310

311
    ###############################################################
312
    # printing
313
    ###############################################################
314
    def _node_to_csv(self, node: NodeAny,
1✔
315
                     sep: str = ',',
316
                     raw: bool = False) -> None:
317
        """
318
        print a node to csv
319
        @node: the node to consider
320
        @sep: CSV separator character
321
        @raw: print raw size rather than human readable
322
        """
323
        if not node:
1✔
324
            return
×
325
        if node.type == nodes.TYPE_TOP:
1✔
326
            return
1✔
327

328
        out = []
1✔
329
        if node.type == nodes.TYPE_STORAGE:
1✔
330
            # handle storage
331
            out.append(node.name)   # name
1✔
332
            out.append(node.type)   # type
1✔
333
            out.append('')          # fake full path
1✔
334
            size = self.rec_size(node, store=False)
1✔
335
            out.append(size_to_str(size, raw=raw))  # size
1✔
336
            out.append(epoch_to_str(node.ts))  # indexed_at
1✔
337
            out.append('')  # fake maccess
1✔
338
            out.append('')  # fake md5
1✔
339
            out.append(str(len(node.children)))  # nbfiles
1✔
340
            # fake free_space
341
            out.append(size_to_str(node.free, raw=raw))
1✔
342
            # fake total_space
343
            out.append(size_to_str(node.total, raw=raw))
1✔
344
            out.append(node.attr)  # meta
1✔
345
        else:
346
            # handle other nodes
347
            out.append(node.name.replace('"', '""'))  # name
1✔
348
            out.append(node.type)  # type
1✔
349
            parents = self._get_parents(node)
1✔
350
            storage = self._get_storage(node)
1✔
351
            fullpath = os.path.join(storage.name, parents)
1✔
352
            out.append(fullpath.replace('"', '""'))  # full path
1✔
353

354
            out.append(size_to_str(node.size, raw=raw))  # size
1✔
355
            out.append(epoch_to_str(storage.ts))  # indexed_at
1✔
356
            if self._has_attr(node, 'maccess'):
1✔
357
                out.append(epoch_to_str(node.maccess))  # maccess
1✔
358
            else:
359
                out.append('')  # fake maccess
×
360
            if self._has_attr(node, 'md5'):
1✔
361
                out.append(node.md5)  # md5
1✔
362
            else:
363
                out.append('')  # fake md5
1✔
364
            if node.type == nodes.TYPE_DIR:
1✔
365
                out.append(str(len(node.children)))  # nbfiles
1✔
366
            else:
367
                out.append('')  # fake nbfiles
1✔
368
            out.append('')  # fake free_space
1✔
369
            out.append('')  # fake total_space
1✔
370
            out.append('')  # fake meta
1✔
371

372
        line = sep.join(['"' + o + '"' for o in out])
1✔
373
        if len(line) > 0:
1✔
374
            Logger.stdout_nocolor(line)
1✔
375

376
    def _print_node_native(self, node: NodeAny,
1✔
377
                           pre: str = '',
378
                           withpath: bool = False,
379
                           withdepth: bool = False,
380
                           withstorage: bool = False,
381
                           recalcparent: bool = False,
382
                           raw: bool = False) -> None:
383
        """
384
        print a node
385
        @node: the node to print
386
        @pre: string to print before node
387
        @withpath: print the node path
388
        @withdepth: print the node depth info
389
        @withstorage: print the node storage it belongs to
390
        @recalcparent: get relpath from tree instead of relpath field
391
        @raw: print raw size rather than human readable
392
        """
393
        if node.type == nodes.TYPE_TOP:
1✔
394
            # top node
395
            Logger.stdout_nocolor(f'{pre}{node.name}')
1✔
396
        elif node.type == nodes.TYPE_FILE:
1✔
397
            # node of type file
398
            name = node.name
1✔
399
            if withpath:
1✔
400
                if recalcparent:
1✔
401
                    name = os.sep.join([self._get_parents(node.parent), name])
×
402
                else:
403
                    name = node.relpath
1✔
404
            name = name.lstrip(os.sep)
1✔
405
            if withstorage:
1✔
406
                storage = self._get_storage(node)
1✔
407
            attr_str = ''
1✔
408
            if node.md5:
1✔
409
                attr_str = f', md5:{node.md5}'
1✔
410
            size = size_to_str(node.size, raw=raw)
1✔
411
            compl = f'size:{size}{attr_str}'
1✔
412
            if withstorage:
1✔
413
                content = Logger.get_bold_text(storage.name)
1✔
414
                compl += f', storage:{content}'
1✔
415
            NodePrinter.print_file_native(pre, name, compl)
1✔
416
        elif node.type == nodes.TYPE_DIR:
1✔
417
            # node of type directory
418
            name = node.name
1✔
419
            if withpath:
1✔
420
                if recalcparent:
×
421
                    name = os.sep.join([self._get_parents(node.parent), name])
×
422
                else:
423
                    name = node.relpath
×
424
            name = name.lstrip(os.sep)
1✔
425
            depth = 0
1✔
426
            if withdepth:
1✔
427
                depth = len(node.children)
1✔
428
            if withstorage:
1✔
429
                storage = self._get_storage(node)
×
430
            attr: List[Tuple[str, str]] = []
1✔
431
            if node.size:
1✔
432
                attr.append(('totsize', size_to_str(node.size, raw=raw)))
1✔
433
            if withstorage:
1✔
434
                attr.append(('storage', Logger.get_bold_text(storage.name)))
×
435
            NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
1✔
436
        elif node.type == nodes.TYPE_STORAGE:
1✔
437
            # node of type storage
438
            sztotal = size_to_str(node.total, raw=raw)
1✔
439
            szused = size_to_str(node.total - node.free, raw=raw)
1✔
440
            nbchildren = len(node.children)
1✔
441
            pcent = 0
1✔
442
            if node.total > 0:
1✔
443
                pcent = node.free * 100 / node.total
1✔
444
            freepercent = f'{pcent:.1f}%'
1✔
445
            # get the date
446
            timestamp = ''
1✔
447
            if self._has_attr(node, 'ts'):
1✔
448
                timestamp = 'date:'
1✔
449
                timestamp += epoch_to_str(node.ts)
1✔
450
            disksize = ''
1✔
451
            # the children size
452
            recsize = self.rec_size(node, store=False)
1✔
453
            sizestr = size_to_str(recsize, raw=raw)
1✔
454
            disksize = 'totsize:' + f'{sizestr}'
1✔
455
            # format the output
456
            name = node.name
1✔
457
            args = [
1✔
458
                'nbfiles:' + f'{nbchildren}',
459
                disksize,
460
                f'free:{freepercent}',
461
                'du:' + f'{szused}/{sztotal}',
462
                timestamp]
463
            argsstring = ' | '.join(args)
1✔
464
            NodePrinter.print_storage_native(pre,
1✔
465
                                             name,
466
                                             argsstring,
467
                                             node.attr)
468
        elif node.type == nodes.TYPE_ARCHIVED:
×
469
            # archive node
470
            if self.arc:
×
471
                NodePrinter.print_archive_native(pre, node.name, node.archive)
×
472
        else:
473
            Logger.err(f'bad node encountered: {node}')
×
474

475
    def print_tree(self, node: NodeAny,
1✔
476
                   fmt: str = 'native',
477
                   raw: bool = False) -> None:
478
        """
479
        print the tree in different format
480
        @node: start node
481
        @style: when fmt=native, defines the tree style
482
        @fmt: output format
483
        @raw: print the raw size rather than human readable
484
        """
485
        if fmt == 'native':
1✔
486
            # "tree" style
487
            rend = anytree.RenderTree(node, childiter=self._sort_tree)
1✔
488
            for pre, _, thenode in rend:
1✔
489
                self._print_node_native(thenode, pre=pre,
1✔
490
                                        withdepth=True, raw=raw)
491
        elif fmt == 'csv':
1✔
492
            # csv output
493
            self._to_csv(node, raw=raw)
1✔
494
        elif fmt == 'csv-with-header':
×
495
            # csv output
496
            Logger.stdout_nocolor(self.CSV_HEADER)
×
497
            self._to_csv(node, raw=raw)
×
498

499
    def _to_csv(self, node: NodeAny,
1✔
500
                raw: bool = False) -> None:
501
        """print the tree to csv"""
502
        rend = anytree.RenderTree(node, childiter=self._sort_tree)
1✔
503
        for _, _, item in rend:
1✔
504
            self._node_to_csv(item, raw=raw)
1✔
505

506
    @staticmethod
1✔
507
    def _fzf_prompt(strings: Any) -> Any:
1✔
508
        """prompt with fzf"""
509
        try:
×
510
            from pyfzf.pyfzf import FzfPrompt  # type: ignore # pylint: disable=C0415 # noqa
×
511
            fzf = FzfPrompt()
×
512
            selected = fzf.prompt(strings)
×
513
            return selected
×
514
        except ModuleNotFoundError:
×
515
            Logger.err('install pyfzf to use fzf')
×
516
            return None
×
517

518
    def _to_fzf(self, node: NodeAny, fmt: str) -> None:
1✔
519
        """
520
        fzf prompt with list and print selected node(s)
521
        @node: node to start with
522
        @fmt: output format for selected nodes
523
        """
524
        rendered = anytree.RenderTree(node, childiter=self._sort_tree)
×
525
        the_nodes = {}
×
526
        # construct node names list
527
        for _, _, rend in rendered:
×
528
            if not rend:
×
529
                continue
×
530
            parents = self._get_parents(rend)
×
531
            storage = self._get_storage(rend)
×
532
            fullpath = os.path.join(storage.name, parents)
×
533
            the_nodes[fullpath] = rend
×
534
        # prompt with fzf
535
        paths = self._fzf_prompt(the_nodes.keys())
×
536
        # print the resulting tree
537
        subfmt = fmt.replace('fzf-', '')
×
538
        for path in paths:
×
539
            if not path:
×
540
                continue
×
541
            if path not in the_nodes:
×
542
                continue
×
543
            rend = the_nodes[path]
×
544
            self.print_tree(rend, fmt=subfmt)
×
545

546
    @staticmethod
1✔
547
    def to_dot(top: NodeTop,
1✔
548
               path: str = 'tree.dot') -> str:
549
        """export to dot for graphing"""
550
        anytree.exporter.DotExporter(top).to_dotfile(path)
1✔
551
        Logger.info(f'dot file created under \"{path}\"')
1✔
552
        return f'dot {path} -T png -o /tmp/tree.png'
1✔
553

554
    ###############################################################
555
    # searching
556
    ###############################################################
557
    def find_name(self, top: NodeTop,
1✔
558
                  key: str,
559
                  script: bool = False,
560
                  only_dir: bool = False,
561
                  startnode: Optional[NodeAny] = None,
562
                  parentfromtree: bool = False,
563
                  fmt: str = 'native',
564
                  raw: bool = False) -> List[NodeAny]:
565
        """
566
        find files based on their names
567
        @top: top node
568
        @key: term to search for
569
        @script: output script
570
        @directory: only search for directories
571
        @startpath: node to start with
572
        @parentfromtree: get path from parent instead of stored relpath
573
        @fmt: output format
574
        @raw: raw size output
575
        returns the found nodes
576
        """
577
        self._debug(f'searching for \"{key}\"')
1✔
578

579
        # search for nodes based on path
580
        start: Optional[NodeAny] = top
1✔
581
        if startnode:
1✔
582
            start = self.get_node(top, startnode)
×
583
        filterfunc = self._callback_find_name(key, only_dir)
1✔
584
        found = anytree.findall(start, filter_=filterfunc)
1✔
585
        nbfound = len(found)
1✔
586
        self._debug(f'found {nbfound} node(s)')
1✔
587

588
        # compile found nodes
589
        paths = {}
1✔
590
        for item in found:
1✔
591
            item.name = fix_badchars(item.name)
1✔
592
            if hasattr(item, 'relpath'):
1✔
593
                item.relpath = fix_badchars(item.relpath)
1✔
594
            if parentfromtree:
1✔
595
                paths[self._get_parents(item)] = item
×
596
            else:
597
                paths[item.relpath] = item
1✔
598

599
        # handle fzf mode
600
        if fmt.startswith('fzf'):
1✔
601
            selected = self._fzf_prompt(paths.keys())
×
602
            newpaths = {}
×
603
            subfmt = fmt.replace('fzf-', '')
×
604
            for item in selected:
×
605
                if item not in paths:
×
606
                    continue
×
607
                newpaths[item] = paths[item]
×
608
                self.print_tree(newpaths[item], fmt=subfmt)
×
609
            paths = newpaths
×
610
        else:
611
            if fmt == 'native':
1✔
612
                for _, item in paths.items():
1✔
613
                    self._print_node_native(item, withpath=True,
1✔
614
                                            withdepth=True,
615
                                            withstorage=True,
616
                                            recalcparent=parentfromtree,
617
                                            raw=raw)
618
            elif fmt.startswith('csv'):
×
619
                if fmt == 'csv-with-header':
×
620
                    Logger.stdout_nocolor(self.CSV_HEADER)
×
621
                for _, item in paths.items():
×
622
                    self._node_to_csv(item, raw=raw)
×
623

624
        # execute script if any
625
        if script:
1✔
626
            tmp = ['${source}/' + x for x in paths]
1✔
627
            tmpstr = ' '.join(tmp)
1✔
628
            cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
1✔
629
            Logger.info(cmd)
1✔
630

631
        return list(paths.values())
1✔
632

633
    def _callback_find_name(self, term: str, only_dir: bool) -> Any:
1✔
634
        """callback for finding files"""
635
        def find_name(node: NodeAny) -> bool:
1✔
636
            if node.type == nodes.TYPE_STORAGE:
1✔
637
                # ignore storage nodes
638
                return False
1✔
639
            if node.type == nodes.TYPE_TOP:
1✔
640
                # ignore top nodes
641
                return False
1✔
642
            if node.type == nodes.TYPE_META:
1✔
643
                # ignore meta nodes
644
                return False
×
645
            if only_dir and node.type == nodes.TYPE_DIR:
1✔
646
                # ignore non directory
647
                return False
×
648

649
            # filter
650
            if not term:
1✔
651
                return True
×
652
            if term.lower() in node.name.lower():
1✔
653
                return True
1✔
654

655
            # ignore
656
            return False
1✔
657
        return find_name
1✔
658

659
    ###############################################################
660
    # ls
661
    ###############################################################
662
    def list(self, top: NodeTop,
1✔
663
             path: str,
664
             rec: bool = False,
665
             fmt: str = 'native',
666
             raw: bool = False) -> List[NodeAny]:
667
        """
668
        list nodes for "ls"
669
        @top: top node
670
        @path: path to search for
671
        @rec: recursive walk
672
        @fmt: output format
673
        @raw: print raw size
674
        """
675
        self._debug(f'walking path: \"{path}\" from {top}')
1✔
676

677
        resolv = anytree.resolver.Resolver('name')
1✔
678
        found = []
1✔
679
        try:
1✔
680
            # resolve the path in the tree
681
            found = resolv.glob(top, path)
1✔
682
            if len(found) < 1:
1✔
683
                # nothing found
684
                self._debug('nothing found')
×
685
                return []
×
686

687
            if rec:
1✔
688
                # print the entire tree
689
                self.print_tree(found[0].parent, fmt=fmt, raw=raw)
1✔
690
                return found
1✔
691

692
            # sort found nodes
693
            found = sorted(found, key=self._sort, reverse=self.sortsize)
1✔
694

695
            # print the parent
696
            if fmt == 'native':
1✔
697
                self._print_node_native(found[0].parent,
1✔
698
                                        withpath=False,
699
                                        withdepth=True,
700
                                        raw=raw)
701
            elif fmt.startswith('csv'):
×
702
                self._node_to_csv(found[0].parent, raw=raw)
×
703
            elif fmt.startswith('fzf'):
×
704
                pass
×
705

706
            # print all found nodes
707
            if fmt == 'csv-with-header':
1✔
708
                Logger.stdout_nocolor(self.CSV_HEADER)
×
709
            for item in found:
1✔
710
                if fmt == 'native':
1✔
711
                    self._print_node_native(item, withpath=False,
1✔
712
                                            pre='- ',
713
                                            withdepth=True,
714
                                            raw=raw)
715
                elif fmt.startswith('csv'):
×
716
                    self._node_to_csv(item, raw=raw)
×
717
                elif fmt.startswith('fzf'):
×
718
                    self._to_fzf(item, fmt)
×
719

720
        except anytree.resolver.ChildResolverError:
1✔
721
            pass
1✔
722
        return found
1✔
723

724
    ###############################################################
725
    # tree creation
726
    ###############################################################
727
    def _add_entry(self, name: str,
1✔
728
                   top: NodeTop,
729
                   resolv: Any) -> None:
730
        """add an entry to the tree"""
731
        entries = name.rstrip(os.sep).split(os.sep)
×
732
        if len(entries) == 1:
×
733
            self.new_archive_node(name, name, top, top.name)
×
734
            return
×
735
        sub = os.sep.join(entries[:-1])
×
736
        nodename = entries[-1]
×
737
        try:
×
738
            parent = resolv.get(top, sub)
×
739
            parent = self.new_archive_node(nodename, name, parent, top.name)
×
740
        except anytree.resolver.ChildResolverError:
×
741
            self.new_archive_node(nodename, name, top, top.name)
×
742

743
    def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
1✔
744
        """convert list of files to a tree"""
745
        if not names:
×
746
            return
×
747
        resolv = anytree.resolver.Resolver('name')
×
748
        for name in names:
×
749
            name = name.rstrip(os.sep)
×
750
            self._add_entry(name, parent, resolv)
×
751

752
    ###############################################################
753
    # diverse
754
    ###############################################################
755
    def _sort_tree(self,
1✔
756
                   items: List[NodeAny]) -> List[NodeAny]:
757
        """sorting a list of items"""
758
        return sorted(items, key=self._sort, reverse=self.sortsize)
1✔
759

760
    def _sort(self, lst: NodeAny) -> Any:
1✔
761
        """sort a list"""
762
        if self.sortsize:
1✔
763
            return self._sort_size(lst)
×
764
        return self._sort_fs(lst)
1✔
765

766
    @staticmethod
1✔
767
    def _sort_fs(node: NodeAny) -> Tuple[str, str]:
1✔
768
        """sorting nodes dir first and alpha"""
769
        return (node.type, node.name.lstrip('.').lower())
1✔
770

771
    @staticmethod
1✔
772
    def _sort_size(node: NodeAny) -> float:
1✔
773
        """sorting nodes by size"""
774
        try:
×
775
            if not node.size:
×
776
                return 0
×
777
            return float(node.size)
×
778
        except AttributeError:
×
779
            return 0
×
780

781
    def _get_storage(self, node: NodeAny) -> NodeStorage:
1✔
782
        """recursively traverse up to find storage"""
783
        if node.type == nodes.TYPE_STORAGE:
1✔
784
            return node
×
785
        return cast(NodeStorage, node.ancestors[1])
1✔
786

787
    @staticmethod
1✔
788
    def _has_attr(node: NodeAny, attr: str) -> bool:
1✔
789
        """return True if node has attr as attribute"""
790
        return attr in node.__dict__.keys()
1✔
791

792
    def _get_parents(self, node: NodeAny) -> str:
1✔
793
        """get all parents recursively"""
794
        if node.type == nodes.TYPE_STORAGE:
1✔
795
            return ''
1✔
796
        if node.type == nodes.TYPE_TOP:
1✔
797
            return ''
×
798
        parent = self._get_parents(node.parent)
1✔
799
        if parent:
1✔
800
            return os.sep.join([parent, node.name])
1✔
801
        return str(node.name)
1✔
802

803
    @staticmethod
1✔
804
    def _get_hash(path: str) -> str:
1✔
805
        """return md5 hash of node"""
806
        try:
1✔
807
            return md5sum(path)
1✔
808
        except CatcliException as exc:
×
809
            Logger.err(str(exc))
×
810
            return ''
×
811

812
    def _debug(self, string: str) -> None:
1✔
813
        """print debug"""
814
        if not self.debug:
1✔
815
            return
1✔
816
        Logger.debug(string)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc