• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

deadc0de6 / catcli / 5039228937

pending completion
5039228937

push

github

GitHub
Merge pull request #35 from deadc0de6/fix-34

11 of 11 new or added lines in 4 files covered. (100.0%)

1289 of 1709 relevant lines covered (75.42%)

3.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.34
/catcli/noder.py
1
"""
2
author: deadc0de6 (https://github.com/deadc0de6)
3
Copyright (c) 2017, deadc0de6
4

5
Class that process nodes in the catalog tree
6
"""
7

8
import os
5✔
9
import shutil
5✔
10
import time
5✔
11
from typing import List, Union, Tuple, Any, Optional, Dict, cast
5✔
12
import anytree  # type: ignore
5✔
13

14
# local imports
15
from catcli import nodes
5✔
16
from catcli.nodes import NodeAny, NodeStorage, \
5✔
17
    NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta
18
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
5✔
19
from catcli.logger import Logger
5✔
20
from catcli.nodeprinter import NodePrinter
5✔
21
from catcli.decomp import Decomp
5✔
22
from catcli.version import __version__ as VERSION
5✔
23
from catcli.exceptions import CatcliException
5✔
24

25

26
class Noder:
5✔
27
    """
28
    handles node in the catalog tree
29
    There are 4 types of node:
30
    * "top" node representing the top node (generic node)
31
    * "storage" node representing a storage
32
    * "dir" node representing a directory
33
    * "file" node representing a file
34
    """
35

36
    CSV_HEADER = ('name,type,path,size,indexed_at,'
5✔
37
                  'maccess,md5,nbfiles,free_space,'
38
                  'total_space,meta')
39

40
    def __init__(self, debug: bool = False,
5✔
41
                 sortsize: bool = False,
42
                 arc: bool = False) -> None:
43
        """
44
        @debug: debug mode
45
        @sortsize: sort nodes by size
46
        @arch: handle archive
47
        """
48
        self.hash = True
5✔
49
        self.debug = debug
5✔
50
        self.sortsize = sortsize
5✔
51
        self.arc = arc
5✔
52
        if self.arc:
5✔
53
            self.decomp = Decomp()
×
54

55
    @staticmethod
5✔
56
    def get_storage_names(top: NodeTop) -> List[str]:
5✔
57
        """return a list of all storage names"""
58
        return [x.name for x in list(top.children)]
5✔
59

60
    def get_storage_node(self, top: NodeTop,
5✔
61
                         name: str,
62
                         newpath: str = '') -> NodeStorage:
63
        """
64
        return the storage node if any
65
        if newpath is submitted, it will update the media info
66
        """
67
        found = None
5✔
68
        for node in top.children:
5✔
69
            if node.type != nodes.TYPE_STORAGE:
5✔
70
                continue
×
71
            if node.name == name:
5✔
72
                found = node
5✔
73
                break
5✔
74
        if found and newpath and os.path.exists(newpath):
5✔
75
            found.free = shutil.disk_usage(newpath).free
5✔
76
            found.total = shutil.disk_usage(newpath).total
5✔
77
            found.ts = int(time.time())
5✔
78
        return cast(NodeStorage, found)
5✔
79

80
    @staticmethod
5✔
81
    def get_node(top: NodeTop,
5✔
82
                 path: str,
83
                 quiet: bool = False) -> Optional[NodeAny]:
84
        """get the node by internal tree path"""
85
        resolv = anytree.resolver.Resolver('name')
5✔
86
        try:
5✔
87
            bpath = os.path.basename(path)
5✔
88
            the_node = resolv.get(top, bpath)
5✔
89
            return cast(NodeAny, the_node)
5✔
90
        except anytree.resolver.ChildResolverError:
5✔
91
            if not quiet:
5✔
92
                Logger.err(f'No node at path \"{bpath}\"')
×
93
            return None
5✔
94

95
    def get_node_if_changed(self,
5✔
96
                            top: NodeTop,
97
                            path: str,
98
                            treepath: str) -> Tuple[Optional[NodeAny], bool]:
99
        """
100
        return the node (if any) and if it has changed
101
        @top: top node (storage)
102
        @path: abs path to file
103
        @treepath: rel path from indexed directory
104
        """
105
        treepath = treepath.lstrip(os.sep)
5✔
106
        node = self.get_node(top, treepath, quiet=True)
5✔
107
        # node does not exist
108
        if not node:
5✔
109
            self._debug('\tchange: node does not exist')
5✔
110
            return None, True
5✔
111
        if os.path.isdir(path):
5✔
112
            return node, False
5✔
113
        # force re-indexing if no maccess
114
        maccess = os.path.getmtime(path)
5✔
115
        if not self._has_attr(node, 'maccess') or \
5✔
116
                not node.maccess:
117
            self._debug('\tchange: no maccess found')
×
118
            return node, True
×
119
        # maccess changed
120
        old_maccess = node.maccess
5✔
121
        if float(maccess) != float(old_maccess):
5✔
122
            self._debug(f'\tchange: maccess changed for \"{path}\"')
5✔
123
            return node, True
5✔
124
        # test hash
125
        if self.hash and node.md5:
5✔
126
            md5 = self._get_hash(path)
5✔
127
            if md5 and md5 != node.md5:
5✔
128
                msg = f'\tchange: checksum changed for \"{path}\"'
5✔
129
                self._debug(msg)
5✔
130
                return node, True
5✔
131
        self._debug(f'\tchange: no change for \"{path}\"')
5✔
132
        return node, False
5✔
133

134
    def rec_size(self, node: Union[NodeDir, NodeStorage],
5✔
135
                 store: bool = True) -> int:
136
        """
137
        recursively traverse tree and return size
138
        @store: store the size in the node
139
        """
140
        if node.type == nodes.TYPE_FILE:
5✔
141
            self._debug(f'size of {node.type} \"{node.name}\": {node.size}')
5✔
142
            return node.size
5✔
143
        msg = f'getting node size recursively for \"{node.name}\"'
5✔
144
        self._debug(msg)
5✔
145
        size: int = 0
5✔
146
        for i in node.children:
5✔
147
            if node.type == nodes.TYPE_DIR:
5✔
148
                sub_size = self.rec_size(i, store=store)
5✔
149
                if store:
5✔
150
                    i.size = sub_size
5✔
151
                size += sub_size
5✔
152
                continue
5✔
153
            if node.type == nodes.TYPE_STORAGE:
5✔
154
                sub_size = self.rec_size(i, store=store)
5✔
155
                if store:
5✔
156
                    i.size = sub_size
5✔
157
                size += sub_size
5✔
158
                continue
5✔
159
            self._debug(f'skipping {node.name}')
×
160
        if store:
5✔
161
            node.size = size
5✔
162
        self._debug(f'size of {node.type} \"{node.name}\": {size}')
5✔
163
        return size
5✔
164

165
    ###############################################################
166
    # public helpers
167
    ###############################################################
168
    @staticmethod
5✔
169
    def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
5✔
170
        """format the storage attr for saving"""
171
        if not attr:
5✔
172
            return ''
5✔
173
        if isinstance(attr, list):
5✔
174
            return ', '.join(attr)
5✔
175
        if isinstance(attr, dict):
×
176
            ret = []
×
177
            for key, val in attr.items():
×
178
                ret.append(f'{key}={val}')
×
179
            return ', '.join(ret)
×
180
        attr = attr.rstrip()
×
181
        return attr
×
182

183
    def do_hashing(self, val: bool) -> None:
5✔
184
        """hash files when indexing"""
185
        self.hash = val
5✔
186

187
    ###############################################################
188
    # node creation
189
    ###############################################################
190
    def new_top_node(self) -> NodeTop:
5✔
191
        """create a new top node"""
192
        top = NodeTop(nodes.NAME_TOP)
5✔
193
        self._debug(f'new top node: {top}')
5✔
194
        return top
5✔
195

196
    def new_file_node(self, name: str, path: str,
5✔
197
                      parent: NodeAny, storagepath: str) -> Optional[NodeFile]:
198
        """create a new node representing a file"""
199
        if not os.path.exists(path):
5✔
200
            Logger.err(f'File \"{path}\" does not exist')
×
201
            return None
×
202
        path = os.path.abspath(path)
5✔
203
        try:
5✔
204
            stat = os.lstat(path)
5✔
205
        except OSError as exc:
×
206
            Logger.err(f'OSError: {exc}')
×
207
            return None
×
208
        md5 = ''
5✔
209
        if self.hash:
5✔
210
            md5 = self._get_hash(path)
5✔
211
        relpath = os.sep.join([storagepath, name])
5✔
212

213
        maccess = os.path.getmtime(path)
5✔
214
        node = NodeFile(name,
5✔
215
                        relpath,
216
                        stat.st_size,
217
                        md5,
218
                        maccess,
219
                        parent=parent)
220
        if self.arc:
5✔
221
            ext = os.path.splitext(path)[1][1:]
×
222
            if ext.lower() in self.decomp.get_formats():
×
223
                self._debug(f'{path} is an archive')
×
224
                names = self.decomp.get_names(path)
×
225
                self.list_to_tree(node, names)
×
226
            else:
227
                self._debug(f'{path} is NOT an archive')
×
228
        return node
5✔
229

230
    def new_dir_node(self, name: str, path: str,
5✔
231
                     parent: NodeAny, storagepath: str) -> NodeDir:
232
        """create a new node representing a directory"""
233
        path = os.path.abspath(path)
5✔
234
        relpath = os.sep.join([storagepath, name])
5✔
235
        maccess = os.path.getmtime(path)
5✔
236
        return NodeDir(name,
5✔
237
                       relpath,
238
                       0,
239
                       maccess,
240
                       parent=parent)
241

242
    def new_storage_node(self, name: str,
5✔
243
                         path: str,
244
                         parent: str,
245
                         attrs: Dict[str, Any]) \
246
            -> NodeStorage:
247
        """create a new node representing a storage"""
248
        path = os.path.abspath(path)
5✔
249
        free = shutil.disk_usage(path).free
5✔
250
        total = shutil.disk_usage(path).total
5✔
251
        epoch = int(time.time())
5✔
252
        return NodeStorage(name,
5✔
253
                           free,
254
                           total,
255
                           0,
256
                           epoch,
257
                           self.attrs_to_string(attrs),
258
                           parent=parent)
259

260
    def new_archive_node(self, name: str, path: str,
5✔
261
                         parent: str, archive: str) -> NodeArchived:
262
        """create a new node for archive data"""
263
        return NodeArchived(name=name, relpath=path,
×
264
                            parent=parent, size=0, md5='',
265
                            archive=archive)
266

267
    ###############################################################
268
    # node management
269
    ###############################################################
270
    def update_metanode(self, top: NodeTop) -> NodeMeta:
5✔
271
        """create or update meta node information"""
272
        meta = self._get_meta_node(top)
5✔
273
        epoch = int(time.time())
5✔
274
        if not meta:
5✔
275
            attrs: Dict[str, Any] = {}
5✔
276
            attrs['created'] = epoch
5✔
277
            attrs['created_version'] = VERSION
5✔
278
            meta = NodeMeta(name=nodes.NAME_META,
5✔
279
                            attr=attrs)
280
        meta.attr['access'] = epoch
5✔
281
        meta.attr['access_version'] = VERSION
5✔
282
        return meta
5✔
283

284
    def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
5✔
285
        """return the meta node if any"""
286
        try:
5✔
287
            found = next(filter(lambda x: x.type == nodes.TYPE_META,
5✔
288
                         top.children))
289
            return cast(NodeMeta, found)
5✔
290
        except StopIteration:
5✔
291
            return None
5✔
292

293
    def clean_not_flagged(self, top: NodeTop) -> int:
5✔
294
        """remove any node not flagged and clean flags"""
295
        cnt = 0
5✔
296
        for node in anytree.PreOrderIter(top):
5✔
297
            if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
5✔
298
                continue
5✔
299
            if self._clean(node):
5✔
300
                cnt += 1
5✔
301
        return cnt
5✔
302

303
    def _clean(self, node: NodeAny) -> bool:
5✔
304
        """remove node if not flagged"""
305
        if not node.flagged():
5✔
306
            node.parent = None
5✔
307
            return True
5✔
308
        node.unflag()
5✔
309
        return False
5✔
310

