• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vantage6 / vantage6 / 26948030571

04 Jun 2026 11:08AM UTC coverage: 62.816% (-3.5%) from 66.301%
26948030571

push

github

web-flow
Merge pull request #2571 from vantage6/release/5.0

Release/5.0

1216 of 2107 new or added lines in 45 files covered. (57.71%)

15 existing lines in 5 files now uncovered.

1642 of 2614 relevant lines covered (62.82%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.18
/vantage6/vantage6/cli/algorithm/generate_algorithm_json.py
1
import importlib
1✔
2
import inspect
1✔
3
import json
1✔
4
import os
1✔
5
import sys
1✔
6
from collections.abc import Callable
1✔
7
from inspect import getmembers, isfunction, ismodule, signature
1✔
8
from pathlib import Path
1✔
9
from types import ModuleType, UnionType
1✔
10
from typing import Any, OrderedDict, get_args, get_origin
1✔
11

12
import click
1✔
13
import pandas as pd
1✔
14
import questionary as q
1✔
15

16
from vantage6.common import error, info, warning
1✔
17
from vantage6.common.algorithm_function import (
1✔
18
    get_vantage6_decorator_type,
19
    is_vantage6_algorithm_func,
20
)
21
from vantage6.common.enum import AlgorithmArgumentType, AlgorithmStepType, StrEnumBase
1✔
22

23
from vantage6.algorithm.client import AlgorithmClient
1✔
24
from vantage6.algorithm.preprocessing.algorithm_json_data import (
1✔
25
    PREPROCESSING_FUNCTIONS_JSON_DATA,
26
)
27

28

29
class MergePreference:
1✔
30
    """Singleton class to manage global merge preference state"""
31

32
    _instance = None
1✔
33
    _prefer_existing = None
1✔
34

35
    def __new__(cls):
1✔
NEW
36
        if cls._instance is None:
×
NEW
37
            cls._instance = super(MergePreference, cls).__new__(cls)
×
NEW
38
        return cls._instance
×
39

40
    @classmethod
1✔
41
    def get_preference(cls) -> bool | None:
1✔
42
        """Get the current merge preference"""
NEW
43
        return cls._prefer_existing
×
44

45
    @classmethod
1✔
46
    def set_preference(cls, prefer_existing: bool) -> None:
1✔
47
        """Set the merge preference globally"""
NEW
48
        cls._prefer_existing = prefer_existing
×
49

50
    @classmethod
1✔
51
    def reset(cls) -> None:
1✔
52
        """Reset the preference to None"""
53
        cls._prefer_existing = None
1✔
54

55

56
class FunctionArgumentType(StrEnumBase):
    """Kind of input that a function argument represents"""

    # a regular argument, listed under "arguments" in the function JSON
    PARAMETER = "parameter"
    # a dataframe argument, listed under "databases" in the function JSON
    DATAFRAME = "dataframe"
1✔
61

62

63
class Function:
1✔
64
    """Class to handle a function and its JSON representation"""
65

66
    def __init__(self, func: Callable):
1✔
67
        self.func = func
1✔
68
        self.name = func.__name__
1✔
69
        self.signature = signature(func)
1✔
70
        self.docstring = func.__doc__
1✔
71
        self.json = None
1✔
72
        self.step_type = None
1✔
73

74
    def prepare_json(self) -> None:
1✔
75
        """Convert the function to a JSON format"""
NEW
76
        self.step_type = self._get_step_type()
×
NEW
77
        function_json = {
×
78
            "name": self.name,
79
            "display_name": self._pretty_print_name(self.name),
80
            "standalone": True,
81
            "description": self._extract_headline_of_docstring(),
82
            "step_type": self.step_type.value if self.step_type else None,
83
            "ui_visualizations": [],
84
            "arguments": [],
85
            "databases": [],
86
        }
87

NEW
88
        parameters = OrderedDict(self.signature.parameters)
×
89

90
        # if the function is a data extraction function, the first argument is a dict
91
        # with database connection details. This argument should not be added to the
92
        # function json. Instead, a database should be added to the function json.
NEW
93
        if self.step_type == AlgorithmStepType.DATA_EXTRACTION:
×
NEW
94
            function_json["databases"].append(
×
95
                {
96
                    "name": "Database",
97
                    "description": "Database to extract data from",
98
                }
99
            )
100
            # remove database connection details from the signature
NEW
101
            parameters.popitem(last=False)
×
102

103
        # add the arguments to the function json
NEW
104
        for name, param in parameters.items():
×
NEW
105
            arg_json, arg_type = self._get_argument_json(name, param)
×
NEW
106
            if arg_json is None:
×
NEW
107
                continue
×
NEW
108
            elif arg_type == FunctionArgumentType.DATAFRAME:
×
NEW
109
                function_json["databases"].append(arg_json)
×
110
            else:
NEW
111
                function_json["arguments"].append(arg_json)
×
NEW
112
        self.json = function_json
×
113

114
    def merge_with_template_json_data(self) -> None:
1✔
115
        """
116
        Merge the function jsons with the json data from the algorithm_json_data module
117
        """
118
        # Only merge the function jsons with template json data if it is an
119
        # infrastructure-defined function
NEW
120
        if (
×
121
            not self._is_func_defined_in_vantage6()
122
            or self.json["name"] not in PREPROCESSING_FUNCTIONS_JSON_DATA
123
        ):
NEW
124
            return
×
125

126
        # get the template json data for the function
NEW
127
        template_json = PREPROCESSING_FUNCTIONS_JSON_DATA[self.json["name"]]
×
128
        # merge the dicts, with the template dict taking precedence
NEW
129
        for argument in self.json["arguments"]:
×
NEW
130
            if argument["name"] in template_json["arguments"]:
×
NEW
131
                argument.update(template_json["arguments"][argument["name"]])
×
NEW
132
        self._expand_frontend_arguments(template_json)
×
133

134
    def merge_with_existing_json(self, existing_json: dict) -> None:
1✔
135
        """Merge the function json with the existing json data"""
136
        self._expand_frontend_arguments(existing_json)
1✔
137
        existing_without_frontend = {
1✔
138
            key: value
139
            for key, value in existing_json.items()
140
            if key != "frontend_arguments"
141
        }
142
        self._merge_dicts(self.json, existing_without_frontend)
1✔
143

144
    def _expand_frontend_arguments(self, source_json: dict) -> None:
1✔
145
        """
146
        Expand ``frontend_arguments`` into the ``arguments`` list.
147

148
        Used for built-in preprocessing templates and legacy algorithm_store.json
149
        files that still store frontend-only arguments separately.
150
        """
151
        if "frontend_arguments" not in source_json:
1✔
152
            return
1✔
153
        for frontend_argument in source_json["frontend_arguments"]:
1✔
154
            self._add_frontend_argument(source_json, frontend_argument)
1✔
155

156
    def _is_func_defined_in_vantage6(self) -> bool:
1✔
157
        """Check if the function is defined in the vantage6 package"""
NEW
158
        return self.func.__module__.startswith("vantage6.algorithm.")
×
159

160
    def _merge_dicts(self, target: dict, source: dict) -> None:
1✔
161
        """
162
        Recursively merge source dict into target dict, with source taking precedence
163
        """
164
        for key, value in source.items():
1✔
165
            if key in target:
1✔
166
                if isinstance(value, dict) and isinstance(target[key], dict):
1✔
167
                    # Recursively merge nested dictionaries
NEW
168
                    self._merge_dicts(target[key], value)
×
169
                else:
170
                    self._replace_target_with_source(target, key, value)
1✔
171
            else:
172
                target[key] = value
1✔
173

174
    def _replace_target_with_source(self, target: dict, key: str, value: Any) -> None:
1✔
175
        """Replace the value in target with the one from source"""
176
        if key not in target:
1✔
NEW
177
            target[key] = value
×
NEW
178
            return
×
179
        if target[key] == value:
1✔
180
            return
1✔
181

NEW
182
        prefer_existing = MergePreference.get_preference()
×
NEW
183
        if prefer_existing:
×
NEW
184
            target[key] = value
×
NEW
185
        elif prefer_existing is None:
×
NEW
186
            info(
×
187
                f"Different values for the same key '{key}' in function '{self.name}' "
188
                "were found."
189
            )
NEW
190
            info(f"Value from function itself: {target[key]}")
×
NEW
191
            info(f"Value from algorithm.json: {value}")
×
NEW
192
            result = q.select(
×
193
                "Please select the value to keep:",
194
                choices=[
195
                    "function itself",
196
                    "algorithm.json",
197
                    "function itself (also for all other conflicts)",
198
                    "algorithm.json (also for all other conflicts)",
199
                ],
200
            ).unsafe_ask()
NEW
201
            if result == "algorithm.json":
×
NEW
202
                target[key] = value
×
NEW
203
            elif result == "function itself":
×
NEW
204
                pass  # do nothing
×
NEW
205
            elif result == "function itself (also for all other conflicts)":
×
NEW
206
                MergePreference.set_preference(False)
×
NEW
207
            elif result == "algorithm.json (also for all other conflicts)":
×
NEW
208
                MergePreference.set_preference(True)
×
NEW
209
                target[key] = value
×
210

211
    def _get_argument_json(
1✔
212
        self, name: str, param: inspect.Parameter, warn_if_unsupported_arg: bool = True
213
    ) -> tuple[dict | None, FunctionArgumentType | None]:
214
        """Get the argument JSON"""
215

NEW
216
        if param.annotation is None:
×
NEW
217
            error(f"Function {self.name} has no annotation for argument {name}")
×
NEW
218
            info(f"Please add a type annotation to the argument {name}")
×
NEW
219
            info(f"For example, for string arguments: 'def {self.name}({name}: str)'")
×
NEW
220
            exit(1)
×
221

NEW
222
        if param.annotation is AlgorithmClient:
×
223
            # Algorithm client arguments do not have to be provided by the user
NEW
224
            return None, None
×
NEW
225
        elif param.annotation is pd.DataFrame:
×
226
            # this is an argument that requires the user to supply a dataframe. That
227
            # only requires a name and description.
NEW
228
            return {
×
229
                "name": name if name != "df" else "Data to use",
230
                "description": self._extract_parameter_description(name),
231
            }, FunctionArgumentType.DATAFRAME
232
        else:
233
            # This is a regular function parameter
NEW
234
            type_ = self._get_argument_type(
×
235
                param, name, warn_if_unsupported=not self._is_func_defined_in_vantage6()
236
            )
NEW
237
            arg_json = {
×
238
                "name": name,
239
                "display_name": self._pretty_print_name(name),
240
                "description": self._extract_parameter_description(name),
241
                "type": type_.value if type_ else None,
242
                "has_default_value": param.default != inspect.Parameter.empty,
243
                "is_frontend_only": False,
244
            }
NEW
245
            if param.default != inspect.Parameter.empty:
×
NEW
246
                arg_json["default_value"] = param.default
×
247

NEW
248
            return arg_json, FunctionArgumentType.PARAMETER
×
249

250
    def _add_frontend_argument(
1✔
251
        self, template_json: dict, frontend_argument: str
252
    ) -> None:
253
        """Add a frontend argument to the function json"""
254
        frontend_argument_json: dict = template_json["frontend_arguments"][
1✔
255
            frontend_argument
256
        ]
257
        before_arg_name = frontend_argument_json.pop("before_argument")
1✔
258

259
        try:
1✔
260
            before_arg_idx = next(
1✔
261
                idx
262
                for idx, arg in enumerate(self.json["arguments"])
263
                if arg["name"] == before_arg_name
264
            )
265
            self.json["arguments"].insert(before_arg_idx, frontend_argument_json)
1✔
NEW
266
        except StopIteration:
×
NEW
267
            warning(
×
268
                f"Could not find argument {before_arg_name} in function "
269
                f"{self.json['name']}. Frontend argument {frontend_argument} "
270
                "will not be added."
271
            )
272

273
    def _get_argument_type(
1✔
274
        self, param: inspect.Parameter, name: str, warn_if_unsupported: bool = True
275
    ) -> AlgorithmArgumentType | None:
276
        """Get the type of the argument"""
NEW
277
        if isinstance(param.annotation, UnionType):
×
278
            # Arguments with default values may have type 'str | None'. If that is the
279
            # case, we want to use the type of the first element in the union.
NEW
280
            if len(param.annotation.__args__) > 2:
×
281
                # if there are more than 2 elements in the union, don't handle
NEW
282
                if warn_if_unsupported:
×
NEW
283
                    warning(
×
284
                        f"Unsupported argument type: {param.annotation} for argument "
285
                        f"{name} in function {self.name}"
286
                    )
NEW
287
                return None
×
NEW
288
            elif len(param.annotation.__args__) == 2:
×
289
                # if there are two, we want to use the first one if the second is None
NEW
290
                if param.annotation.__args__[1] is type(None):
×
NEW
291
                    type_ = param.annotation.__args__[0]
×
292
                else:
NEW
293
                    if warn_if_unsupported:
×
NEW
294
                        warning(
×
295
                            f"Unsupported argument type: {param.annotation} for "
296
                            f"argument '{name}' in function '{self.name}'"
297
                        )
NEW
298
                    return None
×
299
            else:
300
                # normally, unions have 2+ elements. If there is only one, use that
NEW
301
                type_ = param.annotation.__args__[0]
×
302
        else:
NEW
303
            type_ = param.annotation
×
304

NEW
305
        if type_ is str:
×
NEW
306
            return AlgorithmArgumentType.STRING
×
NEW
307
        elif type_ is dict:
×
NEW
308
            return AlgorithmArgumentType.JSON
×
NEW
309
        elif type_ is int:
×
NEW
310
            return AlgorithmArgumentType.INTEGER
×
NEW
311
        elif type_ is float:
×
NEW
312
            return AlgorithmArgumentType.FLOAT
×
NEW
313
        elif type_ is bool:
×
NEW
314
            return AlgorithmArgumentType.BOOLEAN
×
NEW
315
        elif type_ is list:
×
NEW
316
            return AlgorithmArgumentType.STRINGS
×
NEW
317
        elif get_origin(type_) is list:
×
318
            # Handle generic list types like list[str], list[int], list[float]
NEW
319
            args = get_args(type_)
×
NEW
320
            if len(args) == 1:
×
NEW
321
                inner_type = args[0]
×
NEW
322
                if inner_type is str:
×
NEW
323
                    return AlgorithmArgumentType.STRINGS
×
NEW
324
                elif inner_type is int:
×
NEW
325
                    return AlgorithmArgumentType.INTEGERS
×
NEW
326
                elif inner_type is float:
×
NEW
327
                    return AlgorithmArgumentType.FLOATS
×
328
            # Fallback: if list has no args or multiple args, default to STRINGS
NEW
329
            return AlgorithmArgumentType.STRINGS
×
330
        else:
NEW
331
            if warn_if_unsupported:
×
NEW
332
                warning(
×
333
                    f"Unsupported argument type: {param.annotation} for argument "
334
                    f"'{name}' in function '{self.name}'"
335
                )
NEW
336
            return None
×
337

338
    def _pretty_print_name(self, name: str) -> str:
1✔
339
        """Pretty print the name of the function"""
NEW
340
        pretty = name.replace("_", " ")
×
NEW
341
        if len(pretty):
×
NEW
342
            pretty = pretty[0].upper() + pretty[1:]
×
NEW
343
        return pretty
×
344

345
    def _extract_headline_of_docstring(self) -> str:
1✔
346
        """Extract the headline of the docstring"""
NEW
347
        if not self.docstring:
×
NEW
348
            return ""
×
349

350
        # Split by double newlines to get the first paragraph
NEW
351
        paragraphs = self.docstring.split("\n\n")
×
NEW
352
        first_paragraph = paragraphs[0]
×
353

354
        # Split by single newlines and join the lines with spaces
NEW
355
        lines = first_paragraph.split("\n")
×
NEW
356
        header = " ".join(line.strip() for line in lines if line.strip() != "")
×
NEW
357
        return header
×
358

359
    def _get_step_type(self) -> AlgorithmStepType | None:
1✔
360
        """Get the step type of the function"""
NEW
361
        decorator_type = get_vantage6_decorator_type(self.func)
×
NEW
362
        if decorator_type in AlgorithmStepType.list():
×
NEW
363
            return decorator_type
×
364
        else:
NEW
365
            warning(
×
366
                f"Unsupported decorator type: {decorator_type} for function {self.name}"
367
            )
NEW
368
            return None
×
369

370
    def _extract_parameter_description(self, name: str) -> str:
1✔
371
        """Extract the description of the parameter"""
NEW
372
        if not self.docstring:
×
NEW
373
            return ""
×
374

375
        # Try both patterns: "{name}:" and "{name} :"
NEW
376
        patterns = [f"{name}:", f"{name} :"]
×
377

NEW
378
        for pattern in patterns:
×
NEW
379
            if pattern in self.docstring:
×
NEW
380
                return self.docstring.split(pattern)[1].split("\n")[1].strip()
×
381

NEW
382
        return ""
×
383

384

385
@click.command()
@click.option(
    "--algo-function-file",
    default=None,
    type=str,
    help="Path to the file containing or importing the algorithm functions",
)
@click.option(
    "--current-json",
    default=None,
    type=str,
    help="Path to the current algorithm.json file",
)
@click.option(
    "--output-file",
    default="new-algorithm.json",
    type=str,
    help="Path to the output file",
)
def cli_algorithm_generate_json(
    algo_function_file: str, current_json: str, output_file: str
) -> None:
    """
    Generate an updated algorithm.json file to submit to the algorithm store.

    You should provide the path to the file where the algorithm functions are
    defined.

    Note that if you do asterisk ('from x import *') imports, all functions from the
    imported module will be added to the algorithm.json file.
    """
    # NOTE: the docstring above is left unchanged on purpose: it is the help
    # text of the command.
    algo_function_file = _get_algo_function_file_location(algo_function_file)

    current_json = _get_current_json_location(current_json)

    # read the current algorithm.json file
    with open(current_json, "r", encoding="utf-8") as f:
        current_json_data = json.load(f)

    # Index the existing function entries by name once. The first entry with a
    # given name is kept. A file without a "functions" key is treated as having
    # no existing functions.
    existing_by_name: dict = {}
    for existing_func in current_json_data.get("functions", []):
        existing_by_name.setdefault(existing_func.get("name"), existing_func)

    # get the functions from the file
    info(f"Importing functions from {algo_function_file}...")
    functions = _get_functions_from_file(algo_function_file)
    function_objs = [Function(f) for f in functions]

    info("Converting functions to JSON...")
    for function in function_objs:
        function.prepare_json()
        function.merge_with_template_json_data()

        # merge the function jsons with the existing json data
        current_json_func = existing_by_name.get(function.name)
        if current_json_func:
            function.merge_with_existing_json(current_json_func)

    # write the new algorithm.json file
    info(f"Writing new algorithm.json file to {output_file}...")
    current_json_data["functions"] = [f.json for f in function_objs]
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(current_json_data, f, indent=2)

    info(f"New algorithm.json file written to: {output_file}")

    warning("-" * 60)
    warning(f"Check the generated '{output_file}' file before ")
    warning("submitting it to the algorithm store!")
    warning("-" * 60)
×
453

454

455
def _get_functions_from_file(file_path: str) -> list[Callable]:
    """Get the functions from the file

    The file is imported as ``<parent directory name>.<file name>``. To make
    that possible, the directory two levels above the file is added to
    ``sys.path`` if it is not there yet.

    Parameters
    ----------
    file_path : str
        Path to the file containing or importing the algorithm functions

    Returns
    -------
    list[Callable]
        The algorithm functions found in the module and in the modules that
        it imports (one level deep), without duplicates

    Raises
    ------
    ImportError
        If the module cannot be imported
    """
    # Convert path to absolute path
    resolved_path = Path(file_path).resolve()
    package_dir = resolved_path.parent

    # Get the package root directory (two levels up from the file)
    package_root = str(package_dir.parent)
    if package_root not in sys.path:
        sys.path.insert(0, package_root)

    # Get the module name from the file path, including the package name
    module_name = f"{package_dir.name}.{resolved_path.stem}"

    # Import the module
    try:
        module = importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(f"Could not import module {module_name}: {str(e)}") from e

    def get_members_from_module(module: ModuleType) -> list:
        """Get the public members (names without leading underscore) of the module"""
        return [
            member for name, member in getmembers(module) if not name.startswith("_")
        ]

    def is_algorithm_function(member: object) -> bool:
        """Check whether the member is a vantage6 algorithm function"""
        return isfunction(member) and is_vantage6_algorithm_func(member)

    # get the functions from the algorithm module
    import_members = get_members_from_module(module)
    import_functions = [m for m in import_members if is_algorithm_function(m)]

    # add the functions from the imported modules (only 1 level deep). This is so that
    # if you do e.g. 'from vantage6.algorithm.preprocessing import *', all functions
    # from within those modules are also imported.
    for import_module in (m for m in import_members if ismodule(m)):
        import_functions.extend(
            m
            for m in get_members_from_module(import_module)
            if is_algorithm_function(m)
        )

    # A function may be found both directly and via an imported module. Keep only
    # its first occurrence, so that it does not end up in the JSON twice.
    unique_functions = []
    seen_ids = set()
    for func in import_functions:
        if id(func) not in seen_ids:
            seen_ids.add(id(func))
            unique_functions.append(func)

    return unique_functions
×
508

509

510
def _get_algo_function_file_location(algo_function_file: str | None) -> None:
1✔
511
    """Get user input for the algorithm creation
512

513
    Parameters
514
    ----------
515
    algo_function_file : str
516
        Path to the file containing or importingthe algorithm functions
517
    """
NEW
518
    if not algo_function_file:
×
NEW
519
        default_dir = str(Path(os.getcwd()) / "__init__.py")
×
NEW
520
        algo_function_file = q.text(
×
521
            "Path to the file containing or importing the algorithm functions:",
522
            default=default_dir,
523
        ).unsafe_ask()
524

525
    # Convert to absolute path using pathlib
NEW
526
    algo_function_file = str(Path(algo_function_file).resolve())
×
527

528
    # check if the file exists
NEW
529
    if not Path(algo_function_file).exists():
×
NEW
530
        raise FileNotFoundError(f"File {algo_function_file} does not exist")
×
531

NEW
532
    return algo_function_file
×
533

534

535
def _get_current_json_location(current_json: str) -> None:
1✔
536
    """Get user input for the current algorithm.json file
537

538
    Parameters
539
    ----------
540
    current_json : str
541
        Path to the current algorithm.json file
542
    """
NEW
543
    if not current_json:
×
NEW
544
        default_dir = str(Path(os.getcwd()) / "algorithm_store.json")
×
NEW
545
        current_json = q.text(
×
546
            "Path to the current algorithm.json file:",
547
            default=default_dir,
548
        ).unsafe_ask()
549

550
    # Convert to absolute path using pathlib
NEW
551
    current_json = str(Path(current_json).resolve())
×
552

553
    # check if the file exists
NEW
554
    if not Path(current_json).exists():
×
NEW
555
        raise FileNotFoundError(f"File {current_json} does not exist")
×
556

NEW
557
    return current_json
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc