
int-brain-lab / iblrig / 15738036488

18 Jun 2025 04:10PM UTC coverage: 48.249% (+1.5%) from 46.79%

Pull #815

github

9b495a
web-flow
Merge fd70c12e3 into 5c537cbb7
Pull Request #815: extended tests for photometry copier

23 of 32 new or added lines in 1 file covered. (71.88%)

1106 existing lines in 22 files now uncovered.

4408 of 9136 relevant lines covered (48.25%)

0.96 hits per line

Source File

87.5
/iblrig/base_tasks.py
1
"""
2
Commonalities for all tasks.
3

4
This module provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task-related logic.
6
"""
7

8
import argparse
2✔
9
import contextlib
2✔
10
import datetime
2✔
11
import importlib.metadata
2✔
12
import inspect
2✔
13
import json
2✔
14
import logging
2✔
15
import signal
2✔
16
import sys
2✔
17
import time
2✔
18
import traceback
2✔
19
from abc import ABC, abstractmethod
2✔
20
from collections import OrderedDict
2✔
21
from collections.abc import Callable
2✔
22
from pathlib import Path
2✔
23
from typing import Protocol, final
2✔
24

25
import numpy as np
2✔
26
import pandas as pd
2✔
27
import yaml
2✔
28
from pythonosc import udp_client
2✔
29

30
import ibllib.io.session_params as ses_params
2✔
31
import iblrig.path_helper
2✔
32
import pybpodapi
2✔
33
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
34
from iblrig import net, path_helper, sound
2✔
35
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
36
from iblrig.frame2ttl import Frame2TTL
2✔
37
from iblrig.hardware import DTYPE_AMBIENT_SENSOR_BIN, SOFTCODE, Bpod, RotaryEncoderModule, sound_device_factory
2✔
38
from iblrig.hifi import HiFi
2✔
39
from iblrig.path_helper import load_pydantic_yaml
2✔
40
from iblrig.pydantic_definitions import HardwareSettings, RigSettings, TrialDataModel
2✔
41
from iblrig.tools import call_bonsai, get_number
2✔
42
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
43
from iblrig.valve import Valve
2✔
44
from iblutil.io import binary
2✔
45
from iblutil.io.net.base import ExpMessage
2✔
46
from iblutil.spacer import Spacer
2✔
47
from iblutil.util import Bunch, flatten, setup_logger
2✔
48
from one.alf.io import next_num_folder
2✔
49
from one.api import ONE, OneAlyx
2✔
50
from pybpodapi.protocol import StateMachine
2✔
51

52
OSC_CLIENT_IP = '127.0.0.1'
2✔
53

54
log = logging.getLogger(__name__)
2✔
55

56

57
class HasBpod(Protocol):
2✔
58
    bpod: Bpod
2✔
59

60

61
class BaseSession(ABC):
2✔
62
    """
63
    Abstract base class for all sessions.
64

65
    This class defines the common interface and shared logic for sessions,
66
    including session initialization, hardware management, Alyx registration,
67
    and mixin execution. Subclasses should implement the abstract methods to
68
    define task-specific behavior and hardware startup.
69
    """
70

71
    version: str | None = None
2✔
72
    """Task version string."""
2✔
73
    # protocol_name: str | None = None
74
    # """The name of the task protocol (NB: avoid spaces)."""
75
    base_parameters_file: Path | None = None
2✔
76
    """Path: A YAML file containing base, default task parameters."""
2✔
77
    is_mock: bool = False
2✔
78
    """Wether the session is a mock session."""
2✔
79
    logger: logging.Logger | None = None
2✔
80
    """Logger instance used solely to keep track of log level passed to constructor."""
2✔
81
    experiment_description: dict = {}
2✔
82
    """The experiment description."""
2✔
83
    extractor_tasks: list | None = None
2✔
84
    """An optional list of pipeline task class names to instantiate when preprocessing task data."""
2✔
85

86
    TrialDataModel: type[TrialDataModel]
2✔
87

88
    @property
2✔
89
    @abstractmethod
2✔
90
    def protocol_name(self) -> str: ...
2✔
91

92
    def __init__(
2✔
93
        self,
94
        subject: str,
95
        task_parameter_file: str | Path | None = None,
96
        file_hardware_settings: str | Path | None = None,
97
        hardware_settings: dict | None | HardwareSettings = None,
98
        file_iblrig_settings=None,
99
        iblrig_settings: RigSettings = None,
100
        one=None,
101
        interactive=True,
102
        projects=None,
103
        procedures=None,
104
        stub=None,
105
        subject_weight_grams=None,
106
        append=False,
107
        wizard=False,
108
        log_level='INFO',
109
        **kwargs,
110
    ):
111
        """
112
        Parameters
113
        ----------
114
        subject : str
115
            The subject nickname. Required.
116
        task_parameter_file : str or Path, optional
117
            An optional path to the task_parameters.yaml file.
118
        file_hardware_settings : str or Path, optional
119
            Name of the hardware file in the settings folder, or full file path.
120
        hardware_settings : dict, optional
121
            An optional dictionary of hardware settings. Keys will override any keys in the file.
122
        file_iblrig_settings : str or Path, optional
123
            Name of the iblrig file in the settings folder, or full file path.
124
        iblrig_settings : dict, optional
125
            An optional dictionary of iblrig settings. Keys will override any keys in the file.
126
        one : ONE, optional
127
            An optional instance of ONE.
128
        interactive : bool, optional
129
            If True, enables interactive mode.
130
        projects : list, optional
131
            An optional list of Alyx projects.
132
        procedures : list, optional
133
            An optional list of Alyx procedures.
134
        subject_weight_grams : float, optional
135
            Weight of the subject in grams.
136
        stub : str or Path, optional
137
            A full path to an experiment description file containing experiment information.
138
        append : bool, optional
139
            If True, append to the latest existing session of the same subject for the same day.
140
        """
141
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
2✔
142
        self._logger = None
2✔
143
        self._setup_loggers(level=log_level)
2✔
144
        if not isinstance(self, EmptySession):
2✔
145
            log.info(f'iblrig version {iblrig.__version__}')
2✔
146
            log.info(f'pybpod version {pybpodapi.__version__}')
2✔
147
            log.info(
2✔
148
                f'Session protocol: {self.protocol_name} '
149
                f'({f"version {self.version})" if self.version is not None else "undefined version"})'
150
            )
151

152
        log.info(f'Session call: {" ".join(sys.argv)}')
2✔
153
        self.interactive = interactive
2✔
154
        self._one = one
2✔
155
        self.init_datetime = datetime.datetime.now()
2✔
156

157
        # loads in the settings: first load the files, then update with the input argument if provided
158
        self._load_settings(
2✔
159
            file_hardware_settings=file_hardware_settings,
160
            hardware_settings=hardware_settings,
161
            file_iblrig_settings=file_iblrig_settings,
162
            iblrig_settings=iblrig_settings,
163
        )
164
        self.wizard = wizard
2✔
165

166
        # Load the tasks settings, from the task folder or override with the input argument
167
        self.task_params = self.read_task_parameter_files(task_parameter_file)
2✔
168

169
        self.session_info = Bunch(
2✔
170
            {
171
                'NTRIALS': 0,
172
                'NTRIALS_CORRECT': 0,
173
                'PROCEDURES': procedures,
174
                'PROJECTS': projects,
175
                'SESSION_START_TIME': self.init_datetime.isoformat(),
176
                'SESSION_END_TIME': None,
177
                'SESSION_NUMBER': 0,
178
                'SUBJECT_NAME': subject,
179
                'SUBJECT_WEIGHT': subject_weight_grams,
180
                'TOTAL_WATER_DELIVERED': 0,
181
            }
182
        )
183
        # Executes mixins init methods
184
        self._execute_mixins_shared_function('init_mixin')
2✔
185
        self.paths = self._init_paths(append=append)
2✔
186
        if not isinstance(self, EmptySession):
2✔
187
            log.info(f'Session raw data: {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
188
        # Prepare the experiment description dictionary
189
        self.experiment_description = self.make_experiment_description_dict(
2✔
190
            self.protocol_name,
191
            self.paths.get('TASK_COLLECTION'),
192
            procedures,
193
            projects,
194
            self.hardware_settings,
195
            stub,
196
            extractors=self.extractor_tasks,
197
        )
198

199
    def _load_settings(
2✔
200
        self,
201
        file_hardware_settings: Path | str | None = None,
202
        hardware_settings: dict | None = None,
203
        file_iblrig_settings: Path | str | None = None,
204
        iblrig_settings: dict | None = None,
205
        **_,
206
    ):
207
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
2✔
208
        if hardware_settings is not None:
2✔
209
            self.hardware_settings.update(hardware_settings)
2✔
210
            HardwareSettings.model_validate(self.hardware_settings)
2✔
211
        self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
2✔
212
        if iblrig_settings is not None:
2✔
213
            self.iblrig_settings.update(iblrig_settings)
2✔
214
            RigSettings.model_validate(self.iblrig_settings)
2✔
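# Illustrative sketch of the override behaviour above (values are made up): dict
# arguments passed to the constructor update the keys loaded from the settings YAMLs,
# and the merged result is validated again. EmptySession (defined further down in this
# module) is used as a concrete class; running this requires the rig's settings files.
from iblrig.base_tasks import EmptySession

session = EmptySession(
    subject='example_subject',
    hardware_settings={'MAIN_SYNC': False},  # overrides the value from hardware_settings.yaml
    iblrig_settings={'ALYX_URL': None},      # overrides the value from iblrig_settings.yaml
)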
215

216
    @classmethod
2✔
217
    def get_task_file(cls) -> Path:
2✔
218
        """
219
        Get the path to the task's python file.
220

221
        Returns
222
        -------
223
        Path
224
            The path to the task file.
225
        """
226
        return Path(inspect.getfile(cls))
2✔
227

228
    @classmethod
2✔
229
    def get_task_directory(cls) -> Path:
2✔
230
        """
231
        Get the path to the task's directory.
232

233
        Returns
234
        -------
235
        Path
236
            The path to the task's directory.
237
        """
238
        return cls.get_task_file().parent
2✔
239

240
    @classmethod
2✔
241
    def read_task_parameter_files(cls, task_parameter_file: str | Path | None = None) -> Bunch:
2✔
242
        """
243
        Get the task's parameters from the various YAML files in the hierarchy.
244

245
        Parameters
246
        ----------
247
        task_parameter_file : str or Path, optional
248
            Path to override the task parameter file
249

250
        Returns
251
        -------
252
        Bunch
253
            Task parameters
254
        """
255
        # Load the tasks settings, from the task folder or override with the input argument
256
        base_parameters_files = [task_parameter_file or cls.get_task_directory().joinpath('task_parameters.yaml')]
2✔
257

258
        # loop through the task hierarchy to gather parameter files
259
        for c in cls.__mro__:
2✔
260
            base_file = getattr(c, 'base_parameters_file', None)
2✔
261
            if base_file is not None:
2✔
262
                base_parameters_files.append(base_file)
2✔
263

264
        # remove list duplicates while preserving order, we want the highest order first
265
        base_parameters_files = list(reversed(list(dict.fromkeys(base_parameters_files))))
2✔
266

267
        # loop through files and update the dictionary, the latest files in the hierarchy have precedence
268
        task_params = dict()
2✔
269
        for param_file in base_parameters_files:
2✔
270
            if Path(param_file).exists():
2✔
271
                with open(param_file) as fp:
2✔
272
                    params = yaml.safe_load(fp)
2✔
273
                if params is not None:
2✔
274
                    task_params.update(params)
2✔
275

276
        # at last sort the dictionary so it's easier for a human to navigate the many keys, return as a Bunch
277
        return Bunch(sorted(task_params.items()))
2✔
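# Illustrative sketch of the parameter-file hierarchy (class names and file are made up):
# base_parameters_file entries from parent classes are read first, then the task's own
# task_parameters.yaml (or an explicit override) is applied last and wins on conflicts.
from pathlib import Path
from iblrig.base_tasks import BaseSession

class ParentTask(BaseSession):
    protocol_name = '_parent_task'
    base_parameters_file = Path('base_parameters.yaml')  # hypothetical, e.g. REWARD_AMOUNT_UL: 1.5
    def start_hardware(self): ...
    def _run(self): ...

class ChildTask(ParentTask):
    protocol_name = '_child_task'
    # values in the child's task_parameters.yaml override those from base_parameters.yaml

params = ChildTask.read_task_parameter_files()  # returns a Bunch sorted by key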
278

279
    def _init_paths(self, append: bool = False) -> Bunch:
2✔
280
        r"""
281
        Initialize session paths.
282

283
        Parameters
284
        ----------
285
        append : bool
286
            Iterate task collection within today's most recent session folder for the selected subject, instead of
287
            iterating session number.
288

289
        Returns
290
        -------
291
        Bunch
292
            Bunch with keys:
293

294
            *   BONSAI: full path to the bonsai executable
295
                `C:\iblrigv8\Bonsai\Bonsai.exe`
296
            *   VISUAL_STIM_FOLDER: full path to the visual stimulus folder
297
                `C:\iblrigv8\visual_stim`
298
            *   LOCAL_SUBJECT_FOLDER: full path to the local subject folder
299
                `C:\iblrigv8_data\mainenlab\Subjects`
300
            *   REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
301
                `Y:\Subjects`
302
            *   SESSION_FOLDER: full path to the current session:
303
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001`
304
            *   TASK_COLLECTION: folder name of the current task
305
                `raw_task_data_00`
306
            *   SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection.
307
                This is where the task data gets written
308
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00`
309
            *   DATA_FILE_PATH: contains the bpod trials
310
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable`
311
            *   AMBIENT_FILE_PATH: contains the ambient sensor data
312
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_ambientSensorData.raw.bin`
313
            *   SETTINGS_FILE_PATH: contains the task settings
314
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json`
315
        """
316
        rig_computer_paths = path_helper.get_local_and_remote_paths(
2✔
317
            local_path=self.iblrig_settings.iblrig_local_data_path,
318
            remote_path=self.iblrig_settings.iblrig_remote_data_path,
319
            lab=self.iblrig_settings.ALYX_LAB,
320
            iblrig_settings=self.iblrig_settings,
321
        )
322
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
323
        paths.BONSAI = BONSAI_EXE
2✔
324
        paths.VISUAL_STIM_FOLDER = BASE_PATH.joinpath('visual_stim')
2✔
325
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
326
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
327
        # initialize the session path
328
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
329
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
330
        )
331
        if append:
2✔
332
            # this is the case where we append a new protocol to an existing session
333
            todays_sessions = sorted(filter(Path.is_dir, date_folder.glob('*')), reverse=True)
2✔
334
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
2✔
335
            paths.SESSION_FOLDER = todays_sessions[0]
2✔
336
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
337
            if self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
2✔
UNCOV
338
                """
×
339
                Chained protocols make little sense when Bpod is the main sync as there is no
340
                continuous acquisition between protocols.  Only one sync collection can be defined in
341
                the experiment description file.
342
                If you are running experiments with an ephys rig (nidq) or an external daq, you should
343
                correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
344
                """
UNCOV
345
                raise RuntimeError('Chained protocols not supported for bpod-only sessions')
×
346
        else:
347
            # in this case the session path is created from scratch
348
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
2✔
349
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
350

351
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
352
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
353
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
354
        paths.AMBIENT_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_ambientSensorData.raw.bin')
2✔
355
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
356
        return paths
2✔
357

358
    @property
2✔
359
    def exp_ref(self):
2✔
360
        """Construct an experiment reference string from the session info attribute."""
361
        subject, date, number = (self.session_info[k] for k in ('SUBJECT_NAME', 'SESSION_START_TIME', 'SESSION_NUMBER'))
2✔
362
        if not all([subject, date, number]):
2✔
363
            return None
2✔
364
        return self.one.dict2ref(dict(subject=subject, date=date[:10], sequence=str(number)))
2✔
365

366
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
2✔
367
        self._logger = setup_logger('iblrig', level=level, file=file)  # logger attr used by create_session to determine log level
2✔
368
        setup_logger('pybpodapi', level=level_bpod, file=file)
2✔
369

370
    @staticmethod
2✔
371
    def _remove_file_loggers():
2✔
372
        for logger_name in ['iblrig', 'pybpodapi']:
2✔
373
            logger = logging.getLogger(logger_name)
2✔
374
            file_handlers = [fh for fh in logger.handlers if isinstance(fh, logging.FileHandler)]
2✔
375
            for fh in file_handlers:
2✔
376
                fh.close()
2✔
377
                logger.removeHandler(fh)
2✔
378

379
    @staticmethod
2✔
380
    def make_experiment_description_dict(
2✔
381
        task_protocol: str,
382
        task_collection: str,
383
        procedures: list = None,
384
        projects: list = None,
385
        hardware_settings: dict | HardwareSettings = None,
386
        stub: Path = None,
387
        extractors: list = None,
388
        camera_config: str = None,
389
    ):
390
        """
391
        Construct an experiment description dictionary.
392

393
        Parameters
394
        ----------
395
        task_protocol : str
396
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
397
        task_collection : str
398
            The task collection name, e.g. raw_task_data_00.
399
        procedures : list
400
            An optional list of Alyx procedures.
401
        projects : list
402
            An optional list of Alyx projects.
403
        hardware_settings : dict
404
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
405
        stub : dict
406
            An optional experiment description stub to update.
407
        extractors: list
408
            An optional list of extractor names for the task.
409
        camera_config : str
410
            The camera configuration name in the hardware settings. Defaults to the first key in
411
            'device_cameras'.
412

413
        Returns
414
        -------
415
        dict
416
            The experiment description.
417
        """
418
        description = ses_params.read_params(stub) if stub else {}
2✔
419

420
        # Add hardware devices
421
        if hardware_settings is not None:
2✔
422
            if isinstance(hardware_settings, HardwareSettings):
2✔
423
                hardware_settings = hardware_settings.model_dump()
2✔
424
            devices = {}
2✔
425
            cams = hardware_settings.get('device_cameras', None)
2✔
426
            if cams:
2✔
427
                devices['cameras'] = {}
2✔
428
                camera_config = camera_config or next((k for k in cams), {})
2✔
429
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
2✔
430
            if hardware_settings.get('device_microphone', None):
2✔
431
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
2✔
432
            ses_params.merge_params(description, {'devices': devices})
2✔
433

434
        # Add projects and procedures
435
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
2✔
436
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
2✔
437
        is_main_sync = (hardware_settings or {}).get('MAIN_SYNC', False)
2✔
438
        # Add sync key if required
439
        if is_main_sync and 'sync' not in description:
2✔
440
            description['sync'] = {
2✔
441
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
442
            }
443
        # Add task
444
        task = {task_protocol: {'collection': task_collection}}
2✔
445
        if not is_main_sync:
2✔
446
            task[task_protocol]['sync_label'] = 'bpod'
2✔
447
        if extractors:
2✔
448
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
2✔
449
            task[task_protocol].update({'extractors': extractors})
2✔
450
        if 'tasks' not in description:
2✔
451
            description['tasks'] = [task]
2✔
452
        else:
453
            description['tasks'].append(task)
2✔
454
        return description
2✔
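# Illustrative call of the static method above (all values are examples only); with no
# hardware settings and no main sync, the task entry is given a 'bpod' sync_label:
from iblrig.base_tasks import BaseSession

description = BaseSession.make_experiment_description_dict(
    task_protocol='_iblrig_tasks_exampleChoiceWorld',
    task_collection='raw_task_data_00',
    procedures=['Behavior training/tasks'],
    projects=['example_project'],
)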
455

456
    def _make_task_parameters_dict(self):
2✔
457
        """
458
        Create dictionary that will be saved to the settings json file for extraction.
459

460
        Returns
461
        -------
462
        dict
463
            A dictionary that will be saved to the settings json file for extraction.
464
        """
465
        output_dict = dict(self.task_params)  # Grab parameters from task_params session
2✔
466
        output_dict.update(self.hardware_settings.model_dump())  # Update dict with hardware settings from session
2✔
467
        output_dict.update(dict(self.session_info))  # Update dict with session_info (subject, procedure, projects)
2✔
468
        patch_dict = {  # Various values added to ease transition from iblrig v7 to v8, different home may be desired
2✔
469
            'IBLRIG_VERSION': iblrig.__version__,
470
            'PYBPOD_PROTOCOL': self.protocol_name,
471
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
472
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
473
        }
474
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
2✔
475
            patch_dict['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
2✔
476
        if self.version is not None:
2✔
UNCOV
477
            patch_dict['TASK_VERSION'] = self.version
×
478
        output_dict.update(patch_dict)
2✔
479
        return output_dict
2✔
480

481
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
2✔
482
        """
483
        Collects the various settings and parameters of the session and outputs them to a JSON file.
484

485
        Returns
486
        -------
487
        Path
488
            Path to the resultant JSON file
489
        """
490
        output_dict = self._make_task_parameters_dict()
2✔
491
        if destination_folder:
2✔
UNCOV
492
            json_file = destination_folder.joinpath('_iblrig_taskSettings.raw.json')
×
493
        else:
494
            json_file = self.paths['SETTINGS_FILE_PATH']
2✔
495
        json_file.parent.mkdir(parents=True, exist_ok=True)
2✔
496
        with open(json_file, 'w') as outfile:
2✔
497
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)  # converts datetime objects to string
2✔
498
        return json_file  # PosixPath
2✔
499

500
    @final
2✔
501
    def save_trial_data_to_json(self, bpod_data: dict, validate: bool = True):
2✔
502
        """Validate and save trial data.
503

504
        This method retrieves the current trial's data from the trial_table and validates it using a Pydantic model
505
        (self.TrialDataModel). It merges in the trial's bpod_data dict and appends everything to the session's
506
        JSON data file.
507

508
        Parameters
509
        ----------
510
        bpod_data : dict
511
            Trial data returned from pybpod.
512
        validate : bool, optional
513
            Validate trial's data using Pydantic model. Default: True.
514
        """
515
        # get trial's data as a dict
516
        trial_data = self.trials_table.iloc[self.trial_num].to_dict()
2✔
517

518
        if validate:
2✔
519
            # warn about entries not covered by pydantic model
520
            if trial_data.get('trial_num', 1) == 0:
2✔
521
                for key in set(trial_data.keys()) - set(self.TrialDataModel.model_fields) - {'index'}:
2✔
522
                    log.warning(
2✔
523
                        f'Key "{key}" in trial_data is missing from TrialDataModel - '
524
                        f'its value ({trial_data[key]}) will not be validated.'
525
                    )
526

527
            # validate by passing through pydantic model
528
            trial_data = self.TrialDataModel.model_validate(trial_data).model_dump()
2✔
529

530
        # add bpod_data as 'behavior_data'
531
        trial_data['behavior_data'] = bpod_data
2✔
532

533
        # write json data to file
534
        with open(self.paths['DATA_FILE_PATH'], 'a') as fp:
2✔
535
            fp.write(json.dumps(trial_data) + '\n')
2✔
536
        log.debug(f'Trial data dumped to `{self.paths["DATA_FILE_PATH"].name}`')
2✔
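# Illustrative sketch of the validation model mentioned above: a task declares its trial
# table columns on a pydantic model derived from TrialDataModel; columns missing from the
# model are only warned about on the first trial and are written without validation.
# The field names below are hypothetical.
from iblrig.pydantic_definitions import TrialDataModel

class ExampleTrialData(TrialDataModel):
    contrast: float
    position: int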
537

538
    @property
2✔
539
    def one(self):
2✔
540
        """ONE getter."""
541
        if self._one is None:
2✔
542
            if self.iblrig_settings['ALYX_URL'] is None:
2✔
543
                return
2✔
UNCOV
544
            info_str = (
×
545
                f'alyx client with user name {self.iblrig_settings["ALYX_USER"]} '
546
                + f'and url: {self.iblrig_settings["ALYX_URL"]}'
547
            )
548
            try:
×
549
                self._one = ONE(
×
550
                    base_url=str(self.iblrig_settings['ALYX_URL']),
551
                    username=self.iblrig_settings['ALYX_USER'],
552
                    mode='remote',
553
                    cache_rest=None,
554
                )
555
                log.info('instantiated ' + info_str)
×
556
            except Exception:
×
557
                log.error(traceback.format_exc())
×
558
                log.error('could not connect to ' + info_str)
×
559
        return self._one
2✔
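# Illustrative usage of the lazy ONE property above ('session' stands for any BaseSession
# instance): the first access attempts the Alyx connection, and None is returned when
# ALYX_URL is not configured in the iblrig settings.
def can_register(session) -> bool:
    return session.one is not None and not session.one.offline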
560

561
    def register_to_alyx(self):
2✔
562
        """
563
        Registers the session to Alyx.
564

565
        This registers the session using the IBLRegistrationClient class.  This uses the settings
566
        file(s) and experiment description file to extract the session data.  This may be called
567
        any number of times and if the session record already exists in Alyx it will be updated.
568
        If session registration fails, it will be done before extraction in the ibllib pipeline.
569

570
        Note that currently the subject weight is registered once and only once.  The recorded
571
        weight of the first protocol run is used.
572

573
        Water administrations are added separately by this method: it is expected that
574
        `register_session` is first called with no recorded total water. This method will then add
575
        a water administration each time it is called, and should therefore be called only once
576
        after each protocol is run. If water administration registration fails for all protocols,
577
        this will be done before extraction in the ibllib pipeline; however, if a water
578
        administration is successfully registered for one protocol and subsequent ones fail to
579
        register, these will not be added before extraction in ibllib and therefore must be
580
        manually added to Alyx.
581

582
        Returns
583
        -------
584
        dict
585
            The registered session record.
586

587
        See Also
588
        --------
589
        :external+iblenv:meth:`ibllib.oneibl.registration.IBLRegistrationClient.register_session` - The registration method.
590
        """
591
        if self.session_info['SUBJECT_NAME'] in ('iblrig_test_subject', 'test', 'test_subject'):
2✔
592
            log.warning('Not registering test subject to Alyx')
2✔
593
            return
2✔
594
        if not self.one or self.one.offline:
2✔
595
            return
2✔
596
        try:
2✔
597
            client = IBLRegistrationClient(self.one)
2✔
598
            ses, _ = client.register_session(self.paths.SESSION_FOLDER, register_reward=False)
2✔
599
        except Exception:
2✔
600
            log.error(traceback.format_exc())
2✔
601
            log.error('Could not register session to Alyx')
2✔
602
            return
2✔
603
        # add the water administration if there was water administered
604
        try:
2✔
605
            if self.session_info['TOTAL_WATER_DELIVERED']:
2✔
606
                wa = client.register_water_administration(
2✔
607
                    self.session_info.SUBJECT_NAME,
608
                    self.session_info['TOTAL_WATER_DELIVERED'] / 1000,
609
                    session=ses['url'][-36:],
610
                    water_type=self.task_params.get('REWARD_TYPE', None),
611
                )
612
                log.info(f'Water administered registered in Alyx database: {ses["subject"]}, {wa["water_administered"]}mL')
2✔
613
        except Exception:
2✔
614
            log.error(traceback.format_exc())
2✔
615
            log.error('Could not register water administration to Alyx')
2✔
616
            return
2✔
617
        return ses
2✔
618

619
    def _execute_mixins_shared_function(self, pattern: str) -> None:
2✔
620
        """
621
        Execute all methods of the class whose names start with the specified pattern.
622

623
        This method loops through all callable methods of the class that begin with the given pattern and invokes each
624
        of them in the order they are found. It is useful for executing a set of related methods that share a common
625
        naming convention, such as initialization, starting, stopping, or cleanup routines.
626

627
        Parameters
628
        ----------
629
        pattern : str
630
            The prefix pattern to match method names. Only methods whose names start with this pattern will be executed.
631
            Examples: 'init_mixin', 'start_mixin', 'stop_mixin', or 'cleanup_mixin'.
632
        """
633
        methods = [getattr(self, m) for m in dir(self) if m.startswith(pattern) and callable(getattr(self, m))]
2✔
634
        for method in methods:
2✔
635
            method()
2✔
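# Illustrative sketch of the naming convention used above: any method whose name starts
# with the given prefix is discovered and called, so a mixin only has to follow the
# pattern. The mixin and attribute below are hypothetical.
class ExampleMixin:
    def init_mixin_example(self, *args, **kwargs):
        self.example_ready = True

# a session inheriting ExampleMixin would have this method invoked by
# self._execute_mixins_shared_function('init_mixin') during __init__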
636

637
    @property
2✔
638
    def time_elapsed(self):
2✔
639
        return datetime.datetime.now() - self.init_datetime
2✔
640

641
    def mock(self):
2✔
642
        self.is_mock = True
2✔
643

644
    def create_session(self):
2✔
645
        """
646
        Create the session path and save json parameters in the task collection folder.
647

648
        This will also create the protocol folder.
649
        """
650
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
2✔
651
        # enable file logging
652
        logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
2✔
653
        self._setup_loggers(level=self._logger.level, file=logfile)
2✔
654
        # copy the acquisition stub to the remote session folder
655
        sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
2✔
656
        sc.initialize_experiment(self.experiment_description, overwrite=False)
2✔
657
        self.register_to_alyx()
2✔
658

659
    def run(self):
2✔
660
        """
661
        Common pre-run instructions for all tasks.
662

663
        Defines sigint handler for a graceful exit.
664
        """
665
        # here we make sure we connect to the hardware before writing the session to disk
666
        # this prevents from incrementing endlessly the session number if the hardware fails to connect
667
        self.start_hardware()
2✔
668
        self.create_session()
2✔
669
        # When not running the first chained protocol, we can skip the weighing dialog
670
        first_protocol = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1]) == 0
2✔
671

672
        # get subject weight
673
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
2✔
674
            self.session_info.SUBJECT_WEIGHT = get_number('Subject weight (g): ', float, lambda x: x > 0)
2✔
675

676
        def sigint_handler(*args, **kwargs):
2✔
677
            # create a signal handler for a graceful exit: create a stop flag in the session folder
678
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
×
UNCOV
679
            log.critical('SIGINT signal detected, will exit at the end of the trial')
×
680

681
        # if a stop flag already exists upon starting, remove it; this prevents killing a session before it has begun
682
        if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
2✔
UNCOV
683
            self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
684

685
        signal.signal(signal.SIGINT, sigint_handler)
2✔
686
        self._run()  # runs the specific task logic i.e. trial loop etc...
2✔
687
        # post task instructions
688
        log.critical('Graceful exit')
2✔
689
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
690
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
2✔
691

692
        # get poop count
693
        if self.interactive and not self.wizard:
2✔
694
            self.session_info.POOP_COUNT = get_number('Droppings count: ', int, lambda x: x >= 0)
2✔
695

696
        self.save_task_parameters_to_json_file()
2✔
697
        self._execute_mixins_shared_function('stop_mixin')
2✔
698
        self._execute_mixins_shared_function('cleanup_mixin')
2✔
699
        self.register_to_alyx()
2✔
700

701
    @abstractmethod
2✔
702
    def start_hardware(self):
2✔
703
        """
704
        Start the hardware.
705

706
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
707
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
708
        """
UNCOV
709
        ...
×
710

711
    @abstractmethod
2✔
712
    def _run(self): ...
2✔
713

714
    @staticmethod
2✔
715
    def extra_parser():
2✔
716
        """
717
        Specify extra keyword arguments to expose to the user prior to running the task.
718

719
        Make sure you instantiate the parser.
720

721
        Returns
722
        -------
723
        argparse.ArgumentParser
724
            The extra parser instance.
725
        """
726
        parser = argparse.ArgumentParser(add_help=False)
2✔
727
        return parser
2✔
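# Illustrative override of extra_parser in a subclass (the option name is made up):
import argparse
from iblrig.base_tasks import BaseSession

class TaskWithExtraArgs(BaseSession):
    protocol_name = '_example_task_with_args'
    def start_hardware(self): ...
    def _run(self): ...

    @staticmethod
    def extra_parser():
        parser = argparse.ArgumentParser(add_help=False)
        parser.add_argument('--example_option', type=int, default=0)
        return parser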
728

729

730
# this class is instantiated solely to use the path constructor logic to predict the session path
731
class EmptySession(BaseSession):
2✔
732
    protocol_name = 'empty'
2✔
733

734
    def _run(self):
2✔
735
        pass
2✔
736

737
    def start_hardware(self):
2✔
738
        pass
2✔
739

740

741
class OSCClient(udp_client.SimpleUDPClient):
2✔
742
    """
743
    Handles communication to Bonsai using a UDP Client
744
    OSC channels:
745
        USED:
746
        /t  -> (int)    trial number current
747
        /p  -> (int)    position of stimulus init for current trial
748
        /h  -> (float)  phase of gabor for current trial
749
        /c  -> (float)  contrast of stimulus for current trial
750
        /f  -> (float)  frequency of gabor patch for current trial
751
        /a  -> (float)  angle of gabor patch for current trial
752
        /g  -> (float)  gain of RE to visual stim displacement
753
        /s  -> (float)  sigma of the 2D gaussian of gabor
754
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
755
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
756
    """
757

758
    OSC_PROTOCOL = {
2✔
759
        'trial_num': dict(mess='/t', type=int),
760
        'position': dict(mess='/p', type=int),
761
        'stim_phase': dict(mess='/h', type=float),
762
        'contrast': dict(mess='/c', type=float),
763
        'stim_freq': dict(mess='/f', type=float),
764
        'stim_angle': dict(mess='/a', type=float),
765
        'stim_gain': dict(mess='/g', type=float),
766
        'stim_sigma': dict(mess='/s', type=float),
767
        # 'stim_reverse': dict(mess='/r', type=int),  # this is not handled by Bonsai
768
    }
769

770
    def __init__(self, port, ip='127.0.0.1'):
2✔
771
        super().__init__(ip, port)
2✔
772

773
    def __del__(self):
2✔
774
        self._sock.close()
2✔
775

776
    def send2bonsai(self, **kwargs):
2✔
777
        """
778
        :param kwargs: see the list of keys in OSC_PROTOCOL
779
        :example: client.send2bonsai(trial_num=6, stim_freq=50)
780
        :return:
781
        """
782
        for k, v in kwargs.items():
2✔
783
            if k in self.OSC_PROTOCOL:
2✔
784
                # need to convert basic numpy types to low-level python types for OSC module
785
                value = v.item() if isinstance(v, np.generic) else v
2✔
786
                self.send_message(self.OSC_PROTOCOL[k]['mess'], self.OSC_PROTOCOL[k]['type'](value))
2✔
787

788
    def exit(self):
2✔
789
        self.send_message('/x', 1)
2✔
790

791

792
class BonsaiRecordingMixin(BaseSession):
2✔
793
    config: dict
2✔
794

795
    def init_mixin_bonsai_recordings(self, *args, **kwargs):
2✔
796
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
2✔
797
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
2✔
798
        self.config = None  # the name of the configuration to run
2✔
799

800
    def stop_mixin_bonsai_recordings(self):
2✔
801
        log.info('Stopping Bonsai recordings')
2✔
802
        self.bonsai_camera.udp_client.exit()
2✔
803
        self.bonsai_microphone.udp_client.exit()
2✔
804

805
    def start_mixin_bonsai_microphone(self):
2✔
806
        if not self.config:
2✔
807
            # Use the first key in the device_cameras map
UNCOV
808
            self.config = next((k for k in self.hardware_settings.device_cameras), None)
×
809
        # The camera workflow on the behaviour computer already contains the microphone recording
810
        # so the device camera workflow and the microphone one are mutually exclusive
811
        if self.config:
2✔
812
            return  # Camera workflow defined; so no need to separately start microphone.
2✔
UNCOV
813
        if not self.task_params.RECORD_SOUND:
×
UNCOV
814
            return  # Sound should not be recorded
×
815
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
×
816
        parameters = {
×
817
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
818
            'RecordSound': self.task_params.RECORD_SOUND,
819
        }
UNCOV
820
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
×
UNCOV
821
        log.info('Bonsai microphone recording module loaded: OK')
×
822

823
    @staticmethod
2✔
824
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
2✔
825
        """
826
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.
827

828
        Parameters
829
        ----------
830
        cameras : dict
831
            The hardware settings configuration.
832
        name : {'setup', 'recording'} str
833
            The workflow type.
834

835
        Returns
836
        -------
837
        Path
838
            The workflow path.
839
        """
840
        if cameras is None:
2✔
841
            return None
2✔
842
        return cameras['BONSAI_WORKFLOW'][name]
2✔
843

844
    def start_mixin_bonsai_cameras(self):
2✔
845
        """
846
        Prepare the cameras.
847

848
        Starts the pipeline that aligns the camera focus with the desired borders of rig features.
849
        The actual triggering of the cameras is done in the trigger_bonsai_cameras method.
850
        """
851
        if not self.config:
2✔
852
            # Use the first key in the device_cameras map
853
            try:
2✔
854
                self.config = next(k for k in self.hardware_settings.device_cameras)
2✔
UNCOV
855
            except StopIteration:
×
UNCOV
856
                return
×
857
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
858
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
2✔
UNCOV
859
            return
×
860

861
        # test acquisition and reset cameras if needed
862
        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
863
        if PYSPIN_AVAILABLE:
2✔
864
            from iblrig import video_pyspin
×
865

UNCOV
866
            if not video_pyspin.acquisition_ok():
×
UNCOV
867
                video_pyspin.reset_all_cameras()
×
UNCOV
868
            video_pyspin.enable_camera_trigger(True)
×
869
            # with video_pyspin.Cameras() as cameras:
870
            #     video_pyspin.enable_camera_trigger(True, cameras)
871
            #     video_pyspin.set_line_mode(line=2, mode='Output', camera=cameras)
872
            #     video_pyspin.set_line_mode(line=3, mode='Input', camera=cameras)
873

874
        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
2✔
875
        log.info('Bonsai cameras setup module loaded: OK')
2✔
876

877
    def trigger_bonsai_cameras(self):
2✔
878
        if not self.config:
2✔
879
            # Use the first key in the device_cameras map
880
            try:
×
UNCOV
881
                self.config = next(k for k in self.hardware_settings.device_cameras)
×
UNCOV
882
            except StopIteration:
×
UNCOV
883
                return
×
884
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
885
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
2✔
UNCOV
886
            raise NotImplementedError
×
887
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
2✔
888
        if workflow_file is None:
2✔
UNCOV
889
            return
×
890
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)
2✔
891
        # FIXME Use parameters in configuration map
892
        parameters = {
2✔
893
            'FileNameLeft': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.raw.avi'),
894
            'FileNameLeftData': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.frameData.bin'),
895
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
896
            'RecordSound': self.task_params.RECORD_SOUND,
897
        }
898
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
2✔
899
        log.info('Bonsai camera recording process started')
2✔
900

901

902
class BonsaiVisualStimulusMixin(BaseSession):
2✔
903
    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
2✔
904
        # camera 7111, microphone 7112
905
        self.bonsai_visual_udp_client = OSCClient(port=7110)
2✔
906

907
    def start_mixin_bonsai_visual_stimulus(self):
2✔
908
        self.choice_world_visual_stimulus()
2✔
909

910
    def stop_mixin_bonsai_visual_stimulus(self):
2✔
911
        log.info('Stopping Bonsai visual stimulus')
2✔
912
        self.bonsai_visual_udp_client.exit()
2✔
913

914
    def send_trial_info_to_bonsai(self):
2✔
915
        """
916
        Send the trial information to Bonsai via UDP.
917

918
        The OSC protocol is documented in iblrig.base_tasks.OSCClient
919
        """
920
        bonsai_dict = {
2✔
921
            k: self.trials_table[k][self.trial_num]
922
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
923
            if k in self.trials_table.columns
924
        }
925

926
        # reverse wheel contingency: if stim_reverse is True we invert stim_gain
927
        if self.trials_table.get('stim_reverse', {}).get(self.trial_num, False):
2✔
UNCOV
928
            bonsai_dict['stim_gain'] = -bonsai_dict['stim_gain']
×
929

930
        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
2✔
931
        log.debug(bonsai_dict)
2✔
932

933
    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
2✔
934
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
2✔
935
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
2✔
936
        parameters = {
2✔
937
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
938
            'Stim.SpontaneousActivity0.DueTime': sa_time,
939
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
940
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
941
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
942
        }
943
        map_seconds = pd.to_timedelta(map_time).seconds
2✔
944
        sa_seconds = pd.to_timedelta(sa_time).seconds
2✔
945
        log.info(f'Starting spontaneous activity ({sa_seconds} s) and RF mapping stims ({map_seconds} s)')
2✔
946
        s = call_bonsai(workflow_file, parameters, editor=False)
2✔
947
        log.info('Spontaneous activity and RF mapping stims finished')
2✔
948
        return s
2✔
949

950
    def choice_world_visual_stimulus(self):
2✔
951
        if self.task_params.VISUAL_STIMULUS is None:
2✔
UNCOV
952
            return
×
953
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
2✔
954
        parameters = {
2✔
955
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
956
            'Stim.FileNameStimPositionScreen': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_stimPositionScreen.raw.csv'),
957
            'Stim.FileNameSyncSquareUpdate': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
958
            'Stim.FileNamePositions': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderPositions.raw.ssv'),
959
            'Stim.FileNameEvents': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderEvents.raw.ssv'),
960
            'Stim.FileNameTrialInfo': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
961
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
962
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
963
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
964
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
965
        }
966
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
2✔
967
        log.info('Giving Bonsai some extra time to start ...')
2✔
968
        time.sleep(5)
2✔
969
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
970

971

972
class BpodMixin(BaseSession):
2✔
973
    def _raise_on_undefined_softcode_handler(self, byte: int):
2✔
974
        raise ValueError(f'No handler defined for softcode #{byte}')
2✔
975

976
    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
2✔
977
        """
978
        Returns a softcode handler dict where each key corresponds to the softcode and each value to the
979
        function to be called.
980

981
        This needs to be wrapped this way because
982
            1) we want to be able to inherit this and dynamically add softcodes to the dictionary
983
            2) we need to provide the Task object (self) at run time to have the functions with static args
984
        This is tricky as it is unclear if the task object is a copy or a reference when passed here.
985

986

987
        Returns
988
        -------
989
        OrderedDict[int, Callable]
990
            Softcode dictionary
991
        """
992
        softcode_dict = OrderedDict(
2✔
993
            {
994
                SOFTCODE.STOP_SOUND: self.sound['sd'].stop,
995
                SOFTCODE.PLAY_TONE: lambda: self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate']),
996
                SOFTCODE.PLAY_NOISE: lambda: self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate']),
997
                SOFTCODE.TRIGGER_CAMERA: getattr(
998
                    self, 'trigger_bonsai_cameras', lambda: self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)
999
                ),
1000
            }
1001
        )
1002
        return softcode_dict
2✔
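# Illustrative extension of the softcode dictionary in a subclass (the softcode number
# and handler are made up): inherit the base dictionary and add or replace entries.
import logging
from iblrig.base_tasks import BpodMixin, SoundMixin

class TaskWithCustomSoftcode(BpodMixin, SoundMixin):
    protocol_name = '_example_softcode_task'
    def start_hardware(self): ...
    def _run(self): ...

    def softcode_dictionary(self):
        softcodes = super().softcode_dictionary()
        softcodes[42] = lambda: logging.getLogger('iblrig').info('custom softcode received')
        return softcodes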
1003

1004
    def init_mixin_bpod(self, *args, **kwargs):
2✔
1005
        self.bpod = Bpod()
2✔
1006

1007
    def stop_mixin_bpod(self):
2✔
1008
        self.bpod.close()
2✔
1009

1010
        # convert ambient data from binary to parquet
1011
        if self.paths['AMBIENT_FILE_PATH'].exists():
2✔
1012
            pqt_file = binary.convert_to_parquet(
2✔
1013
                filepath_bin=self.paths['AMBIENT_FILE_PATH'], dtype=DTYPE_AMBIENT_SENSOR_BIN, delete_bin_file=True
1014
            )
1015
            log.info(f"'{self.paths['AMBIENT_FILE_PATH'].name}' converted to parqet and stored as '{pqt_file.name}'")
2✔
1016

1017
    def start_mixin_bpod(self):
2✔
1018
        if self.hardware_settings['device_bpod']['COM_BPOD'] is None:
2✔
1019
            raise ValueError(
2✔
1020
                'The value for device_bpod:COM_BPOD in settings/hardware_settings.yaml is null. Please provide a valid port name.'
1021
            )
1022
        disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
2✔
1023
        self.bpod = Bpod(self.hardware_settings['device_bpod']['COM_BPOD'], disable_behavior_ports=disabled_ports)
2✔
1024
        self.bpod.define_rotary_encoder_actions()
2✔
1025
        self.bpod.set_status_led(False)
2✔
1026
        assert self.bpod.is_connected
2✔
1027
        log.info('Bpod hardware module loaded: OK')
2✔
1028
        # make the bpod send spacer signals to the main sync clock for protocol discovery
1029
        self.send_spacers()
2✔
1030

1031
    def send_spacers(self):
2✔
UNCOV
1032
        log.info('Starting task by sending a spacer signal on BNC1')
×
UNCOV
1033
        sma = StateMachine(self.bpod)
×
UNCOV
1034
        Spacer().add_spacer_states(sma, next_state='exit')
×
UNCOV
1035
        self.bpod.send_state_machine(sma)
×
UNCOV
1036
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1037
        return self.bpod.session.current_trial.export()
×
1038

1039

1040
class Frame2TTLMixin(BaseSession):
2✔
1041
    """Frame 2 TTL interface for state machine."""
1042

1043
    def init_mixin_frame2ttl(self, *args, **kwargs):
2✔
1044
        pass
2✔
1045

1046
    def start_mixin_frame2ttl(self):
2✔
1047
        # todo assert calibration
1048
        if self.hardware_settings['device_frame2ttl']['COM_F2TTL'] is None:
2✔
1049
            raise ValueError(
2✔
1050
                'The value for device_frame2ttl:COM_F2TTL in '
1051
                'settings/hardware_settings.yaml is null. Please '
1052
                'provide a valid port name.'
1053
            )
UNCOV
1054
        Frame2TTL(
×
1055
            port=self.hardware_settings['device_frame2ttl']['COM_F2TTL'],
1056
            threshold_dark=self.hardware_settings['device_frame2ttl']['F2TTL_DARK_THRESH'],
1057
            threshold_light=self.hardware_settings['device_frame2ttl']['F2TTL_LIGHT_THRESH'],
1058
        ).close()
UNCOV
1059
        log.info('Frame2TTL: Thresholds set.')
×
1060

1061

1062
class RotaryEncoderMixin(BaseSession, HasBpod):
2✔
1063
    """Rotary encoder interface for state machine."""
1064

1065
    device_rotary_encoder: RotaryEncoderModule
2✔
1066

1067
    @property
2✔
1068
    def stimulus_gain(self) -> float:
2✔
1069
        return self.task_params.STIM_GAIN
2✔
1070

1071
    def init_mixin_rotary_encoder(self):
2✔
1072
        thresholds_deg = self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS
2✔
1073
        self.device_rotary_encoder = RotaryEncoderModule(
2✔
1074
            self.hardware_settings.device_rotary_encoder, thresholds_deg, self.stimulus_gain
1075
        )
1076

1077
    def start_mixin_rotary_encoder(self):
2✔
1078
        self.device_rotary_encoder.gain = self.stimulus_gain
2✔
1079
        self.device_rotary_encoder.open()
2✔
UNCOV
1080
        self.device_rotary_encoder.write_parameters()
×
UNCOV
1081
        self.device_rotary_encoder.close()
×
UNCOV
1082
        log.info('Rotary Encoder Module loaded: OK')
×
1083

1084

1085
class ValveMixin(BaseSession, HasBpod):
2✔
1086
    def init_mixin_valve(self: object):
2✔
1087
        self.valve = Valve(self.hardware_settings.device_valve)
2✔
1088

1089
    def start_mixin_valve(self):
2✔
1090
        # assert that valve has been calibrated
UNCOV
1091
        assert self.valve.is_calibrated, """VALVE IS NOT CALIBRATED - PLEASE CALIBRATE THE VALVE"""
×
1092

1093
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
UNCOV
1094
        assert self.compute_reward_time(amount_ul=1.5) < 1, """VALVE IS NOT PROPERLY CALIBRATED - PLEASE RECALIBRATE"""
×
UNCOV
1095
        log.info('Water valve module loaded: OK')
×
1096

1097
    def compute_reward_time(self, amount_ul: float | None = None) -> float:
2✔
1098
        """
1099
        Convert a given volume to the corresponding valve opening time.
1100

1101
        Parameters
1102
        ----------
1103
        amount_ul : float, optional
1104
            The volume of liquid (μl) to be dispensed from the valve. Defaults to task_params.REWARD_AMOUNT_UL.
1105

1106
        Returns
1107
        -------
1108
        float
1109
            Valve opening time in seconds.
1110
        """
1111
        amount_ul = self.task_params.REWARD_AMOUNT_UL if amount_ul is None else amount_ul
2✔
1112
        return self.valve.values.ul2ms(amount_ul) / 1e3
2✔
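# Illustrative worked example (the calibration figure is made up): if the valve
# calibration maps 1.5 μl to roughly 50 ms, then
#     compute_reward_time(amount_ul=1.5) ≈ 50 / 1e3 = 0.05 s
# which satisfies the < 1 s sanity check enforced in start_mixin_valve above.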
1113

1114
    def valve_open(self, reward_valve_time):
2✔
1115
        """
1116
        Open the reward valve for a given amount of time and return bpod data.
1117

1118
        Parameters
1119
        ----------
1120
        reward_valve_time : float
1121
            Amount of time in seconds to open the reward valve.
1122
        """
UNCOV
1123
        sma = StateMachine(self.bpod)
×
UNCOV
1124
        sma.add_state(
×
1125
            state_name='valve_open',
1126
            state_timer=reward_valve_time,
1127
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
1128
            state_change_conditions={'Tup': 'exit'},
1129
        )
UNCOV
1130
        self.bpod.send_state_machine(sma)
×
UNCOV
1131
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1132
        return self.bpod.session.current_trial.export()
×
1133


class SoundMixin(BaseSession, HasBpod):
    """Sound interface methods for state machine."""

    def init_mixin_sound(self):
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
        sound_output = self.hardware_settings.device_sound['OUTPUT']

        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
        # TODO: this needs proper calibration and refactoring
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
            amp_gain_factor = 0.25
        else:
            amp_gain_factor = 1.0
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor

        # the sound device sd is actually the sounddevice module imported above;
        # not sure how this plays out when referenced outside of this python file
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
        # Create sounds and output actions of state machine
        # NB: the amplitudes were already scaled by amp_gain_factor above, so they are not rescaled here
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
            rate=self.sound['samplerate'],
            frequency=self.task_params.GO_TONE_FREQUENCY,
            duration=self.task_params.GO_TONE_DURATION,
            amplitude=self.task_params.GO_TONE_AMPLITUDE,
            fade=0.01,
            chans=self.sound['channels'],
        )
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
            rate=self.sound['samplerate'],
            frequency=-1,
            duration=self.task_params.WHITE_NOISE_DURATION,
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE,
            fade=0.01,
            chans=self.sound['channels'],
        )
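    # Worked example (hedged; the amplitude value is illustrative only): with GO_TONE_AMPLITUDE = 0.1
    # in the task parameters, a 'hifi' output driving an 'AMP2X15' amplifier yields an effective
    # amplitude of 0.1 * 0.25 = 0.025, whereas any other output combination leaves it at 0.1.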

    def start_mixin_sound(self):
        """
        Configure the sound card and the state machine sound output actions.

        Depends on the Bpod mixin having been started when a hardware sound card (harp, hifi) is used.
        """
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
        # sound card configuration
        match self.hardware_settings.device_sound['OUTPUT']:
            case 'harp':
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
                sound.configure_sound_card(
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
                    sample_rate=self.sound['samplerate'],
                )
                self.bpod.define_harp_sounds_actions(
                    module=self.bpod.sound_card,
                    go_tone_index=self.task_params.GO_TONE_IDX,
                    noise_index=self.task_params.WHITE_NOISE_IDX,
                )
            case 'hifi':
                module = self.bpod.get_module('^HiFi')
                assert module is not None, 'No HiFi module connected to Bpod'
                assert self.hardware_settings.device_sound.COM_SOUND is not None
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
                hifi.push()
                hifi.close()
                self.bpod.define_harp_sounds_actions(
                    module=module,
                    go_tone_index=self.task_params.GO_TONE_IDX,
                    noise_index=self.task_params.WHITE_NOISE_IDX,
                )
            case _:
                self.bpod.define_xonar_sounds_actions()
        log.info(f'Sound module loaded: OK: {self.hardware_settings.device_sound["OUTPUT"]}')
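    # Illustrative configuration (a hedged sketch; the exact schema lives in hardware_settings.yaml and
    # iblrig.pydantic_definitions, and the COM port is an example only). A HiFi rig might declare:
    #
    #     device_sound:
    #       OUTPUT: hifi
    #       COM_SOUND: COM3
    #       AMP_TYPE: AMP2X15
    #
    # 'harp' instead uses the Bpod-connected sound card, and any other OUTPUT falls back to the Xonar actions.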

    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
        """
        Play the white noise sound used for error feedback using the Bpod state machine.

        :return: bpod current trial export
        """
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_noise], state_timer=state_timer)

    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
        """
        Play the ready tone beep using the Bpod state machine.

        :return: bpod current trial export
        """
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)

    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
        """Play a sound using the Bpod state machine.

        The sound must have been defined in the init_mixin_sound method.
        """
        assert state_timer is not None, 'state_timer must be defined'
        assert output_actions is not None, 'output_actions must be defined'
        sma = StateMachine(self.bpod)
        sma.add_state(
            state_name=state_name,
            state_timer=state_timer,
            output_actions=output_actions,
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
        )
        self.bpod.send_state_machine(sma)
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
        return self.bpod.session.current_trial.export()
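    # Illustrative usage (hedged sketch): a task inheriting SoundMixin can exercise both outputs with
    #
    #     trial_export = self.sound_play_tone()    # ~0.1 s go tone
    #     trial_export = self.sound_play_noise()   # ~0.5 s white noise
    #
    # both calls block until the single-state machine exits and return the Bpod trial export.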


class NetworkSession(BaseSession):
    """A mixin for communicating with an auxiliary acquisition PC over a network."""

    remote_rigs = None
    """net.Auxiliaries: An auxiliary services object for communicating with remote devices."""
    exp_ref = None
    """dict: The experiment reference (i.e. subject, date, sequence) as returned by the main remote service."""

    def __init__(self, *_, remote_rigs=None, **kwargs):
        """
        A mixin for communicating with an auxiliary acquisition PC over a network.

        This should retrieve the services list, i.e. the list of available auxiliary rigs,
        and determine which is the main sync. The main sync is the rig that determines the
        experiment.

        The services list is defined in a 'remote_rigs.yaml' file (its location follows the iblrig
        settings, see kwargs below) that maps device names to URIs. These are then selectable in the
        GUI and the URIs of those selected are added to the experiment description.

        Subclasses should add their callbacks within init by calling :meth:`self.remote_rigs.services.assign_callback`.

        Parameters
        ----------
        remote_rigs : list | dict
            Either a list of remote device names (in which case the URI is looked up from the remote
            devices file), or a map of device name to URI.
        kwargs
            Optional args such as 'file_iblrig_settings' for defining the location of the remote data
            folder when loading the remote devices file.
        """
        if isinstance(remote_rigs, list):
            # For now we flatten to a list of remote rig names but could permit a list of (name, URI) tuples
            remote_rigs = list(filter(None, flatten(remote_rigs)))
            self._load_settings(**kwargs)
            all_remote_rigs = net.get_remote_devices(iblrig_settings=self.iblrig_settings)
            if not set(remote_rigs).issubset(all_remote_rigs.keys()):
                raise ValueError('Selected remote rigs not in remote rigs list')
            remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in remote_rigs}
        # Load and connect to remote services
        self.connect(remote_rigs)
        self.exp_ref = {}
        try:
            super().__init__(**kwargs)
            self.session_info['REMOTE_RIGS'] = remote_rigs
        except Exception as ex:
            self.cleanup_mixin_network()
            raise ex
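    # Illustrative usage (a hedged sketch; the rig name, URI and subclass are examples only): given a
    # remote_rigs.yaml entry such as 'cameras: udp://192.168.1.20:9998', a task could be created with
    #
    #     session = MyNetworkTask(remote_rigs=['cameras'], **kwargs)
    #
    # or by passing the name-to-URI mapping directly as the remote_rigs argument.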

    @property
    def one(self):
        """Return ONE instance.

        Unlike the super class getter, this method will always instantiate ONE, allowing subclasses to update with an Alyx
        token from a remotely connected rig. This instance is used for formatting the experiment reference string.

        Returns
        -------
        one.api.One
            An instance of ONE.
        """
        if super().one is None:
            self._one = OneAlyx(silent=True, mode='local')
        return self._one

    def connect(self, remote_rigs):
        """
        Connect to remote services.

        Instantiates the Communicator objects that establish connections with each remote device.
        This also creates the thread that uses asynchronous callbacks.

        Parameters
        ----------
        remote_rigs : dict
            A map of name to URI.
        """
        self.remote_rigs = net.Auxiliaries(remote_rigs or {})
        assert not remote_rigs or self.remote_rigs.is_connected
        # Handle termination event by gracefully completing the thread
        signal.signal(signal.SIGTERM, lambda sig, frame: self.cleanup_mixin_network())

    def _init_paths(self, append: bool = False):
        """
        Determine session paths.

        Unlike :meth:`BaseSession._init_paths`, this method determines the session number from the remote main sync if
        connected.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        iblutil.util.Bunch
            A bunch of paths.
        """
        if self.hardware_settings.MAIN_SYNC:
            return BaseSession._init_paths(self, append)
        # Check if we have rigs connected
        if not self.remote_rigs.is_connected:
            log.warning('No remote rigs; experiment reference may not match the main sync.')
            return BaseSession._init_paths(self, append)
        # Set paths in a similar way to the super class
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
            local_path=self.iblrig_settings['iblrig_local_data_path'],
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
            lab=self.iblrig_settings['ALYX_LAB'],
            iblrig_settings=self.iblrig_settings,
        )
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
        paths.BONSAI = BONSAI_EXE
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
        )
        assert self.exp_ref
        paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
        # if append == paths.TASK_COLLECTION.endswith('00'):
        #     raise ValueError(
        #         f'Append value incorrect. Either remove previous task collections from '
        #         f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
        #     )
        log.critical('This is task number %i for %s', int(paths.TASK_COLLECTION.split('_')[-1]) + 1, self.exp_ref)

        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
        paths.AMBIENT_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_ambientSensorData.raw.bin')
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
        return paths
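    # Illustrative layout (hedged; subject, date and collection name are examples - the collection is
    # whatever iterate_collection returns): for a main-sync sequence of 3 the resulting paths resemble
    #
    #     <local_subjects_folder>/example_subject/2025-06-18/003                     (SESSION_FOLDER)
    #     <local_subjects_folder>/example_subject/2025-06-18/003/raw_task_data_00    (task collection)
    #     .../raw_task_data_00/_iblrig_taskData.raw.jsonable                         (DATA_FILE_PATH)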

    @staticmethod
    def extra_parser():
        """
        Parse network arguments.

        Namely adds the remote argument to the parser.

        :return: argparse.parser()
        """
        parser = super(NetworkSession, NetworkSession).extra_parser()
        parser.add_argument(
            '--remote',
            dest='remote_rigs',
            type=str,
            required=False,
            action='append',
            nargs='+',
            help='specify one of the remote rigs to interact with over the network',
        )
        return parser
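    # Illustrative command line (hedged; 'cameras' and 'ephys_pc' are example device names): appending
    #
    #     --remote cameras --remote ephys_pc
    #
    # to the usual task invocation selects those entries from remote_rigs.yaml; unknown names raise a ValueError.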

    def run(self):
        """Run the session and report exceptions to remote services."""
        self.start_mixin_network()
        try:
            return super().run()
        except Exception as e:
            # Communicate the error to the remote services
            if self.remote_rigs.is_connected:
                tb = e.__traceback__  # TODO maybe go one level down with tb_next?
                details = {
                    'error': e.__class__.__name__,  # exception name str
                    'message': str(e),  # error str
                    'traceback': traceback.format_exc(),  # stack str
                    'file': tb.tb_frame.f_code.co_filename,  # filename str
                    'line_no': (tb.tb_lineno, tb.tb_lasti),  # (int, int)
                }
                self.remote_rigs.push(ExpMessage.EXPINTERRUPT, details, wait=True)
                self.cleanup_mixin_network()
            raise e

    def communicate(self, message, *args, raise_on_exception=True):
        """
        Communicate a message to remote services.

        This method is blocking and by default will raise if not all responses are received in time.

        Parameters
        ----------
        message : iblutil.io.net.base.ExpMessage, str, int
            An experiment message to send to remote services.
        args
            One or more optional variables to send.
        raise_on_exception : bool
            If true, exceptions arising from message timeouts will be re-raised in the main thread and
            services will be cleaned up. Only applies when wait is true.

        Returns
        -------
        Exception | dict
            If raise_on_exception is False, returns an exception if it failed to receive all responses
            in time, otherwise a map of service name to response is returned.
        """
        r = self.remote_rigs.push(message, *args, wait=True)
        if isinstance(r, Exception):
            log.error('Error on %s network mixin: %s', message, r)
            if raise_on_exception:
                self.cleanup_mixin_network()
                raise r
        return r
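    # Illustrative usage (hedged sketch): the mixin's own lifecycle methods use this helper, e.g.
    #
    #     responses = self.communicate('EXPEND')                             # blocks until all services reply
    #     responses = self.communicate('EXPEND', raise_on_exception=False)   # returns the Exception on timeout
    #
    # on success, responses maps each remote service name to its reply.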

    def get_exp_info(self):
        """Return an ExpInfo dictionary for this rig (experiment reference, main sync flag and experiment description)."""
        ref = self.exp_ref or None
        if isinstance(ref, dict) and self.one:
            ref = self.one.dict2ref(ref)
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        info = net.ExpInfo(ref, is_main_sync, self.experiment_description, master=is_main_sync)
        return info.to_dict()

    def init_mixin_network(self):
        """Initialize remote services.

        This method sends an EXPINFO message to all services, expecting exactly one of the responses
        to contain main_sync: True, along with the experiment reference to use. It then sends an
        EXPINIT message to all services.
        """
        if not self.remote_rigs.is_connected:
            return
        # Determine experiment reference from main sync
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        if is_main_sync:
            raise NotImplementedError
        assert self.one

        expinfo = self.get_exp_info() | {'subject': self.session_info['SUBJECT_NAME']}
        r = self.communicate('EXPINFO', 'CONNECTED', expinfo)
        assert sum(x[-1]['main_sync'] for x in r.values()) == 1, 'one main sync expected'
        main_rig_name, (status, info) = next((k, v) for k, v in r.items() if v[-1]['main_sync'])
        self.exp_ref = self.one.ref2dict(info['exp_ref']) if isinstance(info['exp_ref'], str) else info['exp_ref']
        if self.exp_ref['subject'] != self.session_info['SUBJECT_NAME']:
            log.error(
                'Running task for "%s" but main sync returned exp ref for "%s".',
                self.session_info['SUBJECT_NAME'],
                self.exp_ref['subject'],
            )
            raise ValueError("Subject name doesn't match remote session on " + main_rig_name)
        if str(self.exp_ref['date']) != self.session_info['SESSION_START_TIME'][:10]:
            raise RuntimeError(
                f'Session dates do not match between this rig and {main_rig_name}. \n'
                f'Running past or future sessions not currently supported. \n'
                f'Please check the system date time settings on each rig.'
            )
        # TODO How to handle folder already existing before running UDP experiment?

        # exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
        exp_ref = self.one.dict2ref(self.exp_ref)
        self.communicate('EXPINIT', {'exp_ref': exp_ref})
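    # Illustrative exchange (a hedged sketch inferred from the assertions above, not a protocol spec;
    # service names and the exp_ref string are examples): the EXPINFO responses are expected to look like
    #
    #     r = {'ephys_pc': (<status>, {'main_sync': True, 'exp_ref': '2025-06-18_3_example_subject'}),
    #          'cameras':  (<status>, {'main_sync': False, 'exp_ref': None})}
    #
    # exactly one entry must report main_sync=True; its exp_ref determines this session's number and paths.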

    def start_mixin_network(self):
        """Start remote services.

        This method sends an EXPSTART message to all services, along with an exp_ref string.
        Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPSTART', self.exp_ref)

    def stop_mixin_network(self):
        """Stop remote services.

        This method sends an EXPEND message to all services. Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPEND')

    def cleanup_mixin_network(self):
        """Clean up remote services."""
        log.info('Cleaning up network mixin')
        self.remote_rigs.close()
        if self.remote_rigs.is_connected:
            log.warning('Failed to properly clean up network mixin')
        else:
            log.info('Cleaned up network mixin')


class SpontaneousSession(BaseSession):
    """
    A spontaneous task has no trials; it simply runs until the user stops it.

    It is used to get the extraction structure for data streams.
    """

    def __init__(self, duration_secs=None, **kwargs):
        super().__init__(**kwargs)
        self.duration_secs = duration_secs

    def start_hardware(self):
        pass  # no mixin here, life is but a dream

    def _run(self):
        """Run the task without a state machine: poll until the duration elapses or a .stop file appears."""
        log.info('Starting spontaneous acquisition')
        while True:
            time.sleep(1.5)
            if self.duration_secs is not None and self.time_elapsed.seconds > self.duration_secs:
                break
            if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
                self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
                break
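    # Illustrative stop mechanism (hedged; the path is an example only): a running spontaneous session
    # can be ended from another process by creating an empty '.stop' file in its session folder, e.g.
    #
    #     Path('/data/Subjects/example_subject/2025-06-18/001/.stop').touch()
    #
    # the loop removes the file and exits within about one 1.5 s polling interval.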


class SpontaneousBpodSession(SpontaneousSession, BpodMixin):
    """
    Like SpontaneousSession but with the BpodMixin added in.

    This ensures that the Bpod spacers will be generated when starting the task.
    """

    def start_hardware(self) -> None:
        self.start_mixin_bpod()