311
    ###############################################################
312
    # printing
313
    ###############################################################
314
    def _node_to_csv(self, node: NodeAny,
5✔
315
                     sep: str = ',',
316
                     raw: bool = False) -> None:
317
        """
318
        print a node to csv
319
        @node: the node to consider
320
        @sep: CSV separator character
321
        @raw: print raw size rather than human readable
322
        """
323
        if not node:
5✔
324
            return
×
325
        if node.type == nodes.TYPE_TOP:
5✔
326
            return
5✔
327

328
        out = []
5✔
329
        if node.type == nodes.TYPE_STORAGE:
5✔
330
            # handle storage
331
            out.append(node.name)   # name
5✔
332
            out.append(node.type)   # type
5✔
333
            out.append('')          # fake full path
5✔
334
            size = self.rec_size(node, store=False)
5✔
335
            out.append(size_to_str(size, raw=raw))  # size
5✔
336
            out.append(epoch_to_str(node.ts))  # indexed_at
5✔
337
            out.append('')  # fake maccess
5✔
338
            out.append('')  # fake md5
5✔
339
            out.append(str(len(node.children)))  # nbfiles
5✔
340
            # fake free_space
341
            out.append(size_to_str(node.free, raw=raw))
5✔
342
            # fake total_space
343
            out.append(size_to_str(node.total, raw=raw))
5✔
344
            out.append(node.attr)  # meta
5✔
345
        else:
346
            # handle other nodes
347
            out.append(node.name.replace('"', '""'))  # name
5✔
348
            out.append(node.type)  # type
5✔
349
            parents = self._get_parents(node)
5✔
350
            storage = self._get_storage(node)
5✔
351
            fullpath = os.path.join(storage.name, parents)
5✔
352
            out.append(fullpath.replace('"', '""'))  # full path
5✔
353

354
            out.append(size_to_str(node.size, raw=raw))  # size
5✔
355
            out.append(epoch_to_str(storage.ts))  # indexed_at
5✔
356
            if self._has_attr(node, 'maccess'):
5✔
357
                out.append(epoch_to_str(node.maccess))  # maccess
5✔
358
            else:
359
                out.append('')  # fake maccess
×
360
            if self._has_attr(node, 'md5'):
5✔
361
                out.append(node.md5)  # md5
5✔
362
            else:
363
                out.append('')  # fake md5
5✔
364
            if node.type == nodes.TYPE_DIR:
5✔
365
                out.append(str(len(node.children)))  # nbfiles
5✔
366
            else:
367
                out.append('')  # fake nbfiles
5✔
368
            out.append('')  # fake free_space
5✔
369
            out.append('')  # fake total_space
5✔
370
            out.append('')  # fake meta
5✔
371

372
        line = sep.join(['"' + o + '"' for o in out])
5✔
373
        if len(line) > 0:
5✔
374
            Logger.stdout_nocolor(line)
5✔
375

376
    def _print_node_native(self, node: NodeAny,
5✔
377
                           pre: str = '',
378
                           withpath: bool = False,
379
                           withdepth: bool = False,
380
                           withstorage: bool = False,
381
                           recalcparent: bool = False,
382
                           raw: bool = False) -> None:
383
        """
384
        print a node
385
        @node: the node to print
386
        @pre: string to print before node
387
        @withpath: print the node path
388
        @withdepth: print the node depth info
389
        @withstorage: print the node storage it belongs to
390
        @recalcparent: get relpath from tree instead of relpath field
391
        @raw: print raw size rather than human readable
392
        """
393
        if node.type == nodes.TYPE_TOP:
5✔
394
            # top node
395
            Logger.stdout_nocolor(f'{pre}{node.name}')
5✔
396
        elif node.type == nodes.TYPE_FILE:
5✔
397
            # node of type file
398
            name = node.name
5✔
399
            if withpath:
5✔
400
                if recalcparent:
5✔
401
                    name = os.sep.join([self._get_parents(node.parent), name])
×
402
                else:
403
                    name = node.relpath
5✔
404
            name = name.lstrip(os.sep)
5✔
405
            if withstorage:
5✔
406
                storage = self._get_storage(node)
5✔
407
            attr_str = ''
5✔
408
            if node.md5:
5✔
409
                attr_str = f', md5:{node.md5}'
5✔
410
            size = size_to_str(node.size, raw=raw)
5✔
411
            compl = f'size:{size}{attr_str}'
5✔
412
            if withstorage:
5✔
413
                content = Logger.get_bold_text(storage.name)
5✔
414
                compl += f', storage:{content}'
5✔
415
            NodePrinter.print_file_native(pre, name, compl)
5✔
416
        elif node.type == nodes.TYPE_DIR:
5✔
417
            # node of type directory
418
            name = node.name
5✔
419
            if withpath:
5✔
420
                if recalcparent:
×
421
                    name = os.sep.join([self._get_parents(node.parent), name])
×
422
                else:
423
                    name = node.relpath
×
424
            name = name.lstrip(os.sep)
5✔
425
            depth = 0
5✔
426
            if withdepth:
5✔
427
                depth = len(node.children)
5✔
428
            if withstorage:
5✔
429
                storage = self._get_storage(node)
×
430
            attr: List[Tuple[str, str]] = []
5✔
431
            if node.size:
5✔
432
                attr.append(('totsize', size_to_str(node.size, raw=raw)))
5✔
433
            if withstorage:
5✔
434
                attr.append(('storage', Logger.get_bold_text(storage.name)))
×
435
            NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
5✔
436
        elif node.type == nodes.TYPE_STORAGE:
5✔
437
            # node of type storage
438
            sztotal = size_to_str(node.total, raw=raw)
5✔
439
            szused = size_to_str(node.total - node.free, raw=raw)
5✔
440
            nbchildren = len(node.children)
5✔
441
            pcent = 0
5✔
442
            if node.total > 0:
5✔
443
                pcent = node.free * 100 / node.total
5✔
444
            freepercent = f'{pcent:.1f}%'
5✔
445
            # get the date
446
            timestamp = ''
5✔
447
            if self._has_attr(node, 'ts'):
5✔
448
                timestamp = 'date:'
5✔
449
                timestamp += epoch_to_str(node.ts)
5✔
450
            disksize = ''
5✔
451
            # the children size
452
            recsize = self.rec_size(node, store=False)
5✔
453
            sizestr = size_to_str(recsize, raw=raw)
5✔
454
            disksize = 'totsize:' + f'{sizestr}'
5✔
455
            # format the output
456
            name = node.name
5✔
457
            args = [
5✔
458
                'nbfiles:' + f'{nbchildren}',
459
                disksize,
460
                f'free:{freepercent}',
461
                'du:' + f'{szused}/{sztotal}',
462
                timestamp]
463
            argsstring = ' | '.join(args)
5✔
464
            NodePrinter.print_storage_native(pre,
5✔
465
                                             name,
466
                                             argsstring,
467
                                             node.attr)
468
        elif node.type == nodes.TYPE_ARCHIVED:
×
469
            # archive node
470
            if self.arc:
×
471
                NodePrinter.print_archive_native(pre, node.name, node.archive)
×
472
        else:
473
            Logger.err(f'bad node encountered: {node}')
×
474

475
    def print_tree(self, node: NodeAny,
5✔
476
                   fmt: str = 'native',
477
                   raw: bool = False) -> None:
478
        """
479
        print the tree in different format
480
        @node: start node
481
        @style: when fmt=native, defines the tree style
482
        @fmt: output format
483
        @raw: print the raw size rather than human readable
484
        """
485
        if fmt == 'native':
5✔
486
            # "tree" style
487
            rend = anytree.RenderTree(node, childiter=self._sort_tree)
5✔
488
            for pre, _, thenode in rend:
5✔
489
                self._print_node_native(thenode, pre=pre,
5✔
490
                                        withdepth=True, raw=raw)
491
        elif fmt == 'csv':
5✔
492
            # csv output
493
            self._to_csv(node, raw=raw)
5✔
494
        elif fmt == 'csv-with-header':
×
495
            # csv output
496
            Logger.stdout_nocolor(self.CSV_HEADER)
×
497
            self._to_csv(node, raw=raw)
×
498

499
    def _to_csv(self, node: NodeAny,
5✔
500
                raw: bool = False) -> None:
501
        """print the tree to csv"""
502
        rend = anytree.RenderTree(node, childiter=self._sort_tree)
5✔
503
        for _, _, item in rend:
5✔
504
            self._node_to_csv(item, raw=raw)
5✔
505

506
    @staticmethod
5✔
507
    def _fzf_prompt(strings: Any) -> Any:
5✔
508
        """prompt with fzf"""
509
        try:
×
510
            from pyfzf.pyfzf import FzfPrompt  # type: ignore # pylint: disable=C0415 # noqa
×
511
            fzf = FzfPrompt()
×
512
            selected = fzf.prompt(strings)
×
513
            return selected
×
514
        except ModuleNotFoundError:
×
515
            Logger.err('install pyfzf to use fzf')
×
516
            return None
×
517

518
    def _to_fzf(self, node: NodeAny, fmt: str) -> None:
5✔
519
        """
520
        fzf prompt with list and print selected node(s)
521
        @node: node to start with
522
        @fmt: output format for selected nodes
523
        """
524
        rendered = anytree.RenderTree(node, childiter=self._sort_tree)
×
525
        the_nodes = {}
×
526
        # construct node names list
527
        for _, _, rend in rendered:
×
528
            if not rend:
×
529
                continue
×
530
            parents = self._get_parents(rend)
×
531
            storage = self._get_storage(rend)
×
532
            fullpath = os.path.join(storage.name, parents)
×
533
            the_nodes[fullpath] = rend
×
534
        # prompt with fzf
535
        paths = self._fzf_prompt(the_nodes.keys())
×
536
        # print the resulting tree
537
        subfmt = fmt.replace('fzf-', '')
×
538
        for path in paths:
×
539
            if not path:
×
540
                continue
×
541
            if path not in the_nodes:
×
542
                continue
×
543
            rend = the_nodes[path]
×
544
            self.print_tree(rend, fmt=subfmt)
×
545

546
    @staticmethod
5✔
547
    def to_dot(top: NodeTop,
5✔
548
               path: str = 'tree.dot') -> str:
549
        """export to dot for graphing"""
550
        anytree.exporter.DotExporter(top).to_dotfile(path)
5✔
551
        Logger.info(f'dot file created under \"{path}\"')
5✔
552
        return f'dot {path} -T png -o /tmp/tree.png'
5✔
553

554
    ###############################################################
555
    # searching
556
    ###############################################################
557
    def find_name(self, top: NodeTop,
5✔
558
                  key: str,
559
                  script: bool = False,
560
                  only_dir: bool = False,
561
                  startnode: Optional[NodeAny] = None,
562
                  parentfromtree: bool = False,
563
                  fmt: str = 'native',
564
                  raw: bool = False) -> List[NodeAny]:
565
        """
566
        find files based on their names
567
        @top: top node
568
        @key: term to search for
569
        @script: output script
570
        @directory: only search for directories
571
        @startpath: node to start with
572
        @parentfromtree: get path from parent instead of stored relpath
573
        @fmt: output format
574
        @raw: raw size output
575
        returns the found nodes
576
        """
577
        self._debug(f'searching for \"{key}\"')
5✔
578

579
        # search for nodes based on path
580
        start: Optional[NodeAny] = top
5✔
581
        if startnode:
5✔
582
            start = self.get_node(top, startnode)
×
583
        filterfunc = self._callback_find_name(key, only_dir)
5✔
584
        found = anytree.findall(start, filter_=filterfunc)
5✔
585
        self._debug(f'found {len(found)} node(s)')
5✔
586

587
        # compile found nodes
588
        paths = {}
5✔
589
        for item in found:
5✔
590
            item.name = fix_badchars(item.name)
5✔
591
            if hasattr(item, 'relpath'):
5✔
592
                item.relpath = fix_badchars(item.relpath)
5✔
593
            storage = self._get_storage(item)
5✔
594
            if parentfromtree:
5✔
595
                parent = self._get_parents(item)
×
596
                key = f'{storage}/{parent}/{item.relpath}'
×
597
                paths[parent] = item
×
598
            else:
599
                key = f'{storage}/{item.path}'
5✔
600
                paths[key] = item
5✔
601

602
        # handle fzf mode
603
        if fmt.startswith('fzf'):
5✔
604
            selected = self._fzf_prompt(paths.keys())
×
605
            newpaths = {}
×
606
            subfmt = fmt.replace('fzf-', '')
×
607
            for item in selected:
×
608
                if item not in paths:
×
609
                    continue
×
610
                newpaths[item] = paths[item]
×
611
                self.print_tree(newpaths[item], fmt=subfmt)
×
612
            paths = newpaths
×
613
        else:
614
            if fmt == 'native':
5✔
615
                for _, item in paths.items():
5✔
616
                    self._print_node_native(item, withpath=True,
5✔
617
                                            withdepth=True,
618
                                            withstorage=True,
619
                                            recalcparent=parentfromtree,
620
                                            raw=raw)
621
            elif fmt.startswith('csv'):
×
622
                if fmt == 'csv-with-header':
×
623
                    Logger.stdout_nocolor(self.CSV_HEADER)
×
624
                for _, item in paths.items():
×
625
                    self._node_to_csv(item, raw=raw)
×
626

627
        # execute script if any
628
        if script:
5✔
629
            tmp = ['${source}/' + x for x in paths]
5✔
630
            tmpstr = ' '.join(tmp)
5✔
631
            cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
5✔
632
            Logger.info(cmd)
5✔
633

634
        return list(paths.values())
5✔
635

636
    def _callback_find_name(self, term: str, only_dir: bool) -> Any:
5✔
637
        """callback for finding files"""
638
        def find_name(node: NodeAny) -> bool:
5✔
639
            if node.type == nodes.TYPE_STORAGE:
5✔
640
                # ignore storage nodes
641
                return False
5✔
642
            if node.type == nodes.TYPE_TOP:
5✔
643
                # ignore top nodes
644
                return False
5✔
645
            if node.type == nodes.TYPE_META:
5✔
646
                # ignore meta nodes
647
                return False
×
648
            if only_dir and node.type == nodes.TYPE_DIR:
5✔
649
                # ignore non directory
650
                return False
×
651

652
            # filter
653
            if not term:
5✔
654
                return True
×
655
            if term.lower() in node.name.lower():
5✔
656
                return True
5✔
657

658
            # ignore
659
            return False
5✔
660
        return find_name
5✔
661

662
    ###############################################################
663
    # ls
664
    ###############################################################
665
    def list(self, top: NodeTop,
5✔
666
             path: str,
667
             rec: bool = False,
668
             fmt: str = 'native',
669
             raw: bool = False) -> List[NodeAny]:
670
        """
671
        list nodes for "ls"
672
        @top: top node
673
        @path: path to search for
674
        @rec: recursive walk
675
        @fmt: output format
676
        @raw: print raw size
677
        """
678
        self._debug(f'walking path: \"{path}\" from {top}')
5✔
679

680
        resolv = anytree.resolver.Resolver('name')
5✔
681
        found = []
5✔
682
        try:
5✔
683
            # resolve the path in the tree
684
            found = resolv.glob(top, path)
5✔
685
            if len(found) < 1:
5✔
686
                # nothing found
687
                self._debug('nothing found')
×
688
                return []
×
689

690
            if rec:
5✔
691
                # print the entire tree
692
                self.print_tree(found[0].parent, fmt=fmt, raw=raw)
5✔
693
                return found
5✔
694

695
            # sort found nodes
696
            found = sorted(found, key=self._sort, reverse=self.sortsize)
5✔
697

698
            # print the parent
699
            if fmt == 'native':
5✔
700
                self._print_node_native(found[0].parent,
5✔
701
                                        withpath=False,
702
                                        withdepth=True,
703
                                        raw=raw)
704
            elif fmt.startswith('csv'):
×
705
                self._node_to_csv(found[0].parent, raw=raw)
×
706
            elif fmt.startswith('fzf'):
×
707
                pass
708

709
            # print all found nodes
710
            if fmt == 'csv-with-header':
5✔
711
                Logger.stdout_nocolor(self.CSV_HEADER)
×
712
            for item in found:
5✔
713
                if fmt == 'native':
5✔
714
                    self._print_node_native(item, withpath=False,
5✔
715
                                            pre='- ',
716
                                            withdepth=True,
717
                                            raw=raw)
718
                elif fmt.startswith('csv'):
×
719
                    self._node_to_csv(item, raw=raw)
×
720
                elif fmt.startswith('fzf'):
×
721
                    self._to_fzf(item, fmt)
×
722

723
        except anytree.resolver.ChildResolverError:
5✔
724
            pass
5✔
725
        return found
5✔
726

727
    ###############################################################
728
    # tree creation
729
    ###############################################################
730
    def _add_entry(self, name: str,
5✔
731
                   top: NodeTop,
732
                   resolv: Any) -> None:
733
        """add an entry to the tree"""
734
        entries = name.rstrip(os.sep).split(os.sep)
×
735
        if len(entries) == 1:
×
736
            self.new_archive_node(name, name, top, top.name)
×
737
            return
×
738
        sub = os.sep.join(entries[:-1])
×
739
        nodename = entries[-1]
×
740
        try:
×
741
            parent = resolv.get(top, sub)
×
742
            parent = self.new_archive_node(nodename, name, parent, top.name)
×
743
        except anytree.resolver.ChildResolverError:
×
744
            self.new_archive_node(nodename, name, top, top.name)
×
745

746
    def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
5✔
747
        """convert list of files to a tree"""
748
        if not names:
×
749
            return
×
750
        resolv = anytree.resolver.Resolver('name')
×
751
        for name in names:
×
752
            name = name.rstrip(os.sep)
×
753
            self._add_entry(name, parent, resolv)
×
754

755
    ###############################################################
756
    # diverse
757
    ###############################################################
758
    def _sort_tree(self,
5✔
759
                   items: List[NodeAny]) -> List[NodeAny]:
760
        """sorting a list of items"""
761
        return sorted(items, key=self._sort, reverse=self.sortsize)
5✔
762

763
    def _sort(self, lst: NodeAny) -> Any:
5✔
764
        """sort a list"""
765
        if self.sortsize:
5✔
766
            return self._sort_size(lst)
×
767
        return self._sort_fs(lst)
5✔
768

769
    @staticmethod
5✔
770
    def _sort_fs(node: NodeAny) -> Tuple[str, str]:
5✔
771
        """sorting nodes dir first and alpha"""
772
        return (node.type, node.name.lstrip('.').lower())
5✔
773

774
    @staticmethod
5✔
775
    def _sort_size(node: NodeAny) -> float:
5✔
776
        """sorting nodes by size"""
777
        try:
×
778
            if not node.size:
×
779
                return 0
×
780
            return float(node.size)
×
781
        except AttributeError:
×
782
            return 0
×
783

784
    def _get_storage(self, node: NodeAny) -> NodeStorage:
5✔
785
        """recursively traverse up to find storage"""
786
        if node.type == nodes.TYPE_STORAGE:
5✔
787
            return node
×
788
        return cast(NodeStorage, node.ancestors[1])
5✔
789

790
    @staticmethod
5✔
791
    def _has_attr(node: NodeAny, attr: str) -> bool:
5✔
792
        """return True if node has attr as attribute"""
793
        return attr in node.__dict__.keys()
5✔
794

795
    def _get_parents(self, node: NodeAny) -> str:
5✔
796
        """get all parents recursively"""
797
        if node.type == nodes.TYPE_STORAGE:
5✔
798
            return ''
5✔
799
        if node.type == nodes.TYPE_TOP:
5✔
800
            return ''
×
801
        parent = self._get_parents(node.parent)
5✔
802
        if parent:
5✔
803
            return os.sep.join([parent, node.name])
5✔
804
        return str(node.name)
5✔
805

806
    @staticmethod
5✔
807
    def _get_hash(path: str) -> str:
5✔
808
        """return md5 hash of node"""
809
        try:
5✔
810
            return md5sum(path)
5✔
811
        except CatcliException as exc:
×
812
            Logger.err(str(exc))
×
813
            return ''
×
814

815
    def _debug(self, string: str) -> None:
5✔
816
        """print debug"""
817
        if not self.debug:
5✔
818
            return
5✔
819
        Logger.debug(string)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc