• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DHARPA-Project / kiara_plugin.network_analysis / 16113187676

07 Jul 2025 09:28AM UTC coverage: 54.789% (+0.1%) from 54.641%
16113187676

push

github

makkus
build: add marimo dependency

84 of 163 branches covered (51.53%)

Branch coverage included in aggregate %.

591 of 1069 relevant lines covered (55.29%)

2.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.79
/src/kiara_plugin/network_analysis/modules/create.py
1
# -*- coding: utf-8 -*-
2
from typing import Any, Dict, List, Mapping, Union
5✔
3

4
from pydantic import Field
5✔
5

6
from kiara.api import KiaraModule, ValueMap, ValueMapSchema
5✔
7
from kiara.exceptions import KiaraProcessingException
5✔
8
from kiara.models.filesystem import (
5✔
9
    KiaraFile,
10
)
11
from kiara.models.module import KiaraModuleConfig
5✔
12
from kiara.models.module.jobs import JobLog
5✔
13
from kiara.models.values.value import Value
5✔
14
from kiara.modules.included_core_modules.create_from import (
5✔
15
    CreateFromModule,
16
    CreateFromModuleConfig,
17
)
18
from kiara_plugin.network_analysis.defaults import (
5✔
19
    LABEL_ALIAS_NAMES,
20
    LABEL_COLUMN_NAME,
21
    NODE_ID_ALIAS_NAMES,
22
    NODE_ID_COLUMN_NAME,
23
    SOURCE_COLUMN_ALIAS_NAMES,
24
    SOURCE_COLUMN_NAME,
25
    TARGET_COLUMN_ALIAS_NAMES,
26
    TARGET_COLUMN_NAME,
27
)
28
from kiara_plugin.network_analysis.models import NetworkData
5✔
29
from kiara_plugin.network_analysis.utils import (
5✔
30
    guess_node_id_column_name,
31
    guess_node_label_column_name,
32
    guess_source_column_name,
33
    guess_target_column_name,
34
)
35
from kiara_plugin.tabular.models.table import KiaraTable
5✔
36

37
# Package-level metadata for this module file: authors and a short description.
# (Key order and values are read as-is elsewhere, so they are left untouched.)
KIARA_METADATA = {
    "authors": [
        {"name": "Lena Jaskov", "email": "helena.jaskov@uni.lu"},
        {"name": "Markus Binsteiner", "email": "markus@frkl.io"},
    ],
    "description": "Modules to create/export network data.",
}
44

45

46
class CreateNetworkDataModuleConfig(CreateFromModuleConfig):
    """Configuration for the 'create.network_data' module."""

    # If enabled, items that fail to convert are dropped instead of aborting.
    ignore_errors: bool = Field(
        default=False,
        description="Whether to ignore convert errors and omit the failed items.",
    )
51

52

53
class CreateNetworkDataModule(CreateFromModule):
    """Module that creates 'network_data' values from other value types (currently: files)."""

    _module_type_name = "create.network_data"
    _config_cls = CreateNetworkDataModuleConfig

    def create__network_data__from__file(self, source_value: Value) -> Any:
        """Create a network_data value from a file, trying to auto-determine the format of said file.

        The format is determined purely from the file extension.

        Supported file formats (at the moment):
        - gml
        - gexf
        - graphml (uses the standard xml library present in Python, which is insecure - see xml for additional information. Only parse GraphML files you trust)
        - pajek (*.pajek / *.net)
        - leda
        - graph6 (*.graph6 / *.g6)
        - sparse6 (*.sparse6 / *.s6)

        Raises:
            KiaraProcessingException: if the file extension does not match any supported format.
        """

        source_file: KiaraFile = source_value.data
        file_name = source_file.file_name

        # the name of the attribute kiara should use to populate the node labels
        label_attr_name: Union[str, None] = None
        # attributes to ignore when creating the node table,
        # mostly useful if we know that the file contains attributes that are not relevant for the network
        # or for 'label', if we don't want to duplicate the information in '_label' and 'label'
        ignore_node_attributes = None

        if file_name.endswith(".gml"):
            import networkx as nx

            # we use 'label="id"' here because networkx is fussy about labels being unique and non-null
            # we use the 'label' attribute for the node labels manually later
            graph = nx.read_gml(source_file.path, label="id")
            label_attr_name = "label"
            ignore_node_attributes = ["label"]

        elif file_name.endswith(".gexf"):
            import networkx as nx

            graph = nx.read_gexf(source_file.path)
        elif file_name.endswith(".graphml"):
            import networkx as nx

            graph = nx.read_graphml(source_file.path)
        elif file_name.endswith((".pajek", ".net")):
            import networkx as nx

            graph = nx.read_pajek(source_file.path)
        elif file_name.endswith(".leda"):
            import networkx as nx

            graph = nx.read_leda(source_file.path)
        elif file_name.endswith((".graph6", ".g6")):
            import networkx as nx

            graph = nx.read_graph6(source_file.path)
        elif file_name.endswith((".sparse6", ".s6")):
            import networkx as nx

            graph = nx.read_sparse6(source_file.path)
        else:
            # fix: 'net' was missing from this list, even though '.net' (pajek) files are handled above
            supported_file_extensions = [
                "gml",
                "gexf",
                "graphml",
                "pajek",
                "net",
                "leda",
                "graph6",
                "g6",
                "sparse6",
                "s6",
            ]

            msg = f"Can't create network data for unsupported format of file: {source_file.file_name}. Supported file extensions: {', '.join(supported_file_extensions)}"

            raise KiaraProcessingException(msg)

        return NetworkData.create_from_networkx_graph(
            graph=graph,
            label_attr_name=label_attr_name,
            ignore_node_attributes=ignore_node_attributes,
        )
139

140

141
class AssembleNetworkDataModuleConfig(KiaraModuleConfig):
    """Configuration for the 'assemble.network_data' module.

    Every field holds an ordered list of candidate column names that is tried when the
    corresponding column is not specified explicitly by the user.
    """

    # pydantic copies non-trivial defaults per instance, so sharing these
    # module-level list constants as defaults is safe.
    node_id_column_aliases: List[str] = Field(
        default=NODE_ID_ALIAS_NAMES,
        description="Alias strings to test (in order) for auto-detecting the node id column.",
    )
    label_column_aliases: List[str] = Field(
        default=LABEL_ALIAS_NAMES,
        description="Alias strings to test (in order) for auto-detecting the node label column.",
    )
    source_column_aliases: List[str] = Field(
        default=SOURCE_COLUMN_ALIAS_NAMES,
        description="Alias strings to test (in order) for auto-detecting the source column.",
    )
    target_column_aliases: List[str] = Field(
        default=TARGET_COLUMN_ALIAS_NAMES,
        description="Alias strings to test (in order) for auto-detecting the target column.",
    )
158

159

160
class AssembleGraphFromTablesModule(KiaraModule):
    """Create a 'network_data' instance from one or two tables.

    This module needs at least one table as input, providing the edges of the resulting network data set.
    If no further table is created, basic node information will be automatically created by using unique values from the edges source and target columns.

    If no `source_column_name` (and/or `target_column_name`) is provided, *kiara* will try to auto-detect the most likely of the existing columns to use. If that is not possible, an error will be raised.
    """

    _module_type_name = "assemble.network_data"
    _config_cls = AssembleNetworkDataModuleConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the module inputs: an edges table (required), and optional nodes table plus column hints."""
        inputs: Mapping[str, Any] = {
            "edges": {
                "type": "table",
                "doc": "A table that contains the edges data.",
                "optional": False,
            },
            "source_column": {
                "type": "string",
                "doc": "The name of the source column name in the edges table.",
                "optional": True,
            },
            "target_column": {
                "type": "string",
                "doc": "The name of the target column name in the edges table.",
                "optional": True,
            },
            "edges_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
            "nodes": {
                "type": "table",
                "doc": "A table that contains the nodes data.",
                "optional": True,
            },
            "id_column": {
                "type": "string",
                "doc": "The name (before any potential column mapping) of the node-table column that contains the node identifier (used in the edges table).",
                "optional": True,
            },
            "label_column": {
                "type": "string",
                "doc": "The name of a column that contains the node label (before any potential column name mapping). If not specified, the value of the id value will be used as label.",
                "optional": True,
            },
            "nodes_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single 'network_data' output."""
        outputs: Mapping[str, Any] = {
            "network_data": {"type": "network_data", "doc": "The network/graph data."}
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Assemble a 'network_data' value from the edges table (and optional nodes table).

        Node ids are re-mapped to a contiguous, zero-based integer range; the edges table's
        source/target columns are rewritten in terms of those internal ids.

        Raises:
            KiaraProcessingException: on missing/ambiguous columns, conflicting column maps,
                null labels, or edge endpoints that cannot be mapped to a node id.
        """
        import polars as pl

        # process nodes
        nodes = inputs.get_value_obj("nodes")

        # the nodes column map can be used to rename attribute columns in the nodes table
        nodes_column_map: Dict[str, str] = inputs.get_value_data("nodes_column_map")
        if nodes_column_map is None:
            nodes_column_map = {}

        # we need to process the nodes first, because if we have nodes, we need to create the node id map that translates from the original
        # id to the new, internal, integer-based one

        if nodes.is_set:
            job_log.add_log("processing nodes table")

            nodes_table: KiaraTable = nodes.data
            assert nodes_table is not None

            nodes_column_names = nodes_table.column_names

            # the most important column is the id column, which is the only one that we absolutely need to have
            id_column_name = inputs.get_value_data("id_column")

            if id_column_name is None:
                # try to auto-detect the id column
                column_names_to_test = self.get_config_value("node_id_column_aliases")
                id_column_name = guess_node_id_column_name(
                    nodes_table=nodes_table, suggestions=column_names_to_test
                )

                if id_column_name is None:
                    raise KiaraProcessingException(
                        f"Could not auto-determine id column name. Please specify one manually, using one of: {', '.join(nodes_column_names)}"
                    )
                else:
                    job_log.add_log(f"auto-detected id column: {id_column_name}")

            if id_column_name not in nodes_column_names:
                raise KiaraProcessingException(
                    f"Could not find id column '{id_column_name}' in the nodes table. Please specify a valid column name manually, using one of: {', '.join(nodes_column_names)}"
                )

            # fix: previously the id mapping was unconditionally overwritten right before this
            # check, which made the conflict branch unreachable and silently clobbered any
            # user-provided mapping for the id column.
            if id_column_name in nodes_column_map.keys():
                if nodes_column_map[id_column_name] != NODE_ID_COLUMN_NAME:
                    raise KiaraProcessingException(
                        f"Existing mapping of id column name '{id_column_name}' is not mapped to '{NODE_ID_COLUMN_NAME}' in the 'nodes_column_map' input."
                    )
            else:
                nodes_column_map[id_column_name] = NODE_ID_COLUMN_NAME

            # the label is optional, if not specified, we try to auto-detect it. If not possible, we will use the (stringified) id column as label.
            label_column_name = inputs.get_value_data("label_column")
            if label_column_name is None:
                column_names_to_test = self.get_config_value("label_column_aliases")
                label_column_name = guess_node_label_column_name(
                    nodes_table=nodes_table, suggestions=column_names_to_test
                )

                if label_column_name and label_column_name not in nodes_column_names:
                    # fix: this error message previously referred to the id column
                    raise KiaraProcessingException(
                        f"Could not find label column '{label_column_name}' in the nodes table. Please specify a valid column name manually, using one of: {', '.join(nodes_column_names)}"
                    )

                if label_column_name is not None:
                    job_log.add_log(f"auto-detected label column: {label_column_name}")
                else:
                    job_log.add_log(
                        "no label column found, will use id column as label"
                    )

            # NOTE: despite the variable name, this is a polars (not arrow) dataframe
            nodes_arrow_dataframe = nodes_table.to_polars_dataframe()

        else:
            nodes_arrow_dataframe = None
            label_column_name = None

        # process edges

        job_log.add_log("processing edges table")
        edges = inputs.get_value_obj("edges")
        edges_table: KiaraTable = edges.data
        edges_source_column_name = inputs.get_value_data("source_column")
        edges_target_column_name = inputs.get_value_data("target_column")

        edges_arrow_dataframe = edges_table.to_polars_dataframe()
        edges_column_names = edges_arrow_dataframe.columns

        if edges_source_column_name is None:
            column_names_to_test = self.get_config_value("source_column_aliases")
            edges_source_column_name = guess_source_column_name(
                edges_table=edges_table, suggestions=column_names_to_test
            )
            # fix: this log line was gated on the *target* column name before
            if edges_source_column_name is not None:
                job_log.add_log(
                    f"auto-detected source column: {edges_source_column_name}"
                )

        if edges_target_column_name is None:
            column_names_to_test = self.get_config_value("target_column_aliases")
            edges_target_column_name = guess_target_column_name(
                edges_table=edges_table, suggestions=column_names_to_test
            )
            if edges_target_column_name is not None:
                job_log.add_log(
                    f"auto-detected target column: {edges_target_column_name}"
                )

        if not edges_source_column_name or not edges_target_column_name:
            # as a last resort, a two-column edges table is interpreted as (source, target)
            if not edges_source_column_name and not edges_target_column_name:
                if len(edges_column_names) == 2:
                    job_log.add_log(
                        "using first two columns as source and target columns"
                    )
                    edges_source_column_name = edges_column_names[0]
                    edges_target_column_name = edges_column_names[1]
                else:
                    raise KiaraProcessingException(
                        f"Could not auto-detect source and target column names. Please specify them manually using one of: {', '.join(edges_column_names)}."
                    )

            if not edges_source_column_name:
                raise KiaraProcessingException(
                    f"Could not auto-detect source column name. Please specify it manually using one of: {', '.join(edges_column_names)}."
                )

            if not edges_target_column_name:
                raise KiaraProcessingException(
                    f"Could not auto-detect target column name. Please specify it manually using one of: {', '.join(edges_column_names)}."
                )

        edges_column_map: Dict[str, str] = inputs.get_value_data("edges_column_map")
        if edges_column_map is None:
            edges_column_map = {}

        if edges_source_column_name in edges_column_map.keys():
            if edges_column_map[edges_source_column_name] != SOURCE_COLUMN_NAME:
                raise KiaraProcessingException(
                    f"Existing mapping of source column name '{edges_source_column_name}' is not mapped to '{SOURCE_COLUMN_NAME}' in the 'edges_column_map' input."
                )
        else:
            edges_column_map[edges_source_column_name] = SOURCE_COLUMN_NAME

        if edges_target_column_name in edges_column_map.keys():
            if edges_column_map[edges_target_column_name] == SOURCE_COLUMN_NAME:
                # fix: message previously said "Edges and source"
                raise KiaraProcessingException(
                    "Target and source column names can't be the same."
                )
            if edges_column_map[edges_target_column_name] != TARGET_COLUMN_NAME:
                raise KiaraProcessingException(
                    f"Existing mapping of target column name '{edges_target_column_name}' is not mapped to '{TARGET_COLUMN_NAME}' in the 'edges_column_map' input."
                )
        else:
            edges_column_map[edges_target_column_name] = TARGET_COLUMN_NAME

        if edges_source_column_name not in edges_column_names:
            raise KiaraProcessingException(
                f"Edges table does not contain source column '{edges_source_column_name}'. Choose one of: {', '.join(edges_column_names)}."
            )
        if edges_target_column_name not in edges_column_names:
            raise KiaraProcessingException(
                f"Edges table does not contain target column '{edges_target_column_name}'. Choose one of: {', '.join(edges_column_names)}."
            )

        source_column_old = edges_arrow_dataframe.get_column(edges_source_column_name)
        target_column_old = edges_arrow_dataframe.get_column(edges_target_column_name)

        job_log.add_log("generating node id map and nodes table")
        # fill out the node id map
        unique_node_ids_old = (
            pl.concat([source_column_old, target_column_old], rechunk=False)
            .unique()
            .sort()
        )

        if nodes_arrow_dataframe is None:
            # no nodes table was provided: synthesize one from the unique edge endpoints
            new_node_ids = range(0, len(unique_node_ids_old))  # noqa: PIE808
            node_id_map = dict(zip(unique_node_ids_old, new_node_ids))

            nodes_arrow_dataframe = pl.DataFrame(
                {
                    NODE_ID_COLUMN_NAME: new_node_ids,
                    LABEL_COLUMN_NAME: (str(x) for x in unique_node_ids_old),
                    "id": unique_node_ids_old,
                }
            )

        else:
            id_column_old = nodes_arrow_dataframe.get_column(id_column_name)
            unique_node_ids_nodes_table = id_column_old.unique().sort()

            if len(unique_node_ids_old) > len(unique_node_ids_nodes_table):
                # fix: removed a computed-and-discarded `is_in` expression that had no effect.
                # NOTE: this length check only catches *some* missing-node cases; edge endpoints
                # absent from the nodes table are also caught by the null checks further below.
                raise NotImplementedError("MISSING NODE IDS NOT IMPLEMENTED YET")
            else:
                new_node_ids = range(0, len(id_column_old))  # noqa: PIE808
                node_id_map = dict(zip(id_column_old, new_node_ids))
                new_idx_series = pl.Series(
                    name=NODE_ID_COLUMN_NAME, values=new_node_ids
                )
                nodes_arrow_dataframe.insert_column(0, new_idx_series)

                if not label_column_name:
                    label_column_name = NODE_ID_COLUMN_NAME

                # we create a copy of the label column, and stringify its items

                label_column = nodes_arrow_dataframe.get_column(
                    label_column_name
                ).rename(LABEL_COLUMN_NAME)
                if label_column.dtype != pl.Utf8:
                    label_column = label_column.cast(pl.Utf8)

                if label_column.null_count() != 0:
                    raise KiaraProcessingException(
                        f"Label column '{label_column_name}' contains null values. This is not allowed."
                    )

                nodes_arrow_dataframe = nodes_arrow_dataframe.insert_column(
                    1, label_column
                )

        # TODO: deal with different types if node ids are strings or integers
        try:
            source_column_mapped = source_column_old.replace_strict(
                node_id_map, default=None
            ).rename(SOURCE_COLUMN_NAME)
        except Exception as e:
            # chain the original exception so the root cause stays visible
            raise KiaraProcessingException(
                "Could not map node ids onto edges source column.  In most cases the issue is that your node ids have a different data type in your nodes table as in the source column of your edges table."
            ) from e

        if source_column_mapped.is_null().any():
            raise KiaraProcessingException(
                "The source column contains values that are not mapped in the nodes table."
            )

        try:
            target_column_mapped = target_column_old.replace_strict(
                node_id_map, default=None
            ).rename(TARGET_COLUMN_NAME)
        except Exception as e:
            # fix: this message previously said 'source column'
            raise KiaraProcessingException(
                "Could not map node ids onto edges target column.  In most cases the issue is that your node ids have a different data type in your nodes table as in the target column of your edges table."
            ) from e

        if target_column_mapped.is_null().any():
            raise KiaraProcessingException(
                "The target column contains values that are not mapped in the nodes table."
            )

        # prepend the mapped internal source/target columns, drop the originals
        edges_arrow_dataframe.insert_column(0, source_column_mapped)
        edges_arrow_dataframe.insert_column(1, target_column_mapped)

        edges_arrow_dataframe = edges_arrow_dataframe.drop(edges_source_column_name)
        edges_arrow_dataframe = edges_arrow_dataframe.drop(edges_target_column_name)

        edges_arrow_table = edges_arrow_dataframe.to_arrow()

        nodes_arrow_table = nodes_arrow_dataframe.to_arrow()

        job_log.add_log("creating network data instance")
        network_data = NetworkData.create_network_data(
            nodes_table=nodes_arrow_table, edges_table=edges_arrow_table
        )

        outputs.set_value("network_data", network_data)
5✔
512

513

514
# class FilteredNetworkDataModule(KiaraModule):
515
#     """Create a new network_data instance from an existing one, using only a sub-set of nodes and/or edges."""
516
#
517
#     def create_inputs_schema(
518
#         self,
519
#     ) -> ValueMapSchema:
520
#         return {}
521
#
522
#     def create_outputs_schema(
523
#         self,
524
#     ) -> ValueMapSchema:
525
#         return {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc