• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ICRAR / daliuge / 4911681207

pending completion
4911681207

Pull #231

github

GitHub
Merge 9186e10d1 into e48989cce
Pull Request #231: Liu 355

180 of 229 new or added lines in 17 files covered. (78.6%)

26 existing lines in 5 files now uncovered.

15345 of 19059 relevant lines covered (80.51%)

1.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.05
/daliuge-translator/dlg/dropmake/utils/bash_parameter.py
1
#    ICRAR - International Centre for Radio Astronomy Research
2
#    (c) UWA - The University of Western Australia, 2017
3
#    Copyright by UWA (in the framework of the ICRAR)
4
#    All rights reserved
5
#
6
#    This library is free software; you can redistribute it and/or
7
#    modify it under the terms of the GNU Lesser General Public
8
#    License as published by the Free Software Foundation; either
9
#    version 2.1 of the License, or (at your option) any later version.
10
#
11
#    This library is distributed in the hope that it will be useful,
12
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
#    Lesser General Public License for more details.
15
#
16
#    You should have received a copy of the GNU Lesser General Public
17
#    License along with this library; if not, write to the Free Software
18
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston,
19
#    MA 02111-1307  USA
20

21
import re
2✔
22

23
inp_regex = re.compile(r"%i\[(-[0-9]+)\]")
2✔
24
out_regex = re.compile(r"%o\[(-[0-9]+)\]")
2✔
25

26

27
class BashCommand(object):
2✔
28
    """
29
    An efficient implementation of the bash command with parameters
30
    """
31

32
    def __init__(self, cmds):
2✔
33
        """
34
        create the logical form of the bash command line
35

36
        cmds: a list such that ' '.join(cmds) looks something like:
37
                 'python /home/dfms/myclean.py -d %i[-21] -f %i[-3] %o[-2] -v'
38
        """
UNCOV
39
        self._input_map = (
×
40
            dict()
41
        )  # key: logical drop id, value: a list of physical oids
UNCOV
42
        self._output_map = dict()
×
UNCOV
43
        if len(cmds) > 0 and isinstance(cmds[0], dict):
×
44
            cmds = [list(c.keys())[0] for c in cmds]
×
UNCOV
45
        cmd = " ".join(cmds)
×
UNCOV
46
        self._cmds = cmd.replace(
×
47
            ";", " ; "
48
        ).split()  # re-split just in case hidden spaces
49
        # self._cmds = re.split(';| *', cmd) # resplit for * as well as spaces
50

UNCOV
51
        for m in inp_regex.finditer(cmd):
×
52
            self._input_map[
×
53
                int(m.group(1))
54
            ] = set()  # TODO - check if sequence needs to be reserved!
UNCOV
55
        for m in out_regex.finditer(cmd):
×
56
            self._output_map[int(m.group(1))] = set()
×
57

58
    def add_input_param(self, lgn_id, oid):
2✔
59
        lgn_id = int(lgn_id)
×
60
        if lgn_id in self._input_map:
×
61
            self._input_map[lgn_id].add(oid)
×
62

63
    def add_output_param(self, lgn_id, oid):
2✔
64
        lgn_id = int(lgn_id)
×
65
        if lgn_id in self._output_map:
×
66
            self._output_map[lgn_id].add(oid)
×
67

68
    def to_real_command(self):
2✔
UNCOV
69
        def _get_delimit(matchobj):
×
70
            return " " if matchobj.start() == 0 else ","
×
71

UNCOV
72
        cmds = self._cmds
×
UNCOV
73
        for k in range(len(cmds)):
×
UNCOV
74
            d = cmds[k]
×
UNCOV
75
            imatch = inp_regex.search(d)
×
UNCOV
76
            omatch = out_regex.search(d)
×
UNCOV
77
            if imatch is not None:
×
78
                lgn_id = int(imatch.group(1))
×
79
                cmds[k] = d.replace(
×
80
                    imatch.group(0),
81
                    _get_delimit(imatch).join(
82
                        ["%i[{0}]".format(x) for x in self._input_map[lgn_id]]
83
                    ),
84
                )
UNCOV
85
            elif omatch is not None:
×
86
                lgn_id = int(omatch.group(1))
×
87
                cmds[k] = d.replace(
×
88
                    omatch.group(0),
89
                    _get_delimit(omatch).join(
90
                        ["%o[{0}]".format(x) for x in self._output_map[lgn_id]]
91
                    ),
92
                )
93

UNCOV
94
        return " ".join(cmds)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc