• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

freqtrade / freqtrade / 9394559170

26 Apr 2024 06:36AM UTC coverage: 94.656% (-0.02%) from 94.674%
9394559170

push

github

xmatthias
Loader should be passed as kwarg for clarity

20280 of 21425 relevant lines covered (94.66%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.35
/freqtrade/strategy/strategyupdater.py
1
import shutil
1✔
2
from pathlib import Path
1✔
3

4
import ast_comments
1✔
5

6
from freqtrade.constants import Config
1✔
7

8

9
class StrategyUpdater:
    """
    Rewrite a strategy source file so that it uses the v3 naming scheme.

    The class-level dictionaries hold the old -> new translations that
    ``NameUpdater`` applies while walking the syntax tree of the strategy.
    """

    # Identifiers (variable names, function parameters, string constants).
    name_mapping = {
        'ticker_interval': 'timeframe',
        'buy': 'enter_long',
        'sell': 'exit_long',
        'buy_tag': 'enter_tag',
        'sell_reason': 'exit_reason',

        'sell_signal': 'exit_signal',
        'custom_sell': 'custom_exit',
        'force_sell': 'force_exit',
        'emergency_sell': 'emergency_exit',

        # Strategy/config settings:
        'use_sell_signal': 'use_exit_signal',
        'sell_profit_only': 'exit_profit_only',
        'sell_profit_offset': 'exit_profit_offset',
        'ignore_roi_if_buy_signal': 'ignore_roi_if_entry_signal',
        'forcebuy_enable': 'force_entry_enable',
    }

    # Names of function definitions.
    function_mapping = {
        'populate_buy_trend': 'populate_entry_trend',
        'populate_sell_trend': 'populate_exit_trend',
        'custom_sell': 'custom_exit',
        'check_buy_timeout': 'check_entry_timeout',
        'check_sell_timeout': 'check_exit_timeout',
    }

    # order_time_in_force, order_types, unfilledtimeout
    otif_ot_unfilledtimeout = {
        'buy': 'entry',
        'sell': 'exit',
    }

    # create a dictionary that maps the old column names to the new ones
    rename_dict = {'buy': 'enter_long', 'sell': 'exit_long', 'buy_tag': 'enter_tag'}

    def start(self, config: Config, strategy_obj: dict) -> None:
        """
        Run strategy updater
        It updates a strategy to v3 with the help of the ast-module

        The original file is first copied to
        ``<user_data_dir>/strategies_orig_updater/<location_rel>`` and is then
        overwritten in place with the converted code.

        :param config: Configuration; only ``user_data_dir`` is read.
        :param strategy_obj: Dictionary providing ``location`` (path of the
            strategy file) and ``location_rel`` (path of the backup copy
            relative to the backup folder).
        :return: None
        """
        source_file = Path(strategy_obj['location'])
        strategies_backup_folder = Path(config['user_data_dir']) / "strategies_orig_updater"
        target_file = strategies_backup_folder / strategy_obj['location_rel']

        # Python source files are UTF-8 unless declared otherwise, so do not
        # rely on the platform's default encoding.
        old_code = source_file.read_text(encoding='utf-8')

        strategies_backup_folder.mkdir(parents=True, exist_ok=True)
        # location_rel may contain sub-folders, which need to exist as well
        # before the copy can succeed.
        target_file.parent.mkdir(parents=True, exist_ok=True)

        # backup original
        # => currently no date after the filename,
        # could get overridden pretty fast if this is fired twice!
        # The folder is always the same and the file name too (currently).
        shutil.copy(source_file, target_file)

        # Convert before opening the file for writing: if the conversion
        # raises, the strategy file is left untouched.
        new_code = self.update_code(old_code)
        source_file.write_text(new_code, encoding='utf-8')

    def update_code(self, code):
        """
        Convert strategy source code without executing it.

        :param code: Source code of the strategy as string.
        :return: The converted source code as string.
        """
        # parse the code into an AST (ast_comments keeps the comments as nodes)
        tree = ast_comments.parse(code)

        # use the AST to update the code
        return self.modify_ast(tree)

    def modify_ast(self, tree):  # noqa
        """
        Apply ``NameUpdater`` to the tree and turn the result back into code.

        :param tree: Syntax tree as returned by ``ast_comments.parse``;
            it is modified in place.
        :return: Source code generated from the updated tree.
        """
        # use the visitor to update the names and functions in the AST
        NameUpdater().visit(tree)

        # first fix the comments, so it understands "\n" properly inside multi line comments.
        # fix_missing_locations fills in position attributes on nodes that lack
        # them (e.g. nodes inserted by the visitor); increment_lineno shifts
        # all line numbers by one.
        ast_comments.fix_missing_locations(tree)
        ast_comments.increment_lineno(tree, n=1)

        # generate the new code from the updated AST;
        # ast_comments.unparse is used so that comments are carried over.
        return ast_comments.unparse(tree)
1✔
104

105

106
# Here we go through each respective node, slice, elt, key ... to replace outdated entries.
107
class NameUpdater(ast_comments.NodeTransformer):
    """
    Node transformer that replaces outdated names with their v3 counterparts.

    All nodes are modified in place; every ``visit_*`` method returns the node
    it was given. The translation tables are read from ``StrategyUpdater``.
    """

    def generic_visit(self, node):
        """
        Visit the children of ``node``, except below ``space=...`` keywords.

        :param node: Node whose children should be visited.
        :return: The same node.
        """
        # space is not yet transferred from buy/sell to entry/exit and thereby has to be skipped.
        if isinstance(node, ast_comments.keyword) and node.arg == "space":
            return node

        # Everything else is the stock NodeTransformer traversal.
        return super().generic_visit(node)

    def visit_Expr(self, node):
        """
        Handle expression statements.

        Only statements whose value has a ``left`` operand that is a plain
        name are processed: that name is translated and the value is visited.
        Every other expression statement (this includes docstrings) is
        returned without visiting its children.
        """
        if hasattr(node.value, "left") and hasattr(node.value.left, "id"):
            node.value.left.id = self.check_dict(StrategyUpdater.name_mapping, node.value.left.id)
            self.visit(node.value)
        return node

    # Renames an element if contained inside a dictionary.
    @staticmethod
    def check_dict(current_dict: dict, element: str):
        """
        Translate ``element`` with ``current_dict``.

        :param current_dict: Mapping of old to new values.
        :param element: Value to look up.
        :return: The mapped value, or ``element`` itself if it is not a key.
        """
        return current_dict.get(element, element)

    def visit_arguments(self, node):
        """
        Rename the parameters of a function definition.

        All kinds of parameters are covered (positional-only, regular,
        keyword-only, ``*args`` and ``**kwargs``), because ``visit_Name``
        renames every use of the name as well; leaving a parameter behind
        would detach it from its uses. Defaults and annotations are not
        visited.
        """
        params = []
        for field in ("posonlyargs", "args", "kwonlyargs"):
            value = getattr(node, field, None)
            if isinstance(value, list):
                params.extend(value)
        for field in ("vararg", "kwarg"):
            value = getattr(node, field, None)
            if value is not None:
                params.append(value)

        for param in params:
            param.arg = self.check_dict(StrategyUpdater.name_mapping, param.arg)
        return node

    def visit_Name(self, node):
        """Rename the identifier if it is listed in ``name_mapping``."""
        node.id = self.check_dict(StrategyUpdater.name_mapping, node.id)
        return node

    def visit_Import(self, node):
        """Leave ``import`` statements untouched."""
        return node

    def visit_ImportFrom(self, node):
        """Leave ``from ... import`` statements untouched."""
        return node

    def visit_If(self, node: ast_comments.If):
        """Visit every direct child (test, body and orelse) of an ``if``."""
        for child in ast_comments.iter_child_nodes(node):
            self.visit(child)
        return node

    def visit_FunctionDef(self, node):
        """Rename the function if listed in ``function_mapping``, then descend."""
        node.name = self.check_dict(StrategyUpdater.function_mapping, node.name)
        self.generic_visit(node)
        return node

    def visit_Attribute(self, node):
        """
        Rename ``trade.nr_of_successful_buys`` to ``trade.nr_of_successful_entries``.

        No other attribute is changed and the attribute's value is not visited.
        """
        if (
                isinstance(node.value, ast_comments.Name)
                and node.value.id == 'trade'
                and node.attr == 'nr_of_successful_buys'
        ):
            node.attr = 'nr_of_successful_entries'
        return node

    @staticmethod
    def _assigns_interface_version(stmt) -> bool:
        """Tell whether ``stmt`` assigns ``INTERFACE_VERSION`` (plain or annotated)."""
        if isinstance(stmt, ast_comments.Assign):
            target = stmt.targets[0]
        elif isinstance(stmt, ast_comments.AnnAssign):
            target = stmt.target
        else:
            return False
        return isinstance(target, ast_comments.Name) and target.id == 'INTERFACE_VERSION'

    def visit_ClassDef(self, node):
        """
        Make sure classes derived from ``IStrategy`` declare ``INTERFACE_VERSION = 3``.

        Existing assignments (``INTERFACE_VERSION = x`` as well as
        ``INTERFACE_VERSION: int = x``) get their value replaced by ``3``.
        If there is none, a new assignment is added at the top of the class
        body, after the docstring if the body starts with one.
        The class body is visited afterwards in any case.
        """
        # check if the class is derived from IStrategy
        if any(isinstance(base, ast_comments.Name) and
               base.id == 'IStrategy' for base in node.bases):
            version_assignments = [
                child for child in node.body if self._assigns_interface_version(child)
            ]

            if version_assignments:
                for assignment in version_assignments:
                    assignment.value = ast_comments.parse('3').body[0].value
            else:
                first = node.body[0] if node.body else None
                starts_with_docstring = (
                    isinstance(first, ast_comments.Expr)
                    and isinstance(first.value, ast_comments.Constant)
                    and isinstance(first.value.value, str)
                )
                # Inserting in front of the docstring would turn it into a
                # plain string expression.
                position = 1 if starts_with_docstring else 0
                node.body.insert(
                    position, ast_comments.parse('INTERFACE_VERSION = 3').body[0])
        self.generic_visit(node)
        return node

    def visit_Subscript(self, node):
        """
        Rename column names used as subscript, e.g. ``dataframe['buy']``.

        String constants are translated with ``rename_dict``, either directly
        or as elements of a tuple/list slice. The subscripted value itself is
        not visited.
        """
        if isinstance(node.slice, ast_comments.Constant):
            if node.slice.value in StrategyUpdater.rename_dict:
                # Replace the slice attributes with the values from rename_dict
                node.slice.value = StrategyUpdater.rename_dict[node.slice.value]
        if hasattr(node.slice, "elts"):
            self.visit_elts(node.slice.elts)
        # Slices wrapped in another node expose the elements via `.value`.
        if hasattr(node.slice, "value"):
            if hasattr(node.slice.value, "elts"):
                self.visit_elts(node.slice.value.elts)
        return node

    # elts can have elts (technically recursively)
    def visit_elts(self, elts):
        """
        Apply ``visit_elt`` to a single element or to each element of a list.

        :return: ``elts`` as passed in.
        """
        if isinstance(elts, list):
            for elt in elts:
                self.visit_elt(elt)
        else:
            self.visit_elt(elts)
        return elts

    # sub function again needed since the structure itself is highly flexible ...
    def visit_elt(self, elt):
        """
        Rename one slice element and recurse into nested ``elts`` / ``args``.

        :return: ``elt`` as passed in.
        """
        if isinstance(elt, ast_comments.Constant) and elt.value in StrategyUpdater.rename_dict:
            elt.value = StrategyUpdater.rename_dict[elt.value]
        if hasattr(elt, "elts"):
            self.visit_elts(elt.elts)
        if hasattr(elt, "args"):
            if isinstance(elt.args, ast_comments.arguments):
                # `args` is a single `arguments` node here, not a list.
                self.visit_elts(elt.args)
            else:
                for arg in elt.args:
                    self.visit_elts(arg)
        return elt

    def visit_Constant(self, node):
        """
        Translate constants reached through the regular traversal.

        ``otif_ot_unfilledtimeout`` is applied first, ``name_mapping`` second
        (on the result of the first lookup).
        """
        node.value = self.check_dict(StrategyUpdater.otif_ot_unfilledtimeout, node.value)
        node.value = self.check_dict(StrategyUpdater.name_mapping, node.value)
        return node
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc