• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DHARPA-Project / kiara_plugin.network_analysis / 16399888719

20 Jul 2025 12:28PM UTC coverage: 55.414% (+0.3%) from 55.096%
16399888719

push

github

makkus
fix: fix #30, add 'first_row_is_header' checkbox to notebook

86 of 166 branches covered (51.81%)

Branch coverage included in aggregate %.

2 of 4 new or added lines in 2 files covered. (50.0%)

610 of 1090 relevant lines covered (55.96%)

2.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.36
/src/kiara_plugin/network_analysis/modules/filter.py
1
# -*- coding: utf-8 -*-
2
from typing import Any, Dict, Mapping, Union
5✔
3

4
from kiara.models.values.value import Value
5✔
5
from kiara.modules import ValueMapSchema
5✔
6
from kiara.modules.included_core_modules.filter import FilterModule
5✔
7
from kiara_plugin.network_analysis import NetworkData
5✔
8
from kiara_plugin.network_analysis.defaults import (
5✔
9
    COMPONENT_ID_COLUMN_NAME,
10
    LABEL_COLUMN_NAME,
11
    NODE_ID_COLUMN_NAME,
12
    SOURCE_COLUMN_NAME,
13
    TARGET_COLUMN_NAME,
14
)
15

16

17
class NetworkDataFiltersModule(FilterModule):
    """Filter module for values of type 'network_data'.

    Provides the 'select_component' filter, which reduces a network to the
    nodes and edges that belong to a single component.
    """

    _module_type_name = "network_data.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
        """Return the name of the value type this filter module handles."""
        return "network_data"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
        """Return the input schema for the filter called `filter_name`.

        Args:
            filter_name: the name of the filter to describe.

        Returns:
            A schema with a single required integer input 'component_id'
            (default 0) for the 'select_component' filter, None for any other
            filter name.
        """
        if filter_name == "select_component":
            return {
                "component_id": {
                    "type": "integer",
                    "doc": "The id of the componen to select.",
                    "optional": False,
                    "default": 0,
                },
            }
        return None

    def filter__select_component(
        self, value: Value, filter_inputs: Mapping[str, Any]
    ) -> NetworkData:
        """Create a new network that only contains one component.

        Nodes whose component id equals the requested one are kept and get a
        new, zero-based sequential node id (ordered by their old node id).
        Edges of that component are kept and their source/target ids are
        translated to the new node ids. The previous ids are carried along in
        the columns 'old_node_id', 'old_source_id' and 'old_target_id'.

        Args:
            value: the value wrapping the network data to filter.
            filter_inputs: the filter inputs, must contain 'component_id'.

        Returns:
            The result of `NetworkData.create_network_data` for the filtered
            nodes and edges tables.

        Raises:
            KeyError: if 'component_id' is missing from `filter_inputs`.
            ValueError: if 'component_id' can not be interpreted as an integer.
            TypeError: if 'component_id' has a type `int()` does not accept.
        """
        import duckdb

        def quote_identifier(name: str) -> str:
            # SQL identifiers are double-quoted; an embedded double quote has
            # to be doubled, otherwise it would terminate the identifier.
            return '"' + name.replace('"', '""') + '"'

        # The id is interpolated into SQL text below, so make sure it really
        # is an integer and nothing else can end up in the query.
        component_id = int(filter_inputs["component_id"])

        network_data: NetworkData = value.data

        # NOTE: the local names 'nodes_table', 'nodes_result',
        # 'id_mapping_result' and 'edges_table' are referenced by name from
        # within the SQL strings (presumably resolved via duckdb replacement
        # scans of Python variables -- confirm against the duckdb docs), so
        # they must neither be renamed nor moved into another function.
        nodes_table = network_data.nodes.arrow_table

        # The label column, plus all non-computed node columns (internal
        # columns start with '_' and are skipped).
        nodes_table_columns = [LABEL_COLUMN_NAME]
        nodes_table_columns.extend(
            quote_identifier(column_name)
            for column_name in nodes_table.column_names
            if not column_name.startswith("_")
        )
        nodes_select = ", ".join(nodes_table_columns)

        # Filter nodes by component and add a new sequential index. The
        # original node id column is not selected as-is, it is replaced by
        # the newly created one and kept as 'old_node_id'.
        nodes_query = f"""
            SELECT
                ROW_NUMBER() OVER (ORDER BY {NODE_ID_COLUMN_NAME}) - 1 AS {NODE_ID_COLUMN_NAME},
                {NODE_ID_COLUMN_NAME} AS old_node_id,
                {nodes_select}
            FROM nodes_table
            WHERE {COMPONENT_ID_COLUMN_NAME} = {component_id}
            ORDER BY {NODE_ID_COLUMN_NAME}
        """
        nodes_result = duckdb.sql(nodes_query)

        # Create a mapping table for old_node_id -> new_node_id
        id_mapping_query = f"""
            SELECT old_node_id, {NODE_ID_COLUMN_NAME} FROM nodes_result
        """
        # only used from within 'edges_query', hence the 'noqa'
        id_mapping_result = duckdb.sql(id_mapping_query)  # noqa

        # Non-computed edge columns (internal columns start with '_' and are
        # skipped). They are qualified with the edges alias so they can not
        # clash with the columns of the two id mapping joins.
        edges_table = network_data.edges.arrow_table
        edge_attribute_columns = [
            f"e.{quote_identifier(column_name)}"
            for column_name in edges_table.column_names
            if not column_name.startswith("_")
        ]
        edge_attributes_select = "".join(
            f", {column}" for column in edge_attribute_columns
        )

        # Filter edges by component and translate node IDs using the mapping
        edges_query = f"""
            SELECT
                src_map.{NODE_ID_COLUMN_NAME} AS {SOURCE_COLUMN_NAME},
                e.{SOURCE_COLUMN_NAME} AS old_source_id,
                tgt_map.{NODE_ID_COLUMN_NAME} AS {TARGET_COLUMN_NAME},
                e.{TARGET_COLUMN_NAME} AS old_target_id
                {edge_attributes_select}
            FROM edges_table e
            JOIN id_mapping_result src_map ON e.{SOURCE_COLUMN_NAME} = src_map.old_node_id
            JOIN id_mapping_result tgt_map ON e.{TARGET_COLUMN_NAME} = tgt_map.old_node_id
            WHERE e.{COMPONENT_ID_COLUMN_NAME} = {component_id}
        """
        edges_result = duckdb.sql(edges_query)

        network_data_result = NetworkData.create_network_data(
            nodes_table=nodes_result.arrow(), edges_table=edges_result.arrow()
        )

        return network_data_result
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc