• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openbraininstitute / neurodamus / 20426309593

22 Dec 2025 08:29AM UTC coverage: 90.489% (+0.01%) from 90.478%
20426309593

Pull #436

github

web-flow
Merge fe85e8962 into 913893578
Pull Request #436: Remove using `:` as a way to split sub-parts of Edges, Targets, etc

52 of 55 new or added lines in 5 files covered. (94.55%)

1 existing line in 1 file now uncovered.

7364 of 8138 relevant lines covered (90.49%)

2.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.89
/neurodamus/node.py
1
# Neurodamus
2
# Copyright 2018 - Blue Brain Project, EPFL
3
from __future__ import annotations
4✔
4

5
from typing import TYPE_CHECKING
4✔
6

7
if TYPE_CHECKING:
8
    from neurodamus.report_parameters import ReportParameters
9

10
import gc
4✔
11
import glob
4✔
12
import itertools
4✔
13
import logging
4✔
14
import math
4✔
15
import os
4✔
16
import shutil
4✔
17
from collections import defaultdict
4✔
18
from contextlib import contextmanager
4✔
19
from os import path as ospath
4✔
20
from pathlib import Path
4✔
21

22
import libsonata
4✔
23

24
# Internal Plugins
25
from . import ngv as _ngv
4✔
26
from .cell_distributor import (
4✔
27
    CellDistributor,
28
    GlobalCellManager,
29
    LoadBalance,
30
    LoadBalanceMode,
31
    VirtualCellPopulation,
32
)
33
from .connection_manager import SynapseRuleManager, edge_node_pop_names
4✔
34
from .core import (
4✔
35
    MPI,
36
    NeuronWrapper as Nd,
37
    SimulationProgress,
38
    mpi_no_errors,
39
    return_neuron_timings,
40
    run_only_rank0,
41
)
42
from .core._engine import EngineBase
4✔
43
from .core._shmutils import SHMUtil
4✔
44
from .core.configuration import (
4✔
45
    CircuitConfig,
46
    ConfigurationError,
47
    Feature,
48
    GlobalConfig,
49
    SimConfig,
50
    _SimConfig,
51
    get_debug_cell_gids,
52
    make_circuit_config,
53
)
54
from .core.coreneuron_configuration import (
4✔
55
    CompartmentMapping,
56
    CoreConfig,
57
)
58
from .core.nodeset import PopulationNodes
4✔
59
from .gap_junction import GapJunctionManager
4✔
60
from .io.sonata_config import ConnectionTypes
4✔
61
from .modification_manager import ModificationManager
4✔
62
from .neuromodulation_manager import NeuroModulationManager
4✔
63
from .replay import MissingSpikesPopulationError, SpikeManager
4✔
64
from .report import create_report
4✔
65
from .report_parameters import (
4✔
66
    CompartmentType,
67
    ReportType,
68
    SectionType,
69
    check_report_parameters,
70
    create_report_parameters,
71
)
72
from .stimulus_manager import StimulusManager
4✔
73
from .target_manager import TargetManager, TargetSpec
4✔
74
from .utils.logging import log_stage, log_verbose
4✔
75
from .utils.memory import DryRunStats, free_event_queues, pool_shrink, print_mem_usage, trim_memory
4✔
76
from .utils.pyutils import cache_errors
4✔
77
from .utils.timeit import TimerManager, timeit
4✔
78
from neurodamus.core.coreneuron_report_config import CoreReportConfig, CoreReportConfigEntry
4✔
79
from neurodamus.core.coreneuron_simulation_config import CoreSimulationConfig
4✔
80
from neurodamus.utils.pyutils import CumulativeError, rmtree
4✔
81

82

83
class METypeEngine(EngineBase):
    """Engine used whenever a circuit does not declare one of its own.

    It only binds the manager classes that the rest of this module looks up:
    the cell manager, the intra-circuit connection manager and the mapping
    from connection type to connection manager.
    """

    CircuitPrecedence = 0

    # Manager class for the nodes and for the circuit's inner connectivity
    CellManagerCls = CellDistributor
    InnerConnectivityCls = SynapseRuleManager

    # Connection type -> manager class (`None` stands for "type not given").
    # While this dict is being built the name `ConnectionTypes` on the
    # right-hand side still resolves to the imported enum.
    ConnectionTypes = {
        None: SynapseRuleManager,
        ConnectionTypes.Synaptic: SynapseRuleManager,
        ConnectionTypes.GapJunction: GapJunctionManager,
        ConnectionTypes.NeuroModulation: NeuroModulationManager,
    }
93

94

95
class CircuitManager:
    """Holds and manages populations and associated nodes and edges

    All nodes must have a name or read from sonata pop name
    As so, Sonata is preferred when using multiple node files
    """

    def __init__(self):
        """Create empty registries plus the global cell manager and global target."""
        self.node_managers = {}  # dict {pop_name -> cell_manager}  # nrn pop is None
        self.virtual_node_managers = {}  # same, but for virtual ones (no cells)
        # dict {(src_pop, dst_pop) -> list[synapse_manager]}
        self.edge_managers = defaultdict(list)
        self.alias = {}  # dict {name -> pop_name}
        self.global_manager = GlobalCellManager()
        self.global_target = TargetManager.create_global_target()

    def initialized(self):
        """Return True once at least one node manager has been registered."""
        return bool(self.node_managers)

    def register_node_manager(self, cell_manager):
        """Register a cell manager under its population name.

        Also records the circuit-name alias, registers the manager with the
        global manager and, when the manager is initialized, appends its
        local nodes to the global target.

        Raises:
            ConfigurationError: a manager for that population already exists.
        """
        pop = cell_manager.population_name
        if pop in self.node_managers:
            raise ConfigurationError(f"Already existing node manager for population {pop}")
        self.node_managers[pop] = cell_manager
        self.alias[cell_manager.circuit_name] = pop
        self.global_manager.register_manager(cell_manager)
        if cell_manager.is_initialized():
            self.global_target.append_nodeset(cell_manager.local_nodes)

    def _new_virtual_node_manager(self, circuit):
        """Instantiate a new virtual node manager explicitly."""
        storage = libsonata.NodeStorage(circuit.CellLibraryFile)
        pop_name, _ = circuit.CircuitTarget.split(":")  # Sonata config fills population
        node_size = storage.open_population(pop_name).size
        # gids are 1-based and cover the whole population
        gid_vec = list(range(1, node_size + 1))
        virtual_cell_manager = VirtualCellPopulation(pop_name, gid_vec)
        self.virtual_node_managers[pop_name] = virtual_cell_manager
        self.global_target.append_nodeset(virtual_cell_manager.local_nodes)
        return virtual_cell_manager

    @staticmethod
    def new_node_manager_bare(circuit: CircuitConfig, target_manager, run_conf, **kwargs):
        """Build (without loading or registering) the cell manager for a circuit.

        The class comes from the circuit's engine, defaulting to
        `METypeEngine` / `CellDistributor` when unset.
        """
        engine = circuit.Engine or METypeEngine
        CellManagerCls = engine.CellManagerCls or CellDistributor
        return CellManagerCls(circuit, target_manager, run_conf, **kwargs)

    def new_node_manager(self, circuit, target_manager, run_conf, *, load_balancer=None, **kwargs):
        """Create, load and register the node manager for a circuit.

        Virtual populations get a `VirtualCellPopulation` instead, which is
        kept in `virtual_node_managers` and is not loaded.
        """
        if circuit.get("PopulationType") == "virtual":
            return self._new_virtual_node_manager(circuit)
        cell_manager = self.new_node_manager_bare(circuit, target_manager, run_conf, **kwargs)
        cell_manager.load_nodes(load_balancer, **kwargs)
        self.register_node_manager(cell_manager)
        return cell_manager

    def get_node_manager(self, name):
        """Return the node manager for a population name or alias (None if unknown)."""
        name = self.alias.get(name, name)
        return self.node_managers.get(name)

    def has_population(self, pop_name):
        """Tell whether a (non-virtual) node manager exists for `pop_name`."""
        return pop_name in self.node_managers

    def unalias_pop_keys(self, source, destination):
        """Un-alias population names"""
        return self.alias.get(source, source), self.alias.get(destination, destination)

    def get_edge_managers(self, source, destination):
        """Return the list of edge managers between two populations (may be empty)."""
        edge_pop_keys = self.unalias_pop_keys(source, destination)
        # `.get` rather than indexing: do not create entries in the defaultdict
        return self.edge_managers.get(edge_pop_keys) or []

    def get_edge_manager(self, source, destination, conn_type=SynapseRuleManager):
        """Return the first edge manager of type `conn_type`, or None if there is none."""
        return next(
            (
                manager
                for manager in self.get_edge_managers(source, destination)
                if isinstance(manager, conn_type)
            ),
            None,
        )

    def get_create_edge_manager(
        self, conn_type, source, destination, src_target, manager_args=(), **kw
    ):
        """Return the edge manager of `conn_type` for (source, destination), creating it if needed.

        A missing source population is replaced by a `VirtualCellPopulation`,
        which is only allowed for `SynapseRuleManager` and `GlioVascularManager`.

        Raises:
            ConfigurationError: the destination population is unknown, or a
                custom connection type is requested without source nodes.
        """
        source, destination = self.unalias_pop_keys(source, destination)
        manager = self.get_edge_manager(source, destination, conn_type)
        if manager:
            return manager

        if not self.has_population(destination):
            raise ConfigurationError("Can't find projection Node population: " + destination)

        src_manager = self.node_managers.get(source) or self.virtual_node_managers.get(source)
        if src_manager is None:  # src manager may not exist -> virtual
            log_verbose("No known population %s. Creating Virtual src for projection", source)
            if conn_type not in {SynapseRuleManager, _ngv.GlioVascularManager}:
                raise ConfigurationError("Custom connections require instantiated source nodes")
            src_manager = VirtualCellPopulation(source, None, src_target.name)

        target_cell_manager = kw["cell_manager"] = self.node_managers[destination]
        kw["src_cell_manager"] = src_manager
        manager = conn_type(*manager_args, **kw)
        self.edge_managers[source, destination].append(manager)
        target_cell_manager.register_connection_manager(manager)
        return manager

    def all_node_managers(self):
        """Return a view over all registered (non-virtual) node managers."""
        return self.node_managers.values()

    def all_synapse_managers(self):
        """Return an iterator over every edge manager of every population pair."""
        return itertools.chain.from_iterable(self.edge_managers.values())

    @staticmethod
    @run_only_rank0
    def write_population_offsets(pop_offsets, alias_pop, virtual_pop_offsets):
        """Write population_offsets where appropriate

        It is needed for retrieving population offsets for reporting and replay at restore time.

        Format population name::gid offset::population alias
        The virtual population offset is also written for synapse replay in restore.
        The data comes from outside because pop_offsets are not initialized
        in a restore scenario.
        """
        # populations_offset is necessary in output_path
        output_path = SimConfig.populations_offset_output_path(create=True)

        with open(output_path, "w", encoding="utf-8") as f:
            # A missing name/alias is written as a single space
            f.writelines(
                f"{pop or ' '}::{pop_offsets[pop]}::{alias or ' '}\n"
                for alias, pop in alias_pop.items()
            )
            # Virtual populations carry the literal alias "virtual"
            f.writelines(
                f"{pop}::{offset}::virtual\n" for pop, offset in virtual_pop_offsets.items()
            )

        # Add a file in save_path too if required
        if SimConfig.save:
            save_path = SimConfig.populations_offset_save_path(create=True)
            shutil.copy(output_path, save_path)

    def get_population_offsets(self):
        """Return ({pop_name -> offset}, copy of the alias dict) for the node managers."""
        pop_offsets = {
            pop_name: node_manager.local_nodes.offset
            for pop_name, node_manager in self.node_managers.items()
        }
        alias_pop = dict(self.alias)
        return pop_offsets, alias_pop

    def get_virtual_population_offsets(self):
        """Return {pop_name -> offset} for the virtual node managers."""
        return {
            pop_name: node_manager.local_nodes.offset
            for pop_name, node_manager in self.virtual_node_managers.items()
        }

    @classmethod
    def read_population_offsets(cls, file_path=None):
        """Read population offsets from populations_offset.dat

        Args:
            file_path: file to read; defaults to the restore path from SimConfig.

        Returns:
            Tuple (pop_offsets, alias_pop, virtual_pop_offsets). Empty name or
            alias fields are returned as None. Blank lines are ignored.
        """
        pop_offsets = {}
        alias_pop = {}
        virtual_pop_offsets = {}
        with open(file_path or SimConfig.populations_offset_restore_path(), encoding="utf-8") as f:
            for line in f:
                entry = line.strip()
                if not entry:
                    # Tolerate blank/trailing lines instead of failing to unpack
                    continue
                pop, offset, alias = entry.split("::")
                pop = pop or None
                alias = alias or None
                if alias == "virtual":
                    virtual_pop_offsets[pop] = int(offset)
                else:
                    pop_offsets[pop] = int(offset)
                    alias_pop[alias] = pop

        return pop_offsets, alias_pop, virtual_pop_offsets

    def __del__(self):
        """De-init. Edge managers must be destructed first"""
        del self.edge_managers
        del self.virtual_node_managers
        del self.node_managers
272

273

274
class Node:
4✔
275
    """The Node class is the main entity for a distributed Neurodamus execution.
276

277
    Note that this concept of a "Node" differs from both an MPI node, which
278
    refers to a process in a parallel computing environment, and a node in the
279
    circuit graph, which represents an individual element or component within
280
    the simulation's neural network.
281

282
    It serves as the orchestrator for the entire simulation, managing the
283
    parallel execution of the model and distributing the cells across different
284
    computational ranks. As the primary control structure, Node is responsible
285
    for coordinating various components involved in the simulation.
286

287
    Internally, the Node class instantiates and manages parallel structures,
288
    dividing the simulation workload among multiple ranks. With the introduction
289
    of the concept of multiple populations (also known as multi-circuit), the
290
    Node class takes partial responsibility for handling this logic, aided by
291
    the :class:`neurodamus.node.CircuitManager` class (accessible via the
292
    `circuits` property), which manages the different node and edge managers.
293

294
    While many lower-level details of the Node's functionality are encapsulated
295
    within dedicated helper classes, the Node class still exposes an API that
296
    allows advanced users to control and inspect almost every major step of the
297
    simulation. For a standard run, users are encouraged to use the higher-level
298
    `Neurodamus` class instead, which simplifies some of the complexities
299
    handled by Node.
300

301
    The Node class exposes the following public properties:
302

303
    - `circuits`: is a :class:`neurodamus.node.CircuitManager` object,
304
      responsible for managing multiple node and edge managers within the
305
      simulation.
306
    - `target_manager`: is a :class:`neurodamus.target_manager.TargetManager`
307
      object, responsible for managing the targets in the simulation.
308
    - `stim_manager`: is a
309
      :class:`neurodamus.stimulus_manager.StimulusManager` object, responsible
310
      for interpreting and instantiating stimulus events.
311
    - `elec_manager`: The electrode manager, which controls the interaction with
312
      simulation electrodes.
313
    - `reports`: A list of Neurodamus Report `hoc` objects, used to generate
314
      simulation reports.
315

316
    Note that, while the Node object owns and manages most of the top-level
317
    objects in the simulation, the management of cell and synapse objects has
318
    been delegated to the `CircuitManager` class, as these are now handled at a lower
319
    level.
320

321
    Technical note:
322

323
    - The properties exposed by Node are read-only, with most internal
324
      attributes being prefixed with an underscore (`_`). Notable internal
325
      attributes include:
326

327
      `self._sonata_circuits`: The SONATA circuits used by the Node
328
      each represents a node population.
329

330
    These details make the Node class versatile and powerful for advanced users
331
    who need more granular control over the simulation process.
332
    """
333

334
    _default_population = "All"
4✔
335
    """The default population name for e.g. Reports."""
4✔
336

337
    def __init__(self, config_file, options: dict | None = None):
        """Creates a neurodamus executor

        Args:
            config_file: A Sonata config file
            options: A dictionary of run options typically coming from cmd line.
                The "use_color" entry, if present, is consumed here.

        Raises:
            ConfigurationError: `config_file` is a legacy BlueConfig.
        """
        options = options or {}
        assert isinstance(config_file, str), "`config_file` should be a string"
        assert config_file, "`config_file` cannot be empty"

        if config_file.endswith("BlueConfig"):
            raise ConfigurationError(
                "Legacy format BlueConfig is not supported, please migrate to SONATA config"
            )

        # The log file location is needed before SimConfig is initialized
        sonata_conf = libsonata.SimulationConfig.from_file(config_file)
        log_file = sonata_conf.output.log_file
        use_color = options.pop("use_color", True)
        Nd.init(log_filename=log_file, log_use_color=use_color)

        # This is global initialization, happening once, regardless of number of
        # cycles
        log_stage("Setting up Neurodamus configuration")
        self._pc = Nd.pc
        self._spike_vecs = []
        self._spike_populations = []
        Nd.execute("cvode = new CVode()")
        SimConfig.init(config_file, options)

        if SimConfig.use_coreneuron:
            # Instantiate the CoreNEURON artificial cell object which is used to fill up
            # the empty ranks. This need to be done before the circuit is
            # finitialized
            CoreConfig.instantiate_artificial_cell()

        self._run_conf = SimConfig.run_conf
        self._target_manager = TargetManager(self._run_conf)
        self._target_spec = TargetSpec(self._run_conf.get("CircuitTarget"))
        if SimConfig.use_neuron or SimConfig.coreneuron_direct_mode:
            self._sonatareport_helper = Nd.SonataReportHelper(Nd.dt, True)  # noqa: FBT003
        self._sonata_circuits = SimConfig.sonata_circuits
        self._dump_cell_state_gids = get_debug_cell_gids(options)
        self._core_replay_file = ""

        # An NGV run is one where any circuit uses an engine named "NGVEngine"
        circuit_engines = (circuit.Engine for circuit in self._sonata_circuits.values())
        self._is_ngv_run = any(
            engine.__name__ == "NGVEngine" for engine in circuit_engines if engine
        )

        self._initial_rss = 0
        self._cycle_i = 0
        self._n_cycles = 1
        self._shm_enabled = False
        self._dry_run_stats = None

        self._reset()
389

390
    def _reset(self):
        """Bring the per-cycle state back to its initial values.

        Validates that `_run_conf` is a non-empty dictionary, creates a fresh
        `CircuitManager`, clears the stimulus/report bookkeeping and registers
        the new global target and global cell manager with the target manager.

        Note: remember to call Nd.init(...) before to ensure/load neurodamus mods

        Raises:
            ValueError: `_run_conf` is empty or not a dict.
        """
        run_conf = self._run_conf
        if not (run_conf and isinstance(run_conf, dict)):
            raise ValueError("Invalid `_run_conf`: Must be a dictionary for multi-cycle runs.")

        # Init unconditionally
        self._circuits = CircuitManager()
        self._stim_list = None
        self._report_list = None
        self._stim_manager = None
        self._sim_ready = False
        # flag to mark what we already dumped
        self._last_cell_state_dump_t = None

        self._bbss = Nd.BBSaveState()

        # Register the global target and cell manager
        circuits = self._circuits
        self._target_manager.register_target(circuits.global_target)
        self._target_manager.register_cell_manager(circuits.global_manager)
415

416
    # public 'read-only' properties - object modification on user responsibility
    @property
    def circuits(self):
        """The `CircuitManager` of the current cycle."""
        return self._circuits

    @property
    def target_manager(self):
        """The `TargetManager` created at construction."""
        return self._target_manager

    @property
    def stim_manager(self):
        """The stimulus manager (None after a reset)."""
        return self._stim_manager

    @property
    def stims(self):
        """The stimulus list (None after a reset)."""
        return self._stim_list

    @property
    def reports(self):
        """The report list (None after a reset)."""
        return self._report_list
422

423
    def all_circuits(self):
        """Yield every SONATA circuit configuration known to this node."""
        for circuit in self._sonata_circuits.values():
            yield circuit
425

426
    # -
427
    def load_targets(self):
        """Initialize targets. Nodesets are loaded on demand."""
        load = self._target_manager.load_targets
        for circuit in self.all_circuits():
            circuit_label = circuit.name or "(default)"
            log_verbose("Loading targets for circuit %s", circuit_label)
            load(circuit)
432

433
    # -
434
    @mpi_no_errors
    @timeit(name="Compute LB")
    def compute_load_balance(self):
        """In case the user requested load-balance this function instantiates a
        CellDistributor to split cells and balance those pieces across the available CPUs.

        The first non-virtual circuit is the one being balanced.

        Returns:
            None when no balancing applies (no non-virtual circuit, no node
            file, or Round-Robin mode); the result of
            `_memory_mode_load_balancing()` in Memory mode; otherwise the
            `LoadBalance` object.
        """
        log_stage("Computing Load Balance")

        # Pick the first non-virtual circuit. An explicit search is required:
        # a plain loop variable would be left bound to the last (virtual)
        # circuit when none qualifies, defeating the "not found" check below.
        selected = next(
            (
                (name, circuit)
                for name, circuit in self._sonata_circuits.items()
                if circuit.get("PopulationType") != "virtual"
            ),
            None,
        )
        if selected is None:
            logging.warning(
                "Cannot calculate the load balance because no non-virtual circuit is found"
            )
            return None

        name, circuit = selected
        logging.info("Activating experimental LB for Sonata circuit '%s'", name)

        if not circuit.CellLibraryFile:
            logging.info(" => No circuit for Load Balancing. Skipping... ")
            return None

        # Keep the returned object referenced until this method returns
        _offsets_frozen = PopulationNodes.offset_freezer()  # Dont offset while in loadbal

        # Info about the cells to be distributed
        target_spec = TargetSpec(circuit.CircuitTarget)
        target = self.target_manager.get_target(target_spec)

        # Check / set load balance mode
        lb_mode = LoadBalance.select_lb_mode(SimConfig, self._run_conf, target)
        if lb_mode == LoadBalanceMode.RoundRobin:
            return None
        if lb_mode == LoadBalanceMode.Memory:
            logging.info("Load Balancing ENABLED. Mode: Memory")
            return self._memory_mode_load_balancing()

        # Build load balancer as per requested options
        node_path = circuit.CellLibraryFile
        pop = target_spec.population
        load_balancer = LoadBalance(lb_mode, node_path, pop, self._target_manager)

        if load_balancer.valid_load_distribution(target_spec):
            logging.info("Load Balancing done.")
            return load_balancer

        logging.info("Could not reuse load balance data. Doing a Full Load-Balance")
        cell_dist = self._circuits.new_node_manager(circuit, self._target_manager, self._run_conf)
        with load_balancer.generate_load_balance(target_spec, cell_dist):
            # Instantiate the circuit cells and synapses to evaluate complexities
            cell_dist.finalize()
            self._circuits.global_manager.finalize()
            SimConfig.update_connection_blocks(self._circuits.alias)
            target_manager = self._target_manager
            self._create_synapse_manager(SynapseRuleManager, circuit, target_manager)

        # reset since we instantiated with RR distribution
        Nd.t = 0.0  # Reset time
        self.clear_model()

        return load_balancer
494

495
    def _memory_mode_load_balancing(self):
        """Return the cell allocation used by the Memory load-balance mode.

        An existing allocation file (named after the MPI size and the number
        of model-building steps) is imported. Otherwise the allocation is
        generated: biophysical circuits are loaded to gather stats, the
        per-cell memory usage is computed first when its file is missing,
        and cells are then distributed across ranks and cycles.
        """
        alloc_file = f"allocation_r{MPI.size}_c{SimConfig.modelbuilding_steps}.pkl.gz"

        # Check for the file before the barrier, as the original sequence does
        have_alloc_file = ospath.exists(alloc_file)
        MPI.barrier()

        self._dry_run_stats = DryRunStats()
        if have_alloc_file:
            alloc = self._dry_run_stats.import_allocation_stats(alloc_file, self._cycle_i)
        else:
            logging.warning("Allocation file not found. Generating on-the-fly.")

            compute_cell_memory_usage = not Path(DryRunStats._MEMORY_USAGE_FILENAME).exists()
            if compute_cell_memory_usage:
                logging.warning("Cell memory usage file not found. Computing on-the-fly.")
            else:
                self._dry_run_stats.try_import_cell_memory_usage()

            for circuit in self._sonata_circuits.values():
                if circuit.get("PopulationType") != "biophysical":
                    continue
                cell_distributor = CellDistributor(circuit, self._target_manager, self._run_conf)
                metype_loader_opts = {
                    "load_mode": "load_nodes_metype",
                    "dry_run_stats": self._dry_run_stats,
                }
                cell_distributor.load_nodes(None, loader_opts=metype_loader_opts)
                if compute_cell_memory_usage:
                    cell_distributor.finalize(dry_run_stats_obj=self._dry_run_stats)

            if compute_cell_memory_usage:
                self._dry_run_stats.collect_all_mpi()
                self._dry_run_stats.export_cell_memory_usage()
                # reset since we instantiated
                Nd.t = 0.0  # Reset time
                self.clear_model()

            distribution = self._dry_run_stats.distribute_cells_with_validation(
                MPI.size, SimConfig.modelbuilding_steps
            )
            alloc, _, _ = distribution

        for pop, ranks in alloc.items():
            for rank, gids in ranks.items():
                logging.debug("Population: %s, Rank: %s, Number of GIDs: %s", pop, rank, len(gids))
        return alloc
540

541
    # -
542
    @mpi_no_errors
    @timeit(name="Cell creation")
    def create_cells(self, load_balance=None):
        """Instantiate and distribute the cells of the network.

        One node manager is created per configured circuit (honouring the
        `restrict_node_populations` option), LFP weights are loaded when
        configured, offsets are frozen and every cell manager is finalized.

        Args:
            load_balance: load-balance object handed to the node managers;
                when falsy the distribution is Round-Robin.
        """
        loader_opts = {}
        if SimConfig.dry_run:
            logging.info("Memory usage after inizialization:")
            print_mem_usage()
            self._dry_run_stats = DryRunStats()
            self._dry_run_stats.try_import_cell_memory_usage()
            loader_opts["dry_run_stats"] = self._dry_run_stats
        loader_opts["cycle_i"] = self._cycle_i

        # Check dynamic attributes required before loading cells
        SimConfig.check_cell_requirements(self.target_manager)

        log_stage("LOADING NODES")
        cli_opts = SimConfig.cli_options
        if not load_balance:
            logging.info("Load-balance object not present. Continuing Round-Robin...")

        for name, circuit in self._sonata_circuits.items():
            log_stage("Circuit %s", name)
            restricted = cli_opts.restrict_node_populations
            if restricted and name not in restricted:
                logging.warning("Skipped node population (restrict_node_populations)")
                continue
            self._circuits.new_node_manager(
                circuit,
                self._target_manager,
                self._run_conf,
                load_balancer=load_balance,
                loader_opts=loader_opts,
            )

        lfp_weights_file = self._run_conf.get("LFPWeightsPath")
        if lfp_weights_file and not SimConfig.use_coreneuron:
            logging.warning("LFP supported only with CoreNEURON.")
        elif lfp_weights_file:
            global_manager = self._circuits.global_manager
            lfp_manager = global_manager._lfp_manager
            # Only named populations take part in the LFP configuration
            population_list = [
                manager.population_name
                for manager in global_manager._cell_managers
                if manager.population_name is not None
            ]
            lfp_manager.load_lfp_config(lfp_weights_file, population_list)

        PopulationNodes.freeze_offsets()  # Dont offset further, could change gids

        # Let the cell managers have any final say in the cell objects
        log_stage("FINALIZING CIRCUIT CELLS")

        for cell_manager in self._circuits.all_node_managers():
            log_stage("Circuit %s", cell_manager.circuit_name or "(default)")
            if not SimConfig.dry_run:
                cell_manager.finalize()
            else:
                cell_manager.finalize(dry_run_stats_obj=self._dry_run_stats)

        if SimConfig.dry_run:
            self._dry_run_stats.collect_all_mpi()
            self._dry_run_stats.export_cell_memory_usage()
            self._dry_run_stats.estimate_cell_memory()

        # Final bits after we have all cell managers
        self._circuits.global_manager.finalize()
        SimConfig.update_connection_blocks(self._circuits.alias)
614

615
    # -
616
    @mpi_no_errors
    @timeit(name="Synapse creation")
    def create_synapses(self):
        """Create synapses among the cells, handling connections that appear in the config file

        Inner connectivity of each circuit is created first, then the
        projections. In a dry run only synapse counts are collected (stored in
        `syn_total_memory`) and connection configuration is skipped.
        """
        log_stage("LOADING CIRCUIT CONNECTIVITY")
        # Options shared by inner-connectivity and projection managers
        manager_kwa = {
            "load_offsets": self._is_ngv_run,
            "dry_run_stats": self._dry_run_stats,
        }

        for circuit in self._sonata_circuits.values():
            engine = circuit.Engine or METypeEngine
            self._create_synapse_manager(
                engine.InnerConnectivityCls, circuit, self._target_manager, **manager_kwa
            )

        MPI.check_no_errors()
        log_stage("Handling projections...")
        for pname, projection in SimConfig.projections.items():
            self._load_projections(pname, projection, **manager_kwa)

        if SimConfig.dry_run:
            self.syn_total_memory = self._dry_run_stats.collect_display_syn_counts()
            return

        log_stage("Configuring connections...")
        for conn_conf in SimConfig.connections.values():
            self._process_connection_configure(conn_conf)

        logging.info("Done, but waiting for all ranks")
646

647
    def _create_synapse_manager(self, ctype, conf, *args, **kwargs):
        """Create a synapse manager for intra-circuit connectivity

        Args:
            ctype: connection manager class to instantiate.
            conf: circuit configuration; nothing is done without "nrnPath".
            *args, **kwargs: forwarded to the manager constructor (after `conf`).

        Raises:
            ConfigurationError: the edge file connects two different populations.
        """
        log_stage("Circuit %s", conf.name or "(default)")
        if not conf.get("nrnPath"):
            logging.info(" => No connectivity set as internal. See projections")
            return

        cli_opts = SimConfig.cli_options
        if cli_opts.restrict_connectivity >= 2:
            logging.warning("Skipped connectivity (restrict_connectivity)")
            return

        circuit_target = TargetSpec(conf.get("CircuitTarget"))
        if circuit_target.population is None:
            # Fall back to the population registered under the circuit name
            circuit_target.population = self._circuits.alias.get(conf.name)

        edge_location = conf["nrnPath"]
        edge_path, edge_pop = edge_location.path, edge_location.name
        src, dst = edge_node_pop_names(edge_path, edge_pop)

        logging.info("Processing edge file %s, pop: %s", edge_location.path, edge_location.name)

        if src and dst and src != dst:
            raise ConfigurationError("Inner connectivity with different populations")

        dst = self.circuits.alias.get(dst, dst)
        if dst not in SimConfig.cli_options.restrict_node_populations:
            logging.warning("Skipped connectivity (restrict_node_populations)")
            return

        manager_args = (conf, *args)
        manager = self._circuits.get_create_edge_manager(
            ctype, src, dst, circuit_target, manager_args, **kwargs
        )
        if manager.is_file_open:  # Base connectivity
            manager.create_connections()
680

681
    def _process_connection_configure(self, conn_conf):
        """Apply one connection block to every edge manager it concerns.

        The "Source" and "Destination" entries are turned into target specs,
        their populations are un-aliased, and the block is passed to each edge
        manager registered between every (source, destination) population pair.

        Args:
            conn_conf: Connection configuration with "Source" and
                "Destination" entries.
        """
        src_spec = TargetSpec(conn_conf["Source"])
        dst_spec = TargetSpec(conn_conf["Destination"])

        unaliased = self._circuits.unalias_pop_keys(src_spec.population, dst_spec.population)
        src_spec.population, dst_spec.population = unaliased

        source_target = self.target_manager.get_target(src_spec)
        destination_target = self.target_manager.get_target(dst_spec)

        # Visit each population pair, then each manager handling that pair
        for from_pop in source_target.population_names:
            for to_pop in destination_target.population_names:
                pair_managers = self._circuits.get_edge_managers(from_pop, to_pop)
                for edge_manager in pair_managers:
                    logging.debug("Using connection manager: %s", edge_manager)
                    edge_manager.configure_connections(conn_conf)
1✔
696

697
    @mpi_no_errors
    def _load_projections(self, pname, projection, **kw):
        """Instantiate the connectivity described by one Projection block.

        Args:
            pname: Projection name (used for logging only).
            projection: Projection configuration; read for "Type", "Source",
                "Destination" and "Path".
            **kw: Forwarded to ``get_create_edge_manager`` (arguments for the
                manager class when it has to be created).

        Raises:
            RuntimeError: If no engine is registered for the projection type.
        """
        # Type may be None, GapJunctions, NeuroGlial, NeuroModulation...
        conn_type = projection.get("Type")
        manager_cls = EngineBase.connection_types.get(conn_type)
        src_spec = TargetSpec(projection.get("Source"))
        dst_spec = TargetSpec(projection.get("Destination"))

        if SimConfig.cli_options.restrict_connectivity >= 1:
            logging.warning("Skipped projections %s->%s (restrict_connectivity)", src_spec, dst_spec)
            return

        if not manager_cls:
            raise RuntimeError(f"No Engine to handle connectivity of type '{conn_type}'")

        edges = projection["Path"]
        logging.info("Processing Edge file: %s", edges)

        # Replace the populations in the specs by the ones found in the edge file
        edge_src, edge_dst = edge_node_pop_names(
            edges.path, edges.name, src_spec.population, dst_spec.population
        )
        src_spec.population, dst_spec.population = self._circuits.unalias_pop_keys(
            edge_src, edge_dst
        )
        source_target = self.target_manager.get_target(src_spec)
        destination_target = self.target_manager.get_target(dst_spec)

        # A source population unknown to the target still gets an entry, so that
        # a Virtual population can be created for it
        source_pops = source_target.population_names or [src_spec.population]

        manager_args = (projection, self.target_manager)
        for from_pop in source_pops:
            for to_pop in destination_target.population_names:
                logging.info(" * %s (Type: %s, Src: %s, Dst: %s)", pname, conn_type, from_pop, to_pop)
                edge_manager = self._circuits.get_create_edge_manager(
                    manager_cls, from_pop, to_pop, src_spec, manager_args, **kw
                )
                logging.debug("Using connection manager: %s", edge_manager)
                edge_manager.open_edge_location(edges, projection, src_name=from_pop)
                edge_manager.create_connections(src_spec.name, dst_spec.name)
3✔
741

742
    @mpi_no_errors
    @timeit(name="Enable Stimulus")
    def enable_stimulus(self):
        """Instantiate every stimulus declared in the simulation config.

        Each stimulus entry is handed, together with its target spec, to a
        StimulusManager which interprets it. "SynapseReplay" entries are left
        to enable_replay. The whole step is skipped when the Stimulus feature
        is not among the enabled features.

        Raises:
            ConfigurationError: If a stimulus has Mode "Extracellular".
        """
        enabled_features = SimConfig.cli_options.restrict_features
        if Feature.Stimulus not in enabled_features:
            logging.warning("Skipped Stimulus (restrict_features)")
            return

        log_stage("Stimulus Apply.")

        # One manager interprets all stimuli of the config
        self._stim_manager = StimulusManager(self._target_manager)

        for stim_conf in SimConfig.stimuli:
            if stim_conf.get("Mode") == "Extracellular":
                raise ConfigurationError("input_type extracellular_stimulation is not supported")

            spec = TargetSpec(stim_conf.get("Target"))
            name = stim_conf["Name"]
            pattern = stim_conf["Pattern"]

            if pattern != "SynapseReplay":  # replay is handled by enable_replay
                logging.info(" * [STIM] %s (%s): -> %s", name, pattern, spec)
                self._stim_manager.interpret(spec, stim_conf)
3✔
776

777
    # -
778
    @mpi_no_errors
    def enable_replay(self):
        """Activate replay according to config file. Call before connManager.finalize

        Every stimulus whose "Pattern" is "SynapseReplay" is forwarded to
        ``_enable_replay`` with its "Source", "Target" and "Delay" (default 0.0).
        Nothing is done when the Replay feature is not among the enabled
        features, or when running CoreNEURON with a replay file already set
        from a previous cycle.
        """
        if Feature.Replay not in SimConfig.cli_options.restrict_features:
            logging.warning("Skipped Replay (restrict_features)")
            return

        log_stage("Handling Replay")

        if SimConfig.use_coreneuron and bool(self._core_replay_file):
            logging.info(" -> [REPLAY] Reusing stim file from previous cycle")
            return

        for stim in SimConfig.stimuli:
            if stim.get("Pattern") != "SynapseReplay":
                continue
            target = stim["Target"]
            source = stim.get("Source")
            stim_name = stim["Name"]

            #  - delay: Spike replays are suppressed until a certain time
            delay = stim.get("Delay", 0.0)
            # %s rather than %d: the delay defaults to a float and %d would
            # silently truncate fractional values in the log (0.5 -> 0)
            logging.info(
                " * [SYN REPLAY] %s -> %s (delay: %s)",
                stim_name,
                target,
                delay,
            )
            self._enable_replay(source, target, stim, delay=delay)
1✔
807

808
    # -
809
    def _enable_replay(
        self, source, target, stim_conf, tshift=0.0, delay=0.0, connectivity_type=None
    ):
        """Replay the spikes of a stimulus over every matching population pathway.

        Args:
            source: Source target, resolved through ``target_manager.get_target``.
            target: Destination target, resolved the same way.
            stim_conf: Stimulus configuration; its "SpikeFile" entry is read.
            tshift: Passed to SpikeManager when loading the spikes.
            delay: Passed to the edge manager's ``replay``.
            connectivity_type: Key into ``EngineBase.connection_types`` selecting
                the edge manager class to look up.

        Raises:
            RuntimeError: When restoring CoreNEURON, since no edge manager is
                available to apply the replay to.
        """
        ptype_cls = EngineBase.connection_types.get(connectivity_type)
        src_target = self.target_manager.get_target(source)
        dst_target = self.target_manager.get_target(target)

        if SimConfig.restore_coreneuron:
            pop_offsets, alias_pop, _virtual_pop_offsets = CircuitManager.read_population_offsets()

        for src_pop in src_target.population_names:
            try:
                log_verbose("Loading replay spikes for population '%s'", src_pop)
                spike_manager = SpikeManager(stim_conf["SpikeFile"], tshift, src_pop)  # Disposable
            except MissingSpikesPopulationError:
                logging.info("  > No replay for src population: '%s'", src_pop)
                continue

            for dst_pop in dst_target.population_names:
                src_pop_str, dst_pop_str = src_pop or "(base)", dst_pop or "(base)"

                # Stays None on the restore path, where no manager exists
                conn_manager = None
                if SimConfig.restore_coreneuron:  # Node and Edges managers not initialized
                    src_pop_offset = (
                        pop_offsets[src_pop]
                        if src_pop in pop_offsets
                        else pop_offsets[alias_pop[src_pop]]
                    )
                else:
                    conn_manager = self._circuits.get_edge_manager(src_pop, dst_pop, ptype_cls)
                    if not conn_manager and SimConfig.cli_options.restrict_connectivity >= 1:
                        continue
                    assert conn_manager, f"Missing edge manager for {src_pop_str} -> {dst_pop_str}"
                    src_pop_offset = conn_manager.src_pop_offset

                logging.info(
                    "=> Population pathway %s -> %s. Source offset: %d",
                    src_pop_str,
                    dst_pop_str,
                    src_pop_offset,
                )
                if conn_manager is None:
                    # Previously this path hit an UnboundLocalError on
                    # `conn_manager`; fail with an explicit message instead.
                    # NOTE(review): confirm whether restore should rather skip replay.
                    raise RuntimeError(
                        f"Cannot replay {src_pop_str} -> {dst_pop_str}: "
                        "no edge manager is available when restoring CoreNEURON"
                    )
                conn_manager.replay(spike_manager, source, target, delay)
1✔
850

851
    # -
852
    @mpi_no_errors
    @timeit(name="Enable Modifications")
    def enable_modifications(self):
        """Apply every Modification block of the config to the network.

        Each block is resolved to a target spec and handed to a
        ModificationManager, which interprets it. The manager is local to this
        call and is not kept afterwards.
        """
        log_stage("Enabling modifications...")

        manager = ModificationManager(self._target_manager)
        for mod_name, mod_conf in SimConfig.modifications.items():
            spec = TargetSpec(mod_conf["Target"])
            mod_type = mod_conf["Type"]
            logging.info(" * [MOD] %s: %s -> %s", mod_name, mod_type, spec)
            manager.interpret(spec, mod_conf)
1✔
868

869
    def write_and_get_population_offsets(self) -> tuple[dict, dict, dict]:
        """Obtain the population offsets, write them out and return them.

        The offsets come from the circuit manager when it is initialized;
        otherwise (restore) they are read back via
        ``CircuitManager.read_population_offsets``.

        Returns:
            tuple[dict, dict, dict]:
                - pop_offsets: Mapping of population names to GID offsets.
                - alias_pop: Mapping of population aliases to population names.
                - virtual_pop_offsets: Mapping of virtual population names to offsets.
        """
        circuits = self._circuits
        if not circuits.initialized():
            # Restore: the circuits were not built, read the stored offsets
            offsets = CircuitManager.read_population_offsets()
            pop_offsets, alias_pop, virtual_pop_offsets = offsets
        else:
            pop_offsets, alias_pop = circuits.get_population_offsets()
            virtual_pop_offsets = circuits.get_virtual_population_offsets()

        circuits.write_population_offsets(
            pop_offsets, alias_pop, virtual_pop_offsets=virtual_pop_offsets
        )
        return pop_offsets, alias_pop, virtual_pop_offsets
3✔
889

890
    # @mpi_no_errors - not required since theres a call inside before make_comm()
891
    @timeit(name="Enable Reports")
    def enable_reports(self):  # noqa: C901, PLR0912, PLR0915
        """Instantiate the enabled reports of the config file.

        Errors found while processing a report are accumulated and the report is
        skipped; they are raised together once all reports have been processed.
        When restoring CoreNEURON no report is created: the restored report
        config file is copied and only the reports' end times are updated in it.
        """
        log_stage("Reports Enabling")

        # Keep only the reports flagged as enabled
        enabled_reports = {
            rep_name: rep_conf
            for rep_name, rep_conf in SimConfig.reports.items()
            if rep_conf["Enabled"]
        }
        self._report_list = []

        pop_offsets, alias_pop, _virtual_pop_offsets = self.write_and_get_population_offsets()
        pop_offsets_alias = (pop_offsets, alias_pop)

        if SimConfig.use_coreneuron and SimConfig.restore_coreneuron:
            # Start from a copy of the restored file; it gets patched in one go
            # by update_file once all reports were visited
            save_file = Path(CoreConfig.report_config_file_save)
            save_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy(CoreConfig.report_config_file_restore, CoreConfig.report_config_file_save)
        elif SimConfig.use_coreneuron:
            core_report_config = CoreReportConfig()

        # Restore only: per-report fields to replace in the report config file
        substitutions = defaultdict(dict)
        errors = CumulativeError()

        for rep_name, rep_conf in enabled_reports.items():
            errors.is_error_appended = False
            target = self._target_manager.get_target(TargetSpec(rep_conf["Target"]))

            # Build the final parameters. Errors are logged, not raised, so that
            # all reports get checked before stopping
            rep_params = create_report_parameters(
                sim_end=self._run_conf["Duration"],
                nd_t=Nd.t,
                output_root=SimConfig.output_root,
                rep_name=rep_name,
                rep_conf=rep_conf,
                target=target,
                buffer_size=SimConfig.report_buffer_size,
                cumulative_error=errors,
            )
            if errors.is_error_appended:
                continue

            check_report_parameters(
                rep_params,
                Nd.dt,
                lfp_active=self._circuits.global_manager._lfp_manager._lfp_file,
                cumulative_error=errors,
            )
            if errors.is_error_appended:
                continue

            if SimConfig.restore_coreneuron:
                # The report itself is not needed, only its new end time
                substitutions[rep_params.name]["end_time"] = rep_params.end
                continue

            # Fast membrane current calculation: for i_membrane reports in
            # coreneuron direct mode, and for LFP reports
            direct_imembrane = (
                SimConfig.coreneuron_direct_mode and "i_membrane" in rep_params.report_on
            )
            if direct_imembrane or rep_params.type == ReportType.LFP:
                Nd.cvode.use_fast_imem(1)

            if len(self._circuits.global_manager.get_final_gids()) == 0:
                # No gids: register a placeholder instead of a report
                self._report_list.append(None)
                continue

            report = create_report(
                params=rep_params,
                use_coreneuron=SimConfig.use_coreneuron,
                cumulative_error=errors,
            )
            if errors.is_error_appended:
                continue

            self._set_point_list_in_rep_params(rep_params, cumulative_error=errors)
            if errors.is_error_appended:
                continue

            if SimConfig.use_coreneuron:
                entry = CoreReportConfigEntry.from_report_params(rep_params=rep_params)
                core_report_config.add_entry(entry)

            if not SimConfig.use_coreneuron or rep_params.type == ReportType.SYNAPSE:
                report.setup(
                    rep_params=rep_params,
                    global_manager=self._circuits.global_manager,
                    cumulative_error=errors,
                )
                if errors.is_error_appended:
                    continue

            self._report_list.append(report)

        if SimConfig.restore_coreneuron:
            CoreReportConfig.update_file(CoreConfig.report_config_file_save, substitutions)

        errors.raise_if_any()

        MPI.check_no_errors()

        if SimConfig.restore_coreneuron:
            return
        if SimConfig.use_coreneuron:
            self._finalize_corenrn_reports(core_report_config, pop_offsets_alias)
        else:
            self._finalize_nrn_reports()
3✔
1000

1001
    def _finalize_corenrn_reports(self, core_report_config, pop_offsets_alias):
        """Complete the CoreNEURON report config and dump it to file.

        Args:
            core_report_config: Report config object to complete and dump.
            pop_offsets_alias: Sequence whose first item holds the population
                offsets; only that item is used.
        """
        pop_offsets = pop_offsets_alias[0]
        spikes_file = self._run_conf.get("SpikesFile")

        core_report_config.set_pop_offsets(pop_offsets)
        core_report_config.set_spike_filename(spikes_file)
        core_report_config.dump(CoreConfig.report_config_file_save)
1✔
1005

1006
    def _finalize_nrn_reports(self):
        """Finalize reporting once all reports have been created.

        Sets the buffer size hint, then sets up the reports communicator and
        prepares the datasets through the sonatareport helper.
        """
        helper = self._sonatareport_helper
        helper.set_max_buffer_size_hint(SimConfig.report_buffer_size)
        helper.make_comm()
        helper.prepare_datasets()
3✔
1011

1012
    @cache_errors
    def _set_point_list_in_rep_params(self, rep_params: ReportParameters):
        """Compute the points of the report target and store them in ``rep_params``.

        Compartment-set reports take their points from the compartment set named
        in the parameters. All other reports take them from the configured
        sections and compartments, except summation reports on the soma, which
        use all sections and all compartments.

        The result is stored in ``rep_params.points``; nothing is returned.
        """
        cell_manager = self._target_manager._cell_manager

        if rep_params.type == ReportType.COMPARTMENT_SET:
            compartment_set = self._target_manager.get_compartment_set(
                rep_params.compartment_set
            )
            rep_params.points = rep_params.target.get_point_list_from_compartment_set(
                cell_manager=cell_manager,
                compartment_set=compartment_set,
            )
            return

        section_type = rep_params.sections
        compartment_type = rep_params.compartments
        soma_summation = (
            rep_params.type == ReportType.SUMMATION and section_type == SectionType.SOMA
        )
        if soma_summation:
            section_type = SectionType.ALL
            compartment_type = CompartmentType.ALL

        rep_params.points = rep_params.target.get_point_list(
            cell_manager=cell_manager,
            section_type=section_type,
            compartment_type=compartment_type,
        )
1035

1036
    # -
1037
    @mpi_no_errors
    def sim_init(self, corenrn_gen=None, **sim_opts):
        """Finalize the model and get the simulation ready to run.

        Once the model is finalized, the coreneuron data/config is written when
        requested and the neuron-side initialization is performed when neuron or
        coreneuron direct mode is in use. Calling it again after the simulation
        is ready is a no-op.

        Args:
            corenrn_gen: Whether to generate coreneuron config. Default: None,
                meaning "if coreneuron is in use".
            sim_opts: Overrides for the _finalize_model options. E.g. spike_compress

        Returns:
            The ``_pc`` attribute of this object.

        Raises:
            RuntimeError: If there is no node manager, i.e. no circuit was created.
        """
        if self._sim_ready:
            return self._pc

        if len(self._circuits.all_node_managers()) == 0:
            raise RuntimeError("No CellDistributor was initialized. Please create a circuit.")

        self._finalize_model(**sim_opts)

        generate_corenrn = SimConfig.use_coreneuron if corenrn_gen is None else corenrn_gen
        if generate_corenrn:
            self._coreneuron_configure_datadir(
                corenrn_restore=False, coreneuron_direct_mode=SimConfig.coreneuron_direct_mode
            )
            self._coreneuron_write_sim_config(corenrn_restore=False)

        needs_neuron_init = SimConfig.use_neuron or SimConfig.coreneuron_direct_mode
        if needs_neuron_init:
            self._sim_init_neuron()

        # The two simulators are mutually exclusive
        assert not (SimConfig.use_neuron and SimConfig.use_coreneuron)
        if SimConfig.use_neuron:
            self.dump_cell_states()

        self._sim_ready = True
        return self._pc
3✔
1073

1074
    # -
1075
    @mpi_no_errors
    @timeit(name="Model Finalized")
    def _finalize_model(self, spike_compress=3):
        """Configure the run parameters and initialize the simulation.

        Covers the node managers' pre_stdinit, setup_transfer, spike
        compression, cache_efficient, maxstep and finally stdinit.

        Args:
            spike_compress: The spike_compress() parameters (tuple or int).
                A non-tuple value ``v`` is expanded to ``(v, 1, 0)``.
        """
        logging.info("Preparing to run simulation...")
        for node_manager in self._circuits.all_node_managers():
            node_manager.pre_stdinit()

        saving_or_restoring = SimConfig.save or SimConfig.restore
        self._pc.setup_transfer()

        # No spike compression with save/restore nor in NGV runs
        if spike_compress and not (saving_or_restoring or self._is_ngv_run):
            # multisend 13 is combination of multisend(1) + two_phase(8) + two_intervals(4)
            # to activate set spike_compress=(0, 0, 13)
            compress_args = spike_compress
            if SimConfig.loadbal_mode == LoadBalanceMode.Memory:
                logging.info("Disabling spike compression for Memory Load Balance")
                compress_args = False
            if not isinstance(compress_args, tuple):
                compress_args = (compress_args, 1, 0)
            self._pc.spike_compress(*compress_args)

        # LFP calculation requires WholeCell balancing and extracellular mechanism.
        # This is incompatible with efficient caching atm AND Incompatible with
        # mcd & Glut
        if not self._is_ngv_run:
            electrodes_absent = "ElectrodesPath" not in self._run_conf
            Nd.cvode.cache_efficient(electrodes_absent)

        self._pc.set_maxstep(4)
        with timeit(name="stdinit"):
            Nd.stdinit()
3✔
1109

1110
    # -
1111
    def _sim_init_neuron(self):
        """Neuron specific initialization: spike recording, timeout and restore."""
        restore_path = SimConfig.restore

        # Record, for every node manager with a population name, the
        # (population name, offset) pair and its spike vectors
        for node_manager in self._circuits.all_node_managers():
            pop_name = node_manager.population_name
            if pop_name is None:
                continue
            self._spike_populations.append((pop_name, node_manager.local_nodes.offset))
            spike_vecs = node_manager.record_spikes() or (Nd.Vector(), Nd.Vector())
            self._spike_vecs.append(spike_vecs)

        self._pc.timeout(200)  # increase by 10x

        if not restore_path:
            return

        # Upon restore sim is ready once the state and the events are back
        with timeit(name="restoretime"):
            logging.info("Restoring state...")
            self._stim_manager.saveStatePreparation(self._bbss)
            self._bbss.vector_play_init()
            self._restart_events()  # On restore the event queue is cleared
×
1133

1134
    # -
1135
    def _restart_events(self):
4✔
1136
        logging.info("Restarting connections events (Replay and Spont Minis)")
×
1137
        for syn_manager in self._circuits.all_synapse_managers():
×
1138
            syn_manager.restart_events()
×
1139

1140
    @contextmanager
    def _coreneuron_ensure_all_ranks_have_gids(self, corenrn_data):
        """Context manager making sure this rank owns at least one gid.

        When the rank already has local gids this is a no-op. Otherwise a fake
        artificial cell is loaded before the body runs and, after it, a dummy
        mapping file is written for that cell in the coreneuron data directory.

        Args:
            corenrn_data: Directory holding the ``*_3.dat`` files; the dummy
                mapping file is written there.
        """
        local_gid_count = sum(
            len(manager.local_nodes) for manager in self._circuits.all_node_managers()
        )
        if local_gid_count > 0:
            yield
            return

        # Create a dummy cell manager with node_pop = None
        # which holds a fake node with a fake population "zzz" to get an unused gid.
        # coreneuron fails if this edge case is reached multiple times as we
        # try to add twice the same gid. pop "zzz" is reserved to be used
        # exclusively for handling cases where no real GIDs are assigned to
        # a rank, ensuring that CoreNeuron does not crash due to missing GIDs.
        log_verbose("Creating fake gid for CoreNeuron")
        # The message must be parenthesized: without it the second literal is a
        # separate no-op statement and the assertion message is cut short
        assert not PopulationNodes.get("zzz"), (
            "Population 'zzz' is reserved "
            "for handling empty GID ranks and should not be used elsewhere."
        )
        pop_group = PopulationNodes.get("zzz", create=True)
        fake_gid = pop_group.offset + 1 + MPI.rank
        # Add the fake cell to a dummy manager
        dummy_cell_manager = CellDistributor(
            circuit_conf=make_circuit_config({"CellLibraryFile": "<NONE>"}),
            target_manager=self._target_manager,
        )
        dummy_cell_manager.load_artificial_cell(fake_gid, CoreConfig.artificial_cell_object)
        yield

        # register_mapping() doesn't work for this artificial cell as somatic attr is
        # missing, so create a dummy mapping file manually, required for reporting
        # NOTE(review): assumes at least one "*_3.dat" file exists in corenrn_data;
        # an empty match raises IndexError below
        cur_files = glob.glob(f"{corenrn_data}/*_3.dat")
        example_mapfile = cur_files[0]
        with open(example_mapfile, "rb") as f_mapfile:
            # read the version from the existing mapping file generated by coreneuron
            coredata_version = f_mapfile.readline().rstrip().decode("ascii")

        mapping_file = Path(corenrn_data, f"{fake_gid}_3.dat")
        if not mapping_file.is_file():
            mapping_file.write_text(f"{coredata_version}\n0\n", encoding="utf-8")
1✔
1179

1180
    def _coreneuron_configure_datadir(self, corenrn_restore, coreneuron_direct_mode):
        """Choose the CoreNEURON data directory, optionally placing it in SHM.

        - The data directory is created if missing.
        - In direct mode nothing else is done.
        - On the first cycle any leftover SHM data directory is removed.
        - On the first cycle of a non-restore run with SHM enabled (and the
          coreneuron data set to be deleted), SHM is used if every rank
          estimates that the model fits in it; the three files shared by all
          ranks are then symlinked from SHM to the regular data directory.

        Args:
            corenrn_restore (bool): Flag indicating if CoreNEURON is in restore mode.
            coreneuron_direct_mode (bool): Flag indicating if direct mode is enabled.
        """
        datadir = SimConfig.coreneuron_datadir_path(create=True)
        if coreneuron_direct_mode:
            return
        datadir_shm = SHMUtil.get_datadir_shm(datadir)
        first_cycle = self._cycle_i == 0

        # Remove what a previous simulation may have left in the same output directory
        if first_cycle and datadir_shm:
            rmtree(datadir_shm)

        shm_requested = SimConfig.cli_options.enable_shm
        if shm_requested and not datadir_shm:
            # SHM was asked for but there is no folder for it (i.e., 'SHMDIR' ENV variable)
            logging.warning("Unknown SHM directory for model file transfer in CoreNEURON.")
        elif (
            first_cycle
            and not corenrn_restore
            and shm_requested
            and SimConfig.delete_corenrn_data
        ):
            # Estimate the memory required: the RSS growth so far times the number
            # of cycles of the multi-step model build times an approximate factor
            mem_avail = SHMUtil.get_mem_avail()
            shm_avail = SHMUtil.get_shm_avail()
            initial_rss = self._initial_rss
            current_rss = SHMUtil.get_node_rss()
            factor = SHMUtil.get_shm_factor()
            if initial_rss < current_rss:
                rss_diff = current_rss - initial_rss
            else:
                rss_diff = current_rss  # prevents <0 estimates
            rss_req = int(rss_diff * self._n_cycles * factor)

            # All ranks must agree, otherwise nobody uses /dev/shm
            fits_here = rss_req < shm_avail and rss_req < mem_avail
            ranks_ok = MPI.allreduce(int(fits_here), MPI.SUM)
            if ranks_ok != MPI.size:
                logging.warning(
                    "Unable to utilize SHM for model file transfer in CoreNEURON. "
                    "Increase the number of nodes to reduce the memory footprint "
                    "(Current use node: %d MB / SHM Limit: %d MB / Mem. Limit: %d MB)",
                    (rss_req >> 20),
                    (shm_avail >> 20),
                    (mem_avail >> 20),
                )
            else:
                logging.info("SHM file transfer mode for CoreNEURON enabled")

                # Create SHM folder and links to GPFS for the global data structures
                os.makedirs(datadir_shm, exist_ok=True)

                # Important: These three files must be available on every node, as they are
                #            shared across all of the processes. The trick here is to fool
                #            NEURON into thinking that the files are written in /dev/shm, but
                #            they are actually written on GPFS. The workflow is identical,
                #            meaning that rank 0 writes the content and every other rank reads
                #            it afterwards in CoreNEURON.
                for shared_file in ("bbcore_mech.dat", "files.dat", "globals.dat"):
                    real_path = ospath.join(datadir, shared_file)
                    link_path = ospath.join(datadir_shm, shared_file)

                    try:
                        os.close(os.open(real_path, os.O_CREAT))
                        os.symlink(real_path, link_path)
                    except FileExistsError:
                        pass  # Ignore if other process has already created it

                # Update the flag to confirm the configuration
                self._shm_enabled = True

        _SimConfig.coreneuron_datadir = datadir_shm if self._shm_enabled else datadir
1261

1262
    # -
1263
    @timeit(name="corewrite")
    def _coreneuron_write_sim_config(self, corenrn_restore):
        """Generate the CoreNEURON dataset (unless restoring) and the sim config file.

        Args:
            corenrn_restore: When true, the mapping registration and the model
                data generation are skipped; only the sim config is written.
        """
        log_stage("Dataset generation for CoreNEURON")

        if not corenrn_restore:
            CompartmentMapping(self._circuits.global_manager).register_mapping()
            if not SimConfig.coreneuron_direct_mode:
                with self._coreneuron_ensure_all_ranks_have_gids(CoreConfig.datadir):
                    self._pc.nrnbbcore_write(CoreConfig.datadir)
                    MPI.barrier()  # wait for all ranks to finish corenrn data generation

        # Only one gid can be passed as prcellgid: take the first, -1 when none
        dump_gids = self._dump_cell_state_gids
        prcellgid = -1
        if dump_gids:
            prcellgid = dump_gids[0]
            if len(dump_gids) > 1:
                logging.warning(
                    "Multiple cell state GIDs provided. Using the first one: %d",
                    dump_gids[0],
                )

        sim_config = CoreSimulationConfig(
            outpath=CoreConfig.output_root,
            datpath=CoreConfig.datadir,
            tstop=Nd.tstop,
            dt=Nd.dt,
            prcellgid=prcellgid,
            celsius=getattr(SimConfig, "celsius", 34.0),
            voltage=getattr(SimConfig, "v_init", -65.0),
            cell_permute=int(SimConfig.cell_permute),
            pattern=self._core_replay_file or None,
            seed=SimConfig.rng_info.getGlobalSeed(),
            model_stats=int(SimConfig.cli_options.model_stats),
            report_conf=(
                CoreConfig.report_config_file_save if self._run_conf["EnableReports"] else None
            ),
            mpi=int(os.environ.get("NEURON_INIT_MPI", "1")),
        )
        sim_config.dump(CoreConfig.sim_config_file)

        # Wait for rank0 to write the sim config file
        MPI.barrier()
        logging.info(" => Dataset written to '%s'", CoreConfig.datadir)
2✔
1302

1303
    # -
1304
    def run_all(self):
        """Run the whole simulation according to the simulation config file.

        The simulation is initialized first if that has not happened yet.

        Returns:
            The value returned by ``_run_neuron()`` when NEURON is used,
            otherwise None.
        """
        if not self._sim_ready:
            self.sim_init()

        timings = None
        if SimConfig.use_neuron:
            timings = self._run_neuron()
            self.sonata_spikes()

        if not SimConfig.use_coreneuron:
            return timings

        print_mem_usage()
        # Outside direct mode the model is cleared (event queues included) before
        # launching CoreNEURON
        if not SimConfig.coreneuron_direct_mode:
            self.clear_model(avoid_clearing_queues=False)
        self._run_coreneuron()
        # Spikes are written from here only in direct mode
        if SimConfig.coreneuron_direct_mode:
            self.sonata_spikes()
        return timings
3✔
1321

1322
    # -
1323
    @return_neuron_timings
    def _run_neuron(self):
        """Solve the simulation with NEURON.

        A ``SimulationProgress`` object is instantiated on rank 0 only and kept
        referenced until this method returns.
        """
        on_root_rank = MPI.rank == 0
        if on_root_rank:
            _ = SimulationProgress()
        self.solve()
        logging.info("Simulation finished.")
3✔
1329

1330
    @staticmethod
    def _run_coreneuron():
        """Launch the CoreNEURON solver, forwarding the configured direct-mode flag."""
        logging.info("Launching simulation with CoreNEURON")
        direct_mode = SimConfig.coreneuron_direct_mode
        CoreConfig.psolve_core(direct_mode)
1336

1337
    def _sim_event_handlers(self, tstart, tstop):
        """Create handlers for "in-simulation" events (currently only Save-State).

        Args:
            tstart: not used by the current implementation.
            tstop: not used by the current implementation; the save event is
                scheduled at ``SimConfig.tstop``.

        Returns:
            A list of ``(time, [handlers])`` tuples sorted by time.
        """
        handlers_by_time = defaultdict(list)  # time -> handlers to run at that time

        if SimConfig.save:
            handlers_by_time[SimConfig.tstop].append(self._create_save_handler())

        return [(when, handlers_by_time[when]) for when in sorted(handlers_by_time)]
3✔
1350

1351
    # -
1352
    def _create_save_handler(self):
        """Build the callback that runs the save-state sequence.

        Returns:
            A zero-argument callable wrapped by the "savetime" timer.
        """

        @timeit(name="savetime")
        def save_f():
            """Save the current simulation state.

            In order: MPI barrier, stimulus save-state preparation, report flush,
            model clearing and a final log message.
            """
            logging.info("Saving State... (t=%f)", SimConfig.tstop)
            MPI.barrier()
            self._stim_manager.saveStatePreparation(self._bbss)
            log_verbose("SaveState Initialization Done")

            # The reports are flushed first, then the whole model is cleared
            log_verbose("Clearing model prior to final save")
            self._sonatareport_helper.flush()
            self.clear_model()

            logging.info(" => Save done successfully")

        return save_f
×
1373

1374
    # -
1375
    @mpi_no_errors
    @timeit(name="psolve")
    def solve(self, tstop=None):
        """Advance the simulation up to a given stop time.

        Args:
            tstop: stop time; a falsy value (the default) means ``Nd.tstop``.

        Raises:
            ConfigurationError: if the simulation was not initialized
                (``sim_init()``) beforehand.
        """
        if not self._sim_ready:
            raise ConfigurationError("Initialize simulation first")

        tstart = Nd.t
        tstop = tstop or Nd.tstop
        scheduled = self._sim_event_handlers(tstart, tstop)

        # NOTE: _psolve_loop is called among events in order to eventually split long
        # simulation blocks, where one or more report flush(es) can happen. It is a simplified
        # design relatively to the original version where the report checkpoint would not happen
        # before the checkpoint timeout (25ms default). However there shouldn't be almost any
        # performance penalty since the simulation is already halted between events.

        logging.info("Running simulation until t=%d ms", tstop)
        reached_t = tstart  # stays at tstart when there are no events
        for event_t, handlers in scheduled:
            reached_t = event_t
            self._psolve_loop(event_t)
            for handler in handlers:
                handler()
            self.dump_cell_states()

        # Cover the remaining interval, if any
        if reached_t < tstop:
            self._psolve_loop(tstop)
            self.dump_cell_states()

        # Final flush
        self._sonatareport_helper.flush()
3✔
1408

1409
    # psolve_loop: There was an issue where MPI collective routines for reporting and spike exchange
1410
    # are mixed such that some cpus are blocked waiting to complete reporting while others to
1411
    # finish spike exchange. As a work-around, periodically halt simulation and flush reports
1412
    # Default is 25 ms / cycle
1413
    def _psolve_loop(self, tstop):
        """Advance the solver to `tstop` in chunks of at most ``SimConfig.buffer_time``.

        ``Nd.t`` is set to the last time reached once the loop is over.
        """
        now = round(Nd.t, 2)  # fp inaccuracies could lead to infinitesimal loops
        chunk = SimConfig.buffer_time
        n_chunks = math.ceil((tstop - now) / chunk)
        for _ in range(n_chunks):
            now = min(tstop, now + chunk)
            self._pc.psolve(now)
        Nd.t = now
3✔
1421

1422
    # -
1423
    @mpi_no_errors
    def clear_model(self, avoid_creating_objs=False, avoid_clearing_queues=True):
        """Clears appropriate lists and other stored references.

        For use with intrinsic load balancing. After creating and evaluating the network using
        round robin distribution, we want to clear the cells and synapses in order to have a
        clean slate on which to instantiate the balanced cells.

        Args:
            avoid_creating_objs: when truthy, the sonata report helper is not cleared.
            avoid_clearing_queues: when falsy, ``free_event_queues()`` is called as well.
        """
        logging.info("Clearing model")
        self._pc.gid_clear()
        self._target_manager.clear_simulation_data()

        may_clear_reports = not avoid_creating_objs and SimConfig.use_neuron
        if may_clear_reports and self._sonatareport_helper:
            self._sonatareport_helper.clear()

        self._reset()  # reset vars
        self._bbss.ignore()  # clear BBSaveState
        pool_shrink()  # shrink ArrayPools holding mechanism's data in NEURON

        # Free event queues in NEURON
        if not avoid_clearing_queues:
            free_event_queues()

        # Garbage collect all Python objects without references
        gc.collect()

        # Finally call malloc_trim to return all the freed pages back to the OS
        trim_memory()
        print_mem_usage()
3✔
1456

1457
    # -------------------------------------------------------------------------
1458
    #  output
1459
    # -------------------------------------------------------------------------
1460

1461
    def sonata_spikes(self):
        """Write the spike events that occurred on each node into a single output SONATA file.

        When the report helper offers ``create_spikefile``, one spike population is
        written per entry of ``_spike_populations`` / ``_spike_vecs`` (which must
        have the same length). Otherwise a single-population fallback is used.
        """
        output_root = SimConfig.output_root_path(create=True)
        if hasattr(self._sonatareport_helper, "create_spikefile"):
            # Write spike report for multiple populations if exist
            spike_path = self._run_conf.get("SpikesFile")

            # Get only the spike file name
            file_name = spike_path.split("/")[-1] if spike_path is not None else "out.h5"

            # create a sonata spike file
            self._sonatareport_helper.create_spikefile(output_root, file_name)
            # write spikes per population
            for (population, population_offset), (spikevec, idvec) in zip(
                self._spike_populations, self._spike_vecs, strict=True
            ):
                extra_args = (
                    (population, population_offset) if population else ("All", population_offset)
                )
                self._sonatareport_helper.add_spikes_population(spikevec, idvec, *extra_args)
            # write all spike populations
            self._sonatareport_helper.write_spike_populations()
            # close the spike file
            self._sonatareport_helper.close_spikefile()
            return

        # fallback: write spike report with one single population "ALL"
        logging.warning(
            "Writing spike reports with multiple populations is not supported. "
            "If needed, please update to a newer version of neurodamus."
        )
        # This branch used to reference `spikevec`/`idvec` without ever binding them
        # (they are loop variables of the other branch), raising NameError.
        # NOTE(review): only the first recorded (spikevec, idvec) pair is written here;
        # confirm this is the intended single-population behaviour.
        first_pair = next(iter(self._spike_vecs), None)
        if first_pair is None:
            logging.warning("No spike vectors available. Skipping spike report")
            return
        spikevec, idvec = first_pair
        population = self._target_spec.population or "All"
        self._sonatareport_helper.write_spikes(spikevec, idvec, output_root, population)
×
1494

1495
    def dump_cell_states(self):
        """Dump the state of every cell listed in ``_dump_cell_state_gids``.

        Nothing is done when no gid is configured or when a dump was already
        produced at the current ``Nd.t``.

        We assume that the parallel context is ready. Thus, this function should
        not be called if coreNeuron is employed and we are not at t=0.0.
        """
        # The message must be parenthesized: as two separate lines the second half
        # was a stand-alone (no-op) string statement and never part of the assert.
        assert SimConfig.use_neuron, (
            "This function can work only with Neuron. Use sim.conf to "
            "instruct coreNeuron to dump a cell state instead."
        )
        if not self._dump_cell_state_gids:
            return
        if self._last_cell_state_dump_t == Nd.t:  # avoid duplicating output
            return

        for i in self._dump_cell_state_gids:
            log_verbose("Dumping info about cell %d", i)
            self._pc.prcellstate(i, f"py_Neuron_t{Nd.t}")

        # Remember the time of this dump so it is not repeated at the same t
        self._last_cell_state_dump_t = Nd.t
1✔
1514

1515
    @staticmethod
    @run_only_rank0
    def coreneuron_cleanup():
        """Remove the CoreNEURON data directory and the build directory after a run.

        Asserts that the data directory is a directory and, at the end, that neither
        the sim config file nor the saved report config file exists.
        """
        corenrn_data = Path(CoreConfig.datadir)
        logging.info("Deleting intermediate data in %s", corenrn_data)
        assert corenrn_data.is_dir(), "Data folder must be a directory"
        if not corenrn_data.is_symlink():
            rmtree(corenrn_data)
        else:
            # in restore, coreneuron data is a symbolic link
            corenrn_data.unlink()

        build_dir = Path(SimConfig.build_path())
        if build_dir.exists():
            shutil.rmtree(build_dir)

        assert not Path(CoreConfig.sim_config_file).exists()

        assert not Path(CoreConfig.report_config_file_save).exists()
2✔
1537

1538
    def cleanup(self):
        """Have the compute nodes wrap up tasks before exiting.

        Clears the model when CoreNEURON is not used (or the model is not
        simulated) and deletes the CoreNEURON data when configured to do so.
        """
        # MemUsage constructor will do MPI communications
        print_mem_usage()

        # Coreneuron runs clear the model before starting
        if not SimConfig.use_coreneuron or SimConfig.simulate_model is False:
            self.clear_model(avoid_creating_objs=True)

        # Data is kept when saving state or when this is only a dry run
        delete_data = (
            SimConfig.delete_corenrn_data and not SimConfig.save and not SimConfig.dry_run
        )
        if delete_data:
            with timeit(name="Delete corenrn data"):
                self.coreneuron_cleanup()
                MPI.barrier()
2✔
1551

1552
    @staticmethod
    @run_only_rank0
    def move_dumpcellstates_to_output_root():
        """Move cell-state dump files from the current directory to the output root.

        Every entry of the current working directory whose suffix is ``.corenrn``,
        ``.nrn`` or ``.nrndat`` is moved to ``SimConfig.output_root_path(create=True)``.
        """
        dump_suffixes = {".corenrn", ".nrn", ".nrndat"}
        cwd = Path.cwd()
        destination = Path(SimConfig.output_root_path(create=True))

        for entry in cwd.iterdir():
            if entry.suffix not in dump_suffixes:
                continue
            shutil.move(str(entry), destination / entry.name)
            logging.info("Moved %s to %s", entry.name, destination)
1✔
1566

1567

1568
class Neurodamus(Node):
4✔
1569
    """A high level interface to Neurodamus"""
1570

1571
    def __init__(self, config_file, auto_init=True, logging_level=None, **user_opts):
        """Creates and initializes a neurodamus run node

        As part of Initialization it calls:
         * load_targets
         * compute_load_balance
         * Build the circuit (cells, synapses, GJs)
         * Add stimulus & replays
         * Activate reports if requested

        In dry-run mode only targets, cells and synapses are loaded/created.

        Args:
            config_file: The simulation config recipe file
            auto_init: Stored in the run config as "AutoInit"; when true the
                model build also calls init()
            logging_level: (int) Redefine the global logging level.
                0 - Only warnings / errors
                1 - Info messages (default)
                2 - Verbose
                3 - Debug messages
            user_opts: Options to Neurodamus overriding the simulation config file.
                The "disable_reports" entry is consumed here.
        """
        reports_enabled = not user_opts.pop("disable_reports", False)
        if logging_level is not None:
            GlobalConfig.verbosity = logging_level

        Node.__init__(self, config_file, user_opts)
        # Use the run_conf dict to avoid passing it around
        self._run_conf["EnableReports"] = reports_enabled
        self._run_conf["AutoInit"] = auto_init

        if SimConfig.dry_run:
            if self._is_ngv_run:
                raise Exception("Dry run not available for ngv circuit")
            for step in (self.load_targets, self.create_cells, self.create_synapses):
                step()
            return

        if SimConfig.restore_coreneuron:
            self._coreneuron_restore()
        elif SimConfig.build_model:
            self._instantiate_simulation()

        # Remove .SUCCESS file if exists
        self._success_file = SimConfig.config_file + ".SUCCESS"
        self._remove_file(self._success_file)
3✔
1615

1616
    # -
1617
    def _build_single_model(self):
        """Construct the model for a single cycle.

        Steps, each followed by a memory usage report:
        - compute the load balance
        - create the cells using that load balance
        - create the synapses
        - enable replay

        Finally, `init()` is called when 'AutoInit' is set in the run config.
        """
        log_stage("================ CALCULATING LOAD BALANCE ================")
        balance = self.compute_load_balance()
        print_mem_usage()

        log_stage("==================== BUILDING CIRCUIT ====================")
        self.create_cells(balance)
        print_mem_usage()

        # Create connections
        self.create_synapses()
        print_mem_usage()

        log_stage("================ INSTANTIATING SIMULATION ================")
        # Apply replay
        self.enable_replay()
        print_mem_usage()

        auto_init = self._run_conf["AutoInit"]
        if auto_init:
            self.init()
3✔
1646

1647
    # -
1648
    def init(self):
        """Explicitly initialize, allowing users to make last changes before simulation.

        Finalizes every synapse manager, enables stimulus, modifications and
        (if enabled) reports, then calls `sim_init()`. A second call is skipped
        with a warning.
        """
        if self._sim_ready:
            logging.warning("Simulation already initialized. Skip second init")
            return

        log_stage("Creating connections in the simulator")
        seed = self._run_conf.get("BaseSeed", 0)  # base seed for synapse RNG
        for manager in self._circuits.all_synapse_managers():
            manager.finalize(seed)
        print_mem_usage()

        self.enable_stimulus()
        print_mem_usage()
        self.enable_modifications()

        reports_enabled = self._run_conf["EnableReports"]
        if reports_enabled:
            self.enable_reports()
        print_mem_usage()

        self.sim_init()
        assert self._sim_ready, "sim_init should have set this"
3✔
1670

1671
    @staticmethod
    def _merge_filesdat(ncycles):
        """Merge the per-cycle ``files_<i>.dat`` into a single ``files.dat``.

        Each input file is read as: a first line, a line holding the number of
        entries, then that many entry lines. The output holds the first line of
        the last file read, the total number of entries and all the entries.

        Args:
            ncycles: number of ``files_<i>.dat`` files to merge (i in 0..ncycles-1)
        """
        log_stage("Generating merged CoreNeuron files.dat")
        datadir = CoreConfig.datadir
        entries = []
        for cycle in range(ncycles):
            log_verbose(f"files_{cycle}.dat")
            cycle_file = ospath.join(datadir, f"files_{cycle}.dat")
            with open(cycle_file, encoding="utf-8") as src:
                first_line = src.readline()
                n_entries = int(src.readline())
                entries.extend(src.readline() for _ in range(n_entries))

        merged_file = ospath.join(datadir, "files.dat")
        with open(merged_file, "w", encoding="utf-8") as dst:
            dst.write(first_line)
            dst.write(str(len(entries)) + "\n")
            dst.writelines(entries)

        logging.info(" => %s files merged successfully", ncycles)
1✔
1693

1694
    def _coreneuron_restore(self):
        """Restore CoreNEURON simulation state.

        Sequence:
        - load targets
        - enable replay
        - enable reports, if enabled in the run config
        - write the sim config (in restore mode)
        - set up the coreneuron data directory from the restore location

        The simulation is flagged as ready afterwards.
        """
        log_stage(" =============== CORENEURON RESTORE ===============")
        self.load_targets()
        self.enable_replay()
        reports_enabled = self._run_conf["EnableReports"]
        if reports_enabled:
            self.enable_reports()

        self._coreneuron_write_sim_config(corenrn_restore=True)
        self._setup_coreneuron_datadir_from_restore()

        self._sim_ready = True
1✔
1714

1715
    @run_only_rank0
    def _setup_coreneuron_datadir_from_restore(self):
        """Configure the CoreNEURON data directory for a restore.

        This involves:
        - configuring the coreneuron datadir (restore mode)
        - replacing the destination data directory (or symlink) by a symlink
          pointing to the data directory being restored

        Raises:
            FileNotFoundError: if the data directory to restore does not exist.
        """
        self._coreneuron_configure_datadir(
            corenrn_restore=True, coreneuron_direct_mode=SimConfig.coreneuron_direct_mode
        )

        # handle coreneuron_input movements
        src_datadir = Path(SimConfig.coreneuron_datadir_restore_path())
        dst_datadir = Path(SimConfig.coreneuron_datadir_path())
        if not src_datadir.exists():
            raise FileNotFoundError(
                f"Coreneuron input directory in `{src_datadir}` does not exist!"
            )

        # Remove whatever is at the destination. The symlink check comes first since
        # `exists()` follows links and is False for a dangling one, which would be left
        # in place and make `symlink_to` fail with FileExistsError.
        if dst_datadir.is_symlink():
            dst_datadir.unlink()
        elif dst_datadir.exists():
            shutil.rmtree(dst_datadir)

        dst_datadir.symlink_to(src_datadir)
1✔
1749

1750
    def compute_n_cycles(self):
        """Determine the number of model-building cycles.

        Starts from ``SimConfig.modelbuilding_steps`` and, with CoreNEURON, caps it
        when there are too few cells for the number of ranks.

        Returns:
            The number of cycles to use.
        """
        n_cycles = SimConfig.modelbuilding_steps
        # No multi-cycle. Trivial result, this is always possible
        if n_cycles == 1:
            return n_cycles

        target = self._target_manager.get_target(self._target_spec)
        target_name = self._target_spec.name
        max_cell_count = target.max_gid_count_per_population()
        logging.info(
            "Simulation target: %s, Max cell count per population: %d", target_name, max_cell_count
        )

        too_few_cells = (
            SimConfig.use_coreneuron
            and max_cell_count / n_cycles < MPI.size
            and max_cell_count > 0
        )
        if not too_few_cells:
            return n_cycles

        # coreneuron with no. ranks >> no. cells
        # need to assign fake gids to artificial cells in empty threads
        # during module building fake gids start from max_gid + 1
        # currently not support engine plugin where target is loaded later
        # We can always have only 1 cycle. coreneuron throws an error if a
        # rank does not have cells during a cycle. There is a way to prevent
        # this for unbalanced multi-populations but if more than one cycle
        # happens on a rank without instantiating cells another error raises.
        # Thus, the number of cycles should be rounded down; on the safe side
        max_num_cycles = int(max_cell_count / MPI.size) or 1
        if n_cycles <= max_num_cycles:
            return n_cycles

        logging.warning(
            "Your simulation is using multi-cycle without enough cells.\n"
            "  => Number of cycles has been automatically set to the max: %d",
            max_num_cycles,
        )
        return max_num_cycles
1✔
1786

1787
    def _build_model(self):
        """Build the model

        Internally it calls _build_single_model, over multiple
        cycles if necessary. In multi-cycle mode the ``files.dat`` generated by
        each cycle is set aside and all of them are merged at the end (rank 0).

        Note: only relevant for coreNeuron
        """
        self._n_cycles = self.compute_n_cycles()

        # Without multi-cycle, it's a trivial model build.
        if self._n_cycles == 1:
            self._build_single_model()
            return

        logging.info("MULTI-CYCLE RUN: %s Cycles", self._n_cycles)
        target = self._target_manager.get_target(self._target_spec)
        TimerManager.archive(archive_name="Before Cycle Loop")

        PopulationNodes.freeze_offsets()

        # Sub-targets are only generated when not in Memory load-balance mode
        if SimConfig.loadbal_mode != LoadBalanceMode.Memory:
            sub_targets = target.generate_subtargets(self._n_cycles)

        separator = "-" * 60
        for cycle_i in range(self._n_cycles):
            cycle_no = cycle_i + 1
            logging.info("")
            logging.info(separator)
            log_stage(f"==> CYCLE {cycle_no} (OUT OF {self._n_cycles})")
            logging.info(separator)

            self.clear_model()

            if SimConfig.loadbal_mode != LoadBalanceMode.Memory:
                for cur_target in sub_targets[cycle_i]:
                    self._target_manager.register_target(cur_target)
                    pop = next(iter(cur_target.population_names))
                    # Point the circuits of that population to this cycle's sub-target
                    for circuit in self._sonata_circuits.values():
                        spec = TargetSpec(circuit.CircuitTarget)
                        if spec.population != pop:
                            continue
                        spec.name = cur_target.name
                        circuit.CircuitTarget = str(spec)

            self._cycle_i = cycle_i
            self._build_single_model()

            # Move generated files aside (to be merged later)
            if MPI.rank == 0:
                files_prefix = ospath.join(CoreConfig.datadir, "files")
                os.rename(f"{files_prefix}.dat", f"{files_prefix}_{cycle_i}.dat")
            # Archive timers for this cycle
            TimerManager.archive(archive_name=f"Cycle Run {cycle_no:d}")

        if MPI.rank == 0:
            self._merge_filesdat(self._n_cycles)
1✔
1842

1843
    # -
1844
    def _instantiate_simulation(self):
        """Initialize the simulation

        - record the initial node RSS
        - load targets
        - check connections
        - build the model
        """
        # Keep the initial RSS for the SHM file transfer calculations
        rss_at_start = SHMUtil.get_node_rss()
        self._initial_rss = rss_at_start
        print_mem_usage()

        self.load_targets()

        # Check connection block configuration and raise warnings for overriding
        # parameters
        SimConfig.check_connections_configure(self._target_manager)

        self._build_model()
3✔
1862

1863
    # -
1864
    @timeit(name="finished Run")
    def run(self, cleanup=True):
        """Prepares and launches the simulation according to the loaded config.

        In dry-run mode only the dry-run statistics are displayed and the cell
        distribution is computed; nothing else happens.
        If the model is not to be simulated, the simulation is initialized but skipped.
        If the model is not to be built, CoreNEURON is launched directly.

        Args:
            cleanup (bool): Free up the model and intermediate files [default: true]
                Rationale is: the high-level run() method it's typically for a
                one shot simulation so we should cleanup. If not it can be set to False
        """
        if SimConfig.dry_run:
            log_stage("============= DRY RUN (SKIP SIMULATION) =============")
            stats = self._dry_run_stats
            stats.display_total()
            stats.display_node_suggestions()
            ranks = stats.get_num_target_ranks(SimConfig.num_target_ranks)
            stats.collect_all_mpi()
            try:
                stats.distribute_cells_with_validation(ranks, SimConfig.modelbuilding_steps)
            except RuntimeError:
                logging.exception("Dry run failed")
            return

        if not SimConfig.simulate_model:
            self.sim_init()
            log_stage("======== [SKIPPED] SIMULATION (MODEL BUILD ONLY) ========")
        elif not SimConfig.build_model:
            log_stage("============= SIMULATION (SKIP MODEL BUILD) =============")
            # coreneuron needs the report file created
            self._run_coreneuron()
        else:
            log_stage("======================= SIMULATION =======================")
            self.run_all()

        # Create SUCCESS file if the simulation finishes successfully
        self._touch_file(self._success_file)
        logging.info("Finished! Creating .SUCCESS file: '%s'", self._success_file)

        # Save seclamp holding currents for gap junction user corrections
        gj_target_pop = SimConfig.beta_features.get("gapjunction_target_population")
        if gj_target_pop:
            procedure = SimConfig.beta_features.get("procedure_type")
            if procedure == "find_holding_current":
                gj_manager = self._circuits.get_edge_manager(
                    gj_target_pop, gj_target_pop, GapJunctionManager
                )
                gj_manager.save_seclamp()

        self.move_dumpcellstates_to_output_root()

        if cleanup:
            self.cleanup()
3✔
1916

1917
    @staticmethod
    @run_only_rank0
    def _remove_file(file_name):
        """Delete `file_name`, silently doing nothing if it does not exist."""
        try:
            os.remove(file_name)
        except FileNotFoundError:
            pass
3✔
1924

1925
    @staticmethod
    @run_only_rank0
    def _touch_file(file_name):
        """Create `file_name` if missing and set its access/modification times to now."""
        # Append mode creates the file without truncating existing content
        handle = open(file_name, "a", encoding="utf-8")
        with handle:
            os.utime(file_name, None)
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc