
int-brain-lab / iblrig / 12279337432

11 Dec 2024 03:15PM UTC coverage: 47.031% (+0.2%) from 46.79%

Pull Request #751: Fiber trajectory GUI (github / web-flow, d4edef)
Merge eea51f2f7 into 2f9d65d86

0 of 114 new or added lines in 1 file covered. (0.0%)
1076 existing lines in 22 files now uncovered.
4246 of 9028 relevant lines covered (47.03%)
0.94 hits per line
Source File: /iblrig/base_tasks.py (file coverage: 84.99%)
1
"""
2
Commonalities for all tasks.
3

4
This module provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task related logic.
6
"""
7

8
import argparse
2✔
9
import contextlib
2✔
10
import datetime
2✔
11
import importlib.metadata
2✔
12
import inspect
2✔
13
import json
2✔
14
import logging
2✔
15
import signal
2✔
16
import sys
2✔
17
import time
2✔
18
import traceback
2✔
19
from abc import ABC, abstractmethod
2✔
20
from collections import OrderedDict
2✔
21
from collections.abc import Callable
2✔
22
from pathlib import Path
2✔
23
from typing import Protocol, final
2✔
24

25
import numpy as np
2✔
26
import pandas as pd
2✔
27
import yaml
2✔
28
from pythonosc import udp_client
2✔
29

30
import ibllib.io.session_params as ses_params
2✔
31
import iblrig.path_helper
2✔
32
import pybpodapi
2✔
33
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
34
from iblrig import net, path_helper, sound
2✔
35
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
36
from iblrig.frame2ttl import Frame2TTL
2✔
37
from iblrig.hardware import SOFTCODE, Bpod, RotaryEncoderModule, sound_device_factory
2✔
38
from iblrig.hifi import HiFi
2✔
39
from iblrig.path_helper import load_pydantic_yaml
2✔
40
from iblrig.pydantic_definitions import HardwareSettings, RigSettings, TrialDataModel
2✔
41
from iblrig.tools import call_bonsai, get_number
2✔
42
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
43
from iblrig.valve import Valve
2✔
44
from iblutil.io.net.base import ExpMessage
2✔
45
from iblutil.spacer import Spacer
2✔
46
from iblutil.util import Bunch, flatten, setup_logger
2✔
47
from one.alf.io import next_num_folder
2✔
48
from one.api import ONE, OneAlyx
2✔
49
from pybpodapi.protocol import StateMachine
2✔
50

51
OSC_CLIENT_IP = '127.0.0.1'
2✔
52

53
log = logging.getLogger(__name__)
2✔
54

55

56
class HasBpod(Protocol):
2✔
57
    bpod: Bpod
2✔
58

59

60
class BaseSession(ABC):
2✔
61
    version = None
2✔
62
    """str: !!CURRENTLY UNUSED!! task version string."""
2✔
63
    # protocol_name: str | None = None
64
    """str: The name of the task protocol (NB: avoid spaces)."""
2✔
65
    base_parameters_file: Path | None = None
2✔
66
    """Path: A YAML file containing base, default task parameters."""
2✔
67
    is_mock = False
2✔
68
    """list of str: One or more ibllib.pipes.tasks.Task names for task extraction."""
2✔
69
    logger: logging.Logger = None
2✔
70
    """logging.Logger: Log instance used solely to keep track of log level passed to constructor."""
2✔
71
    experiment_description: dict = {}
2✔
72
    """dict: The experiment description."""
2✔
73
    extractor_tasks: list | None = None
2✔
74
    """list of str: An optional list of pipeline task class names to instantiate when preprocessing task data."""
2✔
75

76
    TrialDataModel: type[TrialDataModel]
2✔
77

78
    @property
2✔
79
    @abstractmethod
2✔
80
    def protocol_name(self) -> str: ...
2✔
81

82
    def __init__(
2✔
83
        self,
84
        subject=None,
85
        task_parameter_file=None,
86
        file_hardware_settings=None,
87
        hardware_settings: HardwareSettings = None,
88
        file_iblrig_settings=None,
89
        iblrig_settings: RigSettings = None,
90
        one=None,
91
        interactive=True,
92
        projects=None,
93
        procedures=None,
94
        stub=None,
95
        subject_weight_grams=None,
96
        append=False,
97
        wizard=False,
98
        log_level='INFO',
99
        **kwargs,
100
    ):
101
        """
102
        :param subject: The subject nickname. Required.
103
        :param task_parameter_file: an optional path to the task_parameters.yaml file
104
        :param file_hardware_settings: name of the hardware file in the settings folder, or full file path
105
        :param hardware_settings: an optional dictionary of hardware settings. Keys will override any keys in the file
106
        :param file_iblrig_settings: name of the iblrig file in the settings folder, or full file path
107
        :param iblrig_settings: an optional dictionary of iblrig settings. Keys will override any keys in the file
108
        :param one: an optional instance of ONE
109
        :param interactive: bool, if True the user will be prompted for input (e.g. subject weight) during the session
110
        :param projects: An optional list of Alyx projects.
111
        :param procedures: An optional list of Alyx procedures.
112
        :param subject_weight_grams: weight of the subject in grams
113
        :param stub: A full path to an experiment description file containing experiment information.
114
        :param append: bool, if True, append to the latest existing session of the same subject for the same day
115
        """
116
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
2✔
117
        self._logger = None
2✔
118
        self._setup_loggers(level=log_level)
2✔
119
        if not isinstance(self, EmptySession):
2✔
120
            log.info(f'iblrig version {iblrig.__version__}')
2✔
121
            log.info(f'pybpod version {pybpodapi.__version__}')
2✔
122
            log.info(
2✔
123
                f'Session protocol: {self.protocol_name} '
124
                f'({f"version {self.version})" if self.version is not None else "undefined version"})'
125
            )
126

127
        log.info(f'Session call: {" ".join(sys.argv)}')
2✔
128
        self.interactive = interactive
2✔
129
        self._one = one
2✔
130
        self.init_datetime = datetime.datetime.now()
2✔
131

132
        # loads in the settings: first load the files, then update with the input argument if provided
133
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
2✔
134
        if hardware_settings is not None:
2✔
135
            self.hardware_settings.update(hardware_settings)
2✔
136
            HardwareSettings.model_validate(self.hardware_settings)
2✔
137
        self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
2✔
138
        if iblrig_settings is not None:
2✔
139
            self.iblrig_settings.update(iblrig_settings)
2✔
140
            RigSettings.model_validate(self.iblrig_settings)
2✔
141

142
        self.wizard = wizard
2✔
143

144
        # Load the tasks settings, from the task folder or override with the input argument
145
        self.task_params = self.read_task_parameter_files(task_parameter_file)
2✔
146

147
        self.session_info = Bunch(
2✔
148
            {
149
                'NTRIALS': 0,
150
                'NTRIALS_CORRECT': 0,
151
                'PROCEDURES': procedures,
152
                'PROJECTS': projects,
153
                'SESSION_START_TIME': self.init_datetime.isoformat(),
154
                'SESSION_END_TIME': None,
155
                'SESSION_NUMBER': 0,
156
                'SUBJECT_NAME': subject,
157
                'SUBJECT_WEIGHT': subject_weight_grams,
158
                'TOTAL_WATER_DELIVERED': 0,
159
            }
160
        )
161
        # Executes mixins init methods
162
        self._execute_mixins_shared_function('init_mixin')
2✔
163
        self.paths = self._init_paths(append=append)
2✔
164
        if not isinstance(self, EmptySession):
2✔
165
            log.info(f'Session raw data: {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
166
        # Prepare the experiment description dictionary
167
        self.experiment_description = self.make_experiment_description_dict(
2✔
168
            self.protocol_name,
169
            self.paths.get('TASK_COLLECTION'),
170
            procedures,
171
            projects,
172
            self.hardware_settings,
173
            stub,
174
            extractors=self.extractor_tasks,
175
        )
176

177
    @classmethod
2✔
178
    def get_task_file(cls) -> Path:
2✔
179
        """
180
        Get the path to the task's python file.
181

182
        Returns
183
        -------
184
        Path
185
            The path to the task file.
186
        """
187
        return Path(inspect.getfile(cls))
2✔
188

189
    @classmethod
2✔
190
    def get_task_directory(cls) -> Path:
2✔
191
        """
192
        Get the path to the task's directory.
193

194
        Returns
195
        -------
196
        Path
197
            The path to the task's directory.
198
        """
199
        return cls.get_task_file().parent
2✔
200

201
    @classmethod
2✔
202
    def read_task_parameter_files(cls, task_parameter_file: str | Path | None = None) -> Bunch:
2✔
203
        """
204
        Get the task's parameters from the various YAML files in the hierarchy.
205

206
        Parameters
207
        ----------
208
        task_parameter_file : str or Path, optional
209
            Path to override the task parameter file
210

211
        Returns
212
        -------
213
        Bunch
214
            Task parameters
215
        """
216
        # Load the tasks settings, from the task folder or override with the input argument
217
        base_parameters_files = [task_parameter_file or cls.get_task_directory().joinpath('task_parameters.yaml')]
2✔
218

219
        # loop through the task hierarchy to gather parameter files
220
        for c in cls.__mro__:
2✔
221
            base_file = getattr(c, 'base_parameters_file', None)
2✔
222
            if base_file is not None:
2✔
223
                base_parameters_files.append(base_file)
2✔
224

225
        # remove list duplicates while preserving order, we want the highest order first
226
        base_parameters_files = list(reversed(list(dict.fromkeys(base_parameters_files))))
2✔
227

228
        # loop through files and update the dictionary, the latest files in the hierarchy have precedence
229
        task_params = dict()
2✔
230
        for param_file in base_parameters_files:
2✔
231
            if Path(param_file).exists():
2✔
232
                with open(param_file) as fp:
2✔
233
                    params = yaml.safe_load(fp)
2✔
234
                if params is not None:
2✔
235
                    task_params.update(params)
2✔
236

237
        # finally, sort the dictionary so it's easier for a human to navigate the many keys; return as a Bunch
238
        return Bunch(sorted(task_params.items()))
2✔
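    # Illustrative example (editor's addition, not part of the original file): how the parameter hierarchy
    # resolves for a hypothetical subclass. Keys from the most derived class take precedence and the merged
    # result is returned as an alphabetically sorted Bunch.
    #
    #     class ExampleTask(BaseSession):
    #         protocol_name = 'example_task'
    #         base_parameters_file = Path('example_task_parameters.yaml')  # hypothetical file
    #
    #     params = ExampleTask.read_task_parameter_files()
    #     # -> defaults from parent classes' YAML files, overridden by keys in example_task_parameters.yaml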
239

240
    def _init_paths(self, append: bool = False) -> Bunch:
2✔
241
        r"""
242
        Initialize session paths.
243

244
        Parameters
245
        ----------
246
        append : bool
247
            Iterate task collection within today's most recent session folder for the selected subject, instead of
248
            iterating session number.
249

250
        Returns
251
        -------
252
        Bunch
253
            Bunch with keys:
254

255
            *   BONSAI: full path to the bonsai executable
256
                `C:\iblrigv8\Bonsai\Bonsai.exe`
257
            *   VISUAL_STIM_FOLDER: full path to the visual stimulus folder
258
                `C:\iblrigv8\visual_stim`
259
            *   LOCAL_SUBJECT_FOLDER: full path to the local subject folder
260
                `C:\iblrigv8_data\mainenlab\Subjects`
261
            *   REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
262
                `Y:\Subjects`
263
            *   SESSION_FOLDER: full path to the current session:
264
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001`
265
            *   TASK_COLLECTION: folder name of the current task
266
                `raw_task_data_00`
267
            *   SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection.
268
                This is where the task data gets written
269
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00`
270
            *   DATA_FILE_PATH: contains the bpod trials
271
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable`
272
            *   SETTINGS_FILE_PATH: contains the task settings
273
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json`
274
        """
275
        rig_computer_paths = path_helper.get_local_and_remote_paths(
2✔
276
            local_path=self.iblrig_settings.iblrig_local_data_path,
277
            remote_path=self.iblrig_settings.iblrig_remote_data_path,
278
            lab=self.iblrig_settings.ALYX_LAB,
279
            iblrig_settings=self.iblrig_settings,
280
        )
281
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
282
        paths.BONSAI = BONSAI_EXE
2✔
283
        paths.VISUAL_STIM_FOLDER = BASE_PATH.joinpath('visual_stim')
2✔
284
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
285
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
286
        # initialize the session path
287
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
288
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
289
        )
290
        if append:
2✔
291
            # this is the case where we append a new protocol to an existing session
292
            todays_sessions = sorted(filter(Path.is_dir, date_folder.glob('*')), reverse=True)
2✔
293
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
2✔
294
            paths.SESSION_FOLDER = todays_sessions[0]
2✔
295
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
296
            if self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
2✔
UNCOV
297
                """
×
298
                Chained protocols make little sense when Bpod is the main sync as there is no
299
                continuous acquisition between protocols.  Only one sync collection can be defined in
300
                the experiment description file.
301
                If you are running experiments with an ephys rig (nidq) or an external daq, you should
302
                correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
303
                """
UNCOV
304
                raise RuntimeError('Chained protocols not supported for bpod-only sessions')
×
305
        else:
306
            # in this case the session path is created from scratch
307
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
2✔
308
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
309

310
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
311
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
312
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
313
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
314
        return paths
2✔
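    # Illustrative note (editor's addition, not part of the original file): with append=True the most recent
    # session folder of the day is reused and only the task collection is iterated, e.g. (hypothetical paths)
    #
    #     C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00   first protocol
    #     C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_01   appended protocol, same session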
315

316
    @property
2✔
317
    def exp_ref(self):
2✔
318
        """Construct an experiment reference string from the session info attribute."""
319
        subject, date, number = (self.session_info[k] for k in ('SUBJECT_NAME', 'SESSION_START_TIME', 'SESSION_NUMBER'))
2✔
320
        if not all([subject, date, number]):
2✔
321
            return None
2✔
322
        return self.one.dict2ref(dict(subject=subject, date=date[:10], sequence=str(number)))
2✔
323

324
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
2✔
325
        self._logger = setup_logger('iblrig', level=level, file=file)  # logger attr used by create_session to determine log level
2✔
326
        setup_logger('pybpodapi', level=level_bpod, file=file)
2✔
327

328
    @staticmethod
2✔
329
    def _remove_file_loggers():
2✔
330
        for logger_name in ['iblrig', 'pybpodapi']:
2✔
331
            logger = logging.getLogger(logger_name)
2✔
332
            file_handlers = [fh for fh in logger.handlers if isinstance(fh, logging.FileHandler)]
2✔
333
            for fh in file_handlers:
2✔
334
                fh.close()
2✔
335
                logger.removeHandler(fh)
2✔
336

337
    @staticmethod
2✔
338
    def make_experiment_description_dict(
2✔
339
        task_protocol: str,
340
        task_collection: str,
341
        procedures: list = None,
342
        projects: list = None,
343
        hardware_settings: dict | HardwareSettings = None,
344
        stub: Path = None,
345
        extractors: list = None,
346
        camera_config: str = None,
347
    ):
348
        """
349
        Construct an experiment description dictionary.
350

351
        Parameters
352
        ----------
353
        task_protocol : str
354
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
355
        task_collection : str
356
            The task collection name, e.g. raw_task_data_00.
357
        procedures : list
358
            An optional list of Alyx procedures.
359
        projects : list
360
            An optional list of Alyx projects.
361
        hardware_settings : dict
362
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
363
        stub : Path
364
            An optional path to an experiment description stub file to update.
365
        extractors: list
366
            An optional list of extractor names for the task.
367
        camera_config : str
368
            The camera configuration name in the hardware settings. Defaults to the first key in
369
            'device_cameras'.
370

371
        Returns
372
        -------
373
        dict
374
            The experiment description.
375
        """
376
        description = ses_params.read_params(stub) if stub else {}
2✔
377

378
        # Add hardware devices
379
        if hardware_settings is not None:
2✔
380
            if isinstance(hardware_settings, HardwareSettings):
2✔
381
                hardware_settings = hardware_settings.model_dump()
2✔
382
            devices = {}
2✔
383
            cams = hardware_settings.get('device_cameras', None)
2✔
384
            if cams:
2✔
385
                devices['cameras'] = {}
2✔
386
                camera_config = camera_config or next((k for k in cams), {})
2✔
387
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
2✔
388
            if hardware_settings.get('device_microphone', None):
2✔
389
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
2✔
390
            ses_params.merge_params(description, {'devices': devices})
2✔
391

392
        # Add projects and procedures
393
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
2✔
394
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
2✔
395
        is_main_sync = (hardware_settings or {}).get('MAIN_SYNC', False)
2✔
396
        # Add sync key if required
397
        if is_main_sync and 'sync' not in description:
2✔
398
            description['sync'] = {
2✔
399
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
400
            }
401
        # Add task
402
        task = {task_protocol: {'collection': task_collection}}
2✔
403
        if not is_main_sync:
2✔
404
            task[task_protocol]['sync_label'] = 'bpod'
2✔
405
        if extractors:
2✔
406
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
2✔
407
            task[task_protocol].update({'extractors': extractors})
2✔
408
        if 'tasks' not in description:
2✔
409
            description['tasks'] = [task]
2✔
410
        else:
411
            description['tasks'].append(task)
2✔
412
        return description
2✔
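    # Illustrative example (editor's addition, not part of the original file): a minimal call with a plain
    # hardware settings dict and Bpod as the main sync. Protocol and project names are hypothetical.
    #
    #     description = BaseSession.make_experiment_description_dict(
    #         task_protocol='_iblrig_tasks_exampleChoiceWorld',
    #         task_collection='raw_task_data_00',
    #         projects=['example_project'],
    #         hardware_settings={'MAIN_SYNC': True},
    #     )
    #     # description['sync']  -> {'bpod': {'collection': 'raw_task_data_00', ...}}
    #     # description['tasks'] -> [{'_iblrig_tasks_exampleChoiceWorld': {'collection': 'raw_task_data_00'}}]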
413

414
    def _make_task_parameters_dict(self):
2✔
415
        """
416
        Create dictionary that will be saved to the settings json file for extraction.
417

418
        Returns
419
        -------
420
        dict
421
            A dictionary that will be saved to the settings json file for extraction.
422
        """
423
        output_dict = dict(self.task_params)  # Grab parameters from task_params session
2✔
424
        output_dict.update(self.hardware_settings.model_dump())  # Update dict with hardware settings from session
2✔
425
        output_dict.update(dict(self.session_info))  # Update dict with session_info (subject, procedure, projects)
2✔
426
        patch_dict = {  # Various values added to ease transition from iblrig v7 to v8, different home may be desired
2✔
427
            'IBLRIG_VERSION': iblrig.__version__,
428
            'PYBPOD_PROTOCOL': self.protocol_name,
429
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
430
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
431
        }
432
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
2✔
433
            patch_dict['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
2✔
434
        output_dict.update(patch_dict)
2✔
435
        return output_dict
2✔
436

437
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
2✔
438
        """
439
        Collects the various settings and parameters of the session and outputs them to a JSON file.
440

441
        Returns
442
        -------
443
        Path
444
            Path to the resultant JSON file
445
        """
446
        output_dict = self._make_task_parameters_dict()
2✔
447
        if destination_folder:
2✔
UNCOV
448
            json_file = destination_folder.joinpath('_iblrig_taskSettings.raw.json')
×
449
        else:
450
            json_file = self.paths['SETTINGS_FILE_PATH']
2✔
451
        json_file.parent.mkdir(parents=True, exist_ok=True)
2✔
452
        with open(json_file, 'w') as outfile:
2✔
453
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)  # converts datetime objects to string
2✔
454
        return json_file  # PosixPath
2✔
455

456
    @final
2✔
457
    def save_trial_data_to_json(self, bpod_data: dict):
2✔
458
        """Validate and save trial data.
459

460
        This method retrieves the current trial's data from the trials_table and validates it using a Pydantic model
461
        (self.TrialDataModel). It then merges in the trial's bpod_data dict and appends everything to the session's
462
        JSON data file.
463

464
        Parameters
465
        ----------
466
        bpod_data : dict
467
            Trial data returned from pybpod.
468
        """
469
        # get trial's data as a dict
470
        trial_data = self.trials_table.iloc[self.trial_num].to_dict()
2✔
471

472
        # warn about entries not covered by pydantic model
473
        if trial_data.get('trial_num', 1) == 0:
2✔
474
            for key in set(trial_data.keys()) - set(self.TrialDataModel.model_fields) - {'index'}:
2✔
475
                log.warning(
2✔
476
                    f'Key "{key}" in trial_data is missing from TrialDataModel - '
477
                    f'its value ({trial_data[key]}) will not be validated.'
478
                )
479

480
        # validate by passing through pydantic model
481
        trial_data = self.TrialDataModel.model_validate(trial_data).model_dump()
2✔
482

483
        # add bpod_data as 'behavior_data'
484
        trial_data['behavior_data'] = bpod_data
2✔
485

486
        # write json data to file
487
        with open(self.paths['DATA_FILE_PATH'], 'a') as fp:
2✔
488
            fp.write(json.dumps(trial_data) + '\n')
2✔
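    # Illustrative sketch (editor's addition, not part of the original file) of the attributes this method
    # relies on; the model and field names are hypothetical.
    #
    #     class ExampleTrialData(TrialDataModel):
    #         contrast: float
    #         position: int
    #
    #     # in the task class:
    #     TrialDataModel = ExampleTrialData    # every trial row is validated against this model
    #
    # Each call validates the row self.trials_table.iloc[self.trial_num], adds the raw bpod data under
    # 'behavior_data' and appends a single JSON line to _iblrig_taskData.raw.jsonable.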
489

490
    @property
2✔
491
    def one(self):
2✔
492
        """ONE getter."""
493
        if self._one is None:
2✔
494
            if self.iblrig_settings['ALYX_URL'] is None:
2✔
495
                return
2✔
496
            info_str = (
×
497
                f"alyx client with user name {self.iblrig_settings['ALYX_USER']} "
498
                + f"and url: {self.iblrig_settings['ALYX_URL']}"
499
            )
500
            try:
×
UNCOV
501
                self._one = ONE(
×
502
                    base_url=str(self.iblrig_settings['ALYX_URL']),
503
                    username=self.iblrig_settings['ALYX_USER'],
504
                    mode='remote',
505
                    cache_rest=None,
506
                )
UNCOV
507
                log.info('instantiated ' + info_str)
×
UNCOV
508
            except Exception:
×
509
                log.error(traceback.format_exc())
×
510
                log.error('could not connect to ' + info_str)
×
511
        return self._one
2✔
512

513
    def register_to_alyx(self):
2✔
514
        """
515
        Registers the session to Alyx.
516

517
        This registers the session using the IBLRegistrationClient class.  This uses the settings
518
        file(s) and experiment description file to extract the session data.  This may be called
519
        any number of times and if the session record already exists in Alyx it will be updated.
520
        If session registration fails, it will be done before extraction in the ibllib pipeline.
521

522
        Note that currently the subject weight is registered once and only once.  The recorded
523
        weight of the first protocol run is used.
524

525
        Water administrations are added separately by this method: it is expected that
526
        `register_session` is first called with no recorded total water. This method will then add
527
        a water administration each time it is called, and should therefore be called only once
528
        after each protocol is run. If water administration registration fails for all protocols,
529
        this will be done before extraction in the ibllib pipeline. However, if a water
530
        administration is successfully registered for one protocol and subsequent ones fail to
531
        register, these will not be added before extraction in ibllib and therefore must be
532
        manually added to Alyx.
533

534
        Returns
535
        -------
536
        dict
537
            The registered session record.
538

539
        See Also
540
        --------
541
        :external+iblenv:meth:`ibllib.oneibl.registration.IBLRegistrationClient.register_session` - The registration method.
542
        """
543
        if self.session_info['SUBJECT_NAME'] in ('iblrig_test_subject', 'test', 'test_subject'):
2✔
544
            log.warning('Not registering test subject to Alyx')
2✔
545
            return
2✔
546
        if not self.one or self.one.offline:
2✔
547
            return
2✔
548
        try:
2✔
549
            client = IBLRegistrationClient(self.one)
2✔
550
            ses, _ = client.register_session(self.paths.SESSION_FOLDER, register_reward=False)
2✔
551
        except Exception:
2✔
552
            log.error(traceback.format_exc())
2✔
553
            log.error('Could not register session to Alyx')
2✔
554
            return
2✔
555
        # add the water administration if there was water administered
556
        try:
2✔
557
            if self.session_info['TOTAL_WATER_DELIVERED']:
2✔
558
                wa = client.register_water_administration(
2✔
559
                    self.session_info.SUBJECT_NAME,
560
                    self.session_info['TOTAL_WATER_DELIVERED'] / 1000,
561
                    session=ses['url'][-36:],
562
                    water_type=self.task_params.get('REWARD_TYPE', None),
563
                )
564
                log.info(f"Water administered registered in Alyx database: {ses['subject']}, " f"{wa['water_administered']}mL")
2✔
565
        except Exception:
2✔
566
            log.error(traceback.format_exc())
2✔
567
            log.error('Could not register water administration to Alyx')
2✔
568
            return
2✔
569
        return ses
2✔
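    # Note (editor's addition, not part of the original file): TOTAL_WATER_DELIVERED is divided by 1000 above,
    # i.e. it is assumed to be tracked in microlitres and registered to Alyx in millilitres:
    #
    #     self.session_info['TOTAL_WATER_DELIVERED'] = 1500   # hypothetical value, in µl
    #     # -> register_water_administration(subject, 1.5, ...)  i.e. 1.5 mL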
570

571
    def _execute_mixins_shared_function(self, pattern):
2✔
572
        """
573
        Loop over all methods of the class that start with pattern and execute them.
574

575
        Parameters
576
        ----------
577
        pattern : str
578
            'init_mixin', 'start_mixin', 'stop_mixin', or 'cleanup_mixin'
579
        """
580
        method_names = [method for method in dir(self) if method.startswith(pattern)]
2✔
581
        methods = [getattr(self, method) for method in method_names if inspect.ismethod(getattr(self, method))]
2✔
582
        for meth in methods:
2✔
583
            meth()
2✔
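    # Illustrative example (editor's addition, not part of the original file): any method whose name starts
    # with the given pattern is discovered and called, which is how the hardware mixins hook into the session
    # life cycle without an explicit registry. With a hypothetical mixin:
    #
    #     class ExampleMixin(BaseSession):
    #         def init_mixin_example(self, *args, **kwargs):
    #             self.example_ready = True
    #
    #     self._execute_mixins_shared_function('init_mixin')   # calls init_mixin_example() among others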
584

585
    @property
2✔
586
    def time_elapsed(self):
2✔
587
        return datetime.datetime.now() - self.init_datetime
2✔
588

589
    def mock(self):
2✔
590
        self.is_mock = True
2✔
591

592
    def create_session(self):
2✔
593
        """
594
        Create the session path and save json parameters in the task collection folder.
595

596
        This will also create the protocol folder.
597
        """
598
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
2✔
599
        # enable file logging
600
        logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
2✔
601
        self._setup_loggers(level=self._logger.level, file=logfile)
2✔
602
        # copy the acquisition stub to the remote session folder
603
        sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
2✔
604
        sc.initialize_experiment(self.experiment_description, overwrite=False)
2✔
605
        self.register_to_alyx()
2✔
606

607
    def run(self):
2✔
608
        """
609
        Common pre-run instructions for all tasks.
610

611
        Defines a SIGINT handler for a graceful exit.
612
        """
613
        # here we make sure we connect to the hardware before writing the session to disk
614
        # this prevents the session number from being incremented endlessly if the hardware fails to connect
615
        self.start_hardware()
2✔
616
        self.create_session()
2✔
617
        # When not running the first chained protocol, we can skip the weighing dialog
618
        first_protocol = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1]) == 0
2✔
619

620
        # get subject weight
621
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
2✔
622
            self.session_info.SUBJECT_WEIGHT = get_number('Subject weight (g): ', float, lambda x: x > 0)
2✔
623

624
        def sigint_handler(*args, **kwargs):
2✔
625
            # create a signal handler for a graceful exit: create a stop flag in the session folder
626
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
×
UNCOV
627
            log.critical('SIGINT signal detected, will exit at the end of the trial')
×
628

629
        # if a stop flag is already present upon starting, remove it to prevent the session from being killed before it begins
630
        if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
2✔
UNCOV
631
            self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
632

633
        signal.signal(signal.SIGINT, sigint_handler)
2✔
634
        self._run()  # runs the specific task logic i.e. trial loop etc...
2✔
635
        # post task instructions
636
        log.critical('Graceful exit')
2✔
637
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
638
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
2✔
639

640
        # get poop count
641
        if self.interactive and not self.wizard:
2✔
642
            self.session_info.POOP_COUNT = get_number('Droppings count: ', int, lambda x: x >= 0)
2✔
643

644
        self.save_task_parameters_to_json_file()
2✔
645
        self.register_to_alyx()
2✔
646
        self._execute_mixins_shared_function('stop_mixin')
2✔
647
        self._execute_mixins_shared_function('cleanup_mixin')
2✔
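    # Illustrative note (editor's addition, not part of the original file): a running session can be asked to
    # stop gracefully either via Ctrl+C (SIGINT) or by creating the stop flag from another process, e.g.
    #
    #     session.paths.SESSION_FOLDER.joinpath('.stop').touch()
    #
    # The trial loop implemented in _run() is expected to check for this flag at the end of each trial.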
648

649
    @abstractmethod
2✔
650
    def start_hardware(self):
2✔
651
        """
652
        Start the hardware.
653

654
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
655
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
656
        """
657
        ...
×
658

659
    @abstractmethod
2✔
660
    def _run(self): ...
2✔
661

662
    @staticmethod
2✔
663
    def extra_parser():
2✔
664
        """
665
        Specify extra keyword arguments to expose to the user prior to running the task.
666

667
        Make sure you instantiate the parser.
668

669
        Returns
670
        -------
671
        argparse.ArgumentParser
672
            The extra parser instance.
673
        """
674
        parser = argparse.ArgumentParser(add_help=False)
2✔
675
        return parser
2✔
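    # Illustrative example (editor's addition, not part of the original file): a subclass can expose extra
    # command-line arguments by extending the parser returned here. The argument below is hypothetical.
    #
    #     @staticmethod
    #     def extra_parser():
    #         parser = BaseSession.extra_parser()
    #         parser.add_argument('--example_delay', type=float, default=0.0, help='hypothetical extra parameter')
    #         return parser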
676

677

678
# this class gets instantiated so that the path constructor utility can predict the session path
679
class EmptySession(BaseSession):
2✔
680
    protocol_name = 'empty'
2✔
681

682
    def _run(self):
2✔
683
        pass
2✔
684

685
    def start_hardware(self):
2✔
686
        pass
2✔
687

688

689
class OSCClient(udp_client.SimpleUDPClient):
2✔
690
    """
691
    Handles communication to Bonsai using a UDP client.
692
    OSC channels:
693
        USED:
694
        /t  -> (int)    trial number current
695
        /p  -> (int)    position of stimulus init for current trial
696
        /h  -> (float)  phase of gabor for current trial
697
        /c  -> (float)  contrast of stimulus for current trial
698
        /f  -> (float)  frequency of gabor patch for current trial
699
        /a  -> (float)  angle of gabor patch for current trial
700
        /g  -> (float)  gain of RE to visual stim displacement
701
        /s  -> (float)  sigma of the 2D gaussian of gabor
702
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
703
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
704
    """
705

706
    OSC_PROTOCOL = {
2✔
707
        'trial_num': dict(mess='/t', type=int),
708
        'position': dict(mess='/p', type=int),
709
        'stim_phase': dict(mess='/h', type=float),
710
        'contrast': dict(mess='/c', type=float),
711
        'stim_freq': dict(mess='/f', type=float),
712
        'stim_angle': dict(mess='/a', type=float),
713
        'stim_gain': dict(mess='/g', type=float),
714
        'stim_sigma': dict(mess='/s', type=float),
715
        # 'stim_reverse': dict(mess='/r', type=int),  # this is not handled by Bonsai
716
    }
717

718
    def __init__(self, port, ip='127.0.0.1'):
2✔
719
        super().__init__(ip, port)
2✔
720

721
    def __del__(self):
2✔
722
        self._sock.close()
2✔
723

724
    def send2bonsai(self, **kwargs):
2✔
725
        """
726
        :param kwargs: keyword arguments; keys must be listed in OSC_PROTOCOL
727
        :example: client.send2bonsai(trial_num=6, stim_freq=50)
728
        :return:
729
        """
730
        for k, v in kwargs.items():
2✔
731
            if k in self.OSC_PROTOCOL:
2✔
732
                # need to convert basic numpy types to low-level python types for OSC module
733
                value = v.item() if isinstance(v, np.generic) else v
2✔
734
                self.send_message(self.OSC_PROTOCOL[k]['mess'], self.OSC_PROTOCOL[k]['type'](value))
2✔
735

736
    def exit(self):
2✔
737
        self.send_message('/x', 1)
2✔
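    # Illustrative example (editor's addition, not part of the original file): sending trial parameters to a
    # Bonsai workflow listening on the visual stimulus port (7110 in this module). Values are hypothetical.
    #
    #     client = OSCClient(port=7110)
    #     client.send2bonsai(trial_num=6, contrast=0.25, position=-35)   # keys must exist in OSC_PROTOCOL
    #     client.exit()   # sends '/x' to tell the workflow to close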
738

739

740
class BonsaiRecordingMixin(BaseSession):
2✔
741
    config: dict
2✔
742

743
    def init_mixin_bonsai_recordings(self, *args, **kwargs):
2✔
744
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
2✔
745
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
2✔
746
        self.config = None  # the name of the configuration to run
2✔
747

748
    def stop_mixin_bonsai_recordings(self):
2✔
749
        log.info('Stopping Bonsai recordings')
2✔
750
        self.bonsai_camera.udp_client.exit()
2✔
751
        self.bonsai_microphone.udp_client.exit()
2✔
752

753
    def start_mixin_bonsai_microphone(self):
2✔
754
        if not self.config:
2✔
755
            # Use the first key in the device_cameras map
756
            self.config = next((k for k in self.hardware_settings.device_cameras), None)
×
757
        # The camera workflow on the behaviour computer already contains the microphone recording
758
        # so the device camera workflow and the microphone workflow are mutually exclusive
759
        if self.config:
2✔
760
            return  # Camera workflow defined; so no need to separately start microphone.
2✔
761
        if not self.task_params.RECORD_SOUND:
×
762
            return  # Sound should not be recorded
×
UNCOV
763
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
×
764
        parameters = {
×
765
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
766
            'RecordSound': self.task_params.RECORD_SOUND,
767
        }
UNCOV
768
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
×
UNCOV
769
        log.info('Bonsai microphone recording module loaded: OK')
×
770

771
    @staticmethod
2✔
772
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
2✔
773
        """
774
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.
775

776
        Parameters
777
        ----------
778
        cameras : dict
779
            The hardware settings configuration.
780
        name : {'setup', 'recording'} str
781
            The workflow type.
782

783
        Returns
784
        -------
785
        Path
786
            The workflow path.
787
        """
788
        if cameras is None:
2✔
789
            return None
2✔
790
        return cameras['BONSAI_WORKFLOW'][name]
2✔
791

792
    def start_mixin_bonsai_cameras(self):
2✔
793
        """
794
        Prepare the cameras.
795

796
        Starts the pipeline that aligns the camera focus with the desired borders of rig features.
797
        The actual triggering of the cameras is done in the trigger_bonsai_cameras method.
798
        """
799
        if not self.config:
2✔
800
            # Use the first key in the device_cameras map
801
            try:
2✔
802
                self.config = next(k for k in self.hardware_settings.device_cameras)
2✔
UNCOV
803
            except StopIteration:
×
UNCOV
804
                return
×
805
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
806
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
2✔
807
            return
×
808

809
        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
810
        if PYSPIN_AVAILABLE:
2✔
UNCOV
811
            from iblrig.video_pyspin import enable_camera_trigger
×
812

UNCOV
813
            enable_camera_trigger(True)
×
814

815
        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
2✔
816
        log.info('Bonsai cameras setup module loaded: OK')
2✔
817

818
    def trigger_bonsai_cameras(self):
2✔
819
        if not self.config:
2✔
820
            # Use the first key in the device_cameras map
UNCOV
821
            try:
×
822
                self.config = next(k for k in self.hardware_settings.device_cameras)
×
823
            except StopIteration:
×
824
                return
×
825
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
826
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
2✔
UNCOV
827
            raise NotImplementedError
×
828
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
2✔
829
        if workflow_file is None:
2✔
UNCOV
830
            return
×
831
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)
2✔
832
        # FIXME Use parameters in configuration map
833
        parameters = {
2✔
834
            'FileNameLeft': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.raw.avi'),
835
            'FileNameLeftData': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.frameData.bin'),
836
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
837
            'RecordSound': self.task_params.RECORD_SOUND,
838
        }
839
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
2✔
840
        log.info('Bonsai camera recording process started')
2✔
841

842

843
class BonsaiVisualStimulusMixin(BaseSession):
2✔
844
    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
2✔
845
        # camera 7111, microphone 7112
846
        self.bonsai_visual_udp_client = OSCClient(port=7110)
2✔
847

848
    def start_mixin_bonsai_visual_stimulus(self):
2✔
849
        self.choice_world_visual_stimulus()
2✔
850

851
    def stop_mixin_bonsai_visual_stimulus(self):
2✔
852
        log.info('Stopping Bonsai visual stimulus')
2✔
853
        self.bonsai_visual_udp_client.exit()
2✔
854

855
    def send_trial_info_to_bonsai(self):
2✔
856
        """
857
        Send the trial information to Bonsai via UDP.
858

859
        The OSC protocol is documented in iblrig.base_tasks.BonsaiVisualStimulusMixin
860
        """
861
        bonsai_dict = {
2✔
862
            k: self.trials_table[k][self.trial_num]
863
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
864
            if k in self.trials_table.columns
865
        }
866

867
        # reverse wheel contingency: if stim_reverse is True we invert stim_gain
868
        if self.trials_table.get('stim_reverse', {}).get(self.trial_num, False):
2✔
UNCOV
869
            bonsai_dict['stim_gain'] = -bonsai_dict['stim_gain']
×
870

871
        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
2✔
872
        log.debug(bonsai_dict)
2✔
873

874
    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
2✔
875
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
2✔
876
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
2✔
877
        parameters = {
2✔
878
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
879
            'Stim.SpontaneousActivity0.DueTime': sa_time,
880
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
881
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
882
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
883
        }
884
        map_seconds = pd.to_timedelta(map_time).seconds
2✔
885
        sa_seconds = pd.to_timedelta(sa_time).seconds
2✔
886
        log.info(f'Starting spontaneous activity ({sa_seconds} s) and RF mapping stims ({map_seconds} s)')
2✔
887
        s = call_bonsai(workflow_file, parameters, editor=False)
2✔
888
        log.info('Spontaneous activity and RF mapping stims finished')
2✔
889
        return s
2✔
890

891
    def choice_world_visual_stimulus(self):
2✔
892
        if self.task_params.VISUAL_STIMULUS is None:
2✔
893
            return
×
894
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
2✔
895
        parameters = {
2✔
896
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
897
            'Stim.FileNameStimPositionScreen': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_stimPositionScreen.raw.csv'),
898
            'Stim.FileNameSyncSquareUpdate': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
899
            'Stim.FileNamePositions': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderPositions.raw.ssv'),
900
            'Stim.FileNameEvents': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderEvents.raw.ssv'),
901
            'Stim.FileNameTrialInfo': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
902
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
903
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
904
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
905
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
906
        }
907
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
2✔
908
        log.info('Giving Bonsai some extra time to start ...')
2✔
909
        time.sleep(5)
2✔
910
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
911

912

913
class BpodMixin(BaseSession):
2✔
914
    def _raise_on_undefined_softcode_handler(self, byte: int):
2✔
915
        raise ValueError(f'No handler defined for softcode #{byte}')
2✔
916

917
    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
2✔
918
        """
919
        Returns a softcode handler dict where each key corresponds to the softcode and each value to the
920
        function to be called.
921

922
        This needs to be wrapped this way because
923
            1) we want to be able to inherit this and dynamically add softcodes to the dictionary
924
            2) we need to provide the Task object (self) at run time to have the functions with static args
925
        This is tricky as it is unclear if the task object is a copy or a reference when passed here.
926

927

928
        Returns
929
        -------
930
        OrderedDict[int, Callable]
931
            Softcode dictionary
932
        """
933
        softcode_dict = OrderedDict(
2✔
934
            {
935
                SOFTCODE.STOP_SOUND: self.sound['sd'].stop,
936
                SOFTCODE.PLAY_TONE: lambda: self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate']),
937
                SOFTCODE.PLAY_NOISE: lambda: self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate']),
938
                SOFTCODE.TRIGGER_CAMERA: getattr(
939
                    self, 'trigger_bonsai_cameras', lambda: self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)
940
                ),
941
            }
942
        )
943
        return softcode_dict
2✔
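    # Illustrative example (editor's addition, not part of the original file): a task can add handlers by
    # overriding this method and extending the returned dictionary. The softcode value is hypothetical.
    #
    #     def softcode_dictionary(self):
    #         softcode_dict = super().softcode_dictionary()
    #         softcode_dict[10] = lambda: log.info('custom softcode handler')
    #         return softcode_dict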
944

945
    def init_mixin_bpod(self, *args, **kwargs):
2✔
946
        self.bpod = Bpod()
2✔
947

948
    def stop_mixin_bpod(self):
2✔
949
        self.bpod.close()
2✔
950

951
    def start_mixin_bpod(self):
2✔
952
        if self.hardware_settings['device_bpod']['COM_BPOD'] is None:
2✔
953
            raise ValueError(
2✔
954
                'The value for device_bpod:COM_BPOD in '
955
                'settings/hardware_settings.yaml is null. Please '
956
                'provide a valid port name.'
957
            )
UNCOV
958
        disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
×
UNCOV
959
        self.bpod = Bpod(self.hardware_settings['device_bpod']['COM_BPOD'], disable_behavior_ports=disabled_ports)
×
UNCOV
960
        self.bpod.define_rotary_encoder_actions()
×
UNCOV
961
        self.bpod.set_status_led(False)
×
UNCOV
962
        assert self.bpod.is_connected
×
UNCOV
963
        log.info('Bpod hardware module loaded: OK')
×
964
        # make the bpod send spacer signals to the main sync clock for protocol discovery
UNCOV
965
        self.send_spacers()
×
966

967
    def send_spacers(self):
2✔
UNCOV
968
        log.info('Starting task by sending a spacer signal on BNC1')
×
UNCOV
969
        sma = StateMachine(self.bpod)
×
UNCOV
970
        Spacer().add_spacer_states(sma, next_state='exit')
×
UNCOV
971
        self.bpod.send_state_machine(sma)
×
UNCOV
972
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
973
        return self.bpod.session.current_trial.export()
×
974

975

976
class Frame2TTLMixin(BaseSession):
2✔
977
    """Frame 2 TTL interface for state machine."""
978

979
    def init_mixin_frame2ttl(self, *args, **kwargs):
2✔
980
        pass
2✔
981

982
    def start_mixin_frame2ttl(self):
2✔
983
        # todo assert calibration
984
        if self.hardware_settings['device_frame2ttl']['COM_F2TTL'] is None:
2✔
985
            raise ValueError(
2✔
986
                'The value for device_frame2ttl:COM_F2TTL in '
987
                'settings/hardware_settings.yaml is null. Please '
988
                'provide a valid port name.'
989
            )
UNCOV
990
        Frame2TTL(
×
991
            port=self.hardware_settings['device_frame2ttl']['COM_F2TTL'],
992
            threshold_dark=self.hardware_settings['device_frame2ttl']['F2TTL_DARK_THRESH'],
993
            threshold_light=self.hardware_settings['device_frame2ttl']['F2TTL_LIGHT_THRESH'],
994
        ).close()
UNCOV
995
        log.info('Frame2TTL: Thresholds set.')
×
996

997

998
class RotaryEncoderMixin(BaseSession, HasBpod):
2✔
999
    """Rotary encoder interface for state machine."""
1000

1001
    device_rotary_encoder: RotaryEncoderModule
2✔
1002

1003
    @property
2✔
1004
    def stimulus_gain(self) -> float:
2✔
1005
        return self.task_params.STIM_GAIN
2✔
1006

1007
    def init_mixin_rotary_encoder(self):
2✔
1008
        thresholds_deg = self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS
2✔
1009
        self.device_rotary_encoder = RotaryEncoderModule(
2✔
1010
            self.hardware_settings.device_rotary_encoder, thresholds_deg, self.stimulus_gain
1011
        )
1012

1013
    def start_mixin_rotary_encoder(self):
2✔
1014
        self.device_rotary_encoder.gain = self.stimulus_gain
2✔
1015
        self.device_rotary_encoder.open()
2✔
UNCOV
1016
        self.device_rotary_encoder.write_parameters()
×
UNCOV
1017
        self.device_rotary_encoder.close()
×
UNCOV
1018
        log.info('Rotary Encoder Module loaded: OK')
×
1019

1020

1021
class ValveMixin(BaseSession, HasBpod):
2✔
1022
    def init_mixin_valve(self: object):
2✔
1023
        self.valve = Valve(self.hardware_settings.device_valve)
2✔
1024

1025
    def start_mixin_valve(self):
2✔
1026
        # assert that valve has been calibrated
UNCOV
1027
        assert self.valve.is_calibrated, """VALVE IS NOT CALIBRATED - PLEASE CALIBRATE THE VALVE"""
×
1028

1029
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
UNCOV
1030
        assert self.compute_reward_time(amount_ul=1.5) < 1, """VALVE IS NOT PROPERLY CALIBRATED - PLEASE RECALIBRATE"""
×
UNCOV
1031
        log.info('Water valve module loaded: OK')
×
1032

1033
    def compute_reward_time(self, amount_ul: float | None = None) -> float:
2✔
1034
        """
1035
        Converts the valve opening time from a given volume.
1036

1037
        Parameters
1038
        ----------
1039
        amount_ul : float, optional
1040
            The volume of liquid (μl) to be dispensed from the valve. Defaults to task_params.REWARD_AMOUNT_UL.
1041

1042
        Returns
1043
        -------
1044
        float
1045
            Valve opening time in seconds.
1046
        """
1047
        amount_ul = self.task_params.REWARD_AMOUNT_UL if amount_ul is None else amount_ul
2✔
1048
        return self.valve.values.ul2ms(amount_ul) / 1e3
2✔
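    # Illustrative example (editor's addition, not part of the original file): converting a reward volume into
    # a valve opening time using the valve calibration, then delivering the reward. The volume is hypothetical.
    #
    #     t_open = self.compute_reward_time(amount_ul=1.5)   # seconds the valve stays open for 1.5 µl
    #     self.valve_open(t_open)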
1049

1050
    def valve_open(self, reward_valve_time):
2✔
1051
        """
1052
        Open the reward valve for a given amount of time and return bpod data.
1053

1054
        Parameters
1055
        ----------
1056
        reward_valve_time : float
1057
            Amount of time in seconds to open the reward valve.
1058
        """
UNCOV
1059
        sma = StateMachine(self.bpod)
×
UNCOV
1060
        sma.add_state(
×
1061
            state_name='valve_open',
1062
            state_timer=reward_valve_time,
1063
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
1064
            state_change_conditions={'Tup': 'exit'},
1065
        )
UNCOV
1066
        self.bpod.send_state_machine(sma)
×
UNCOV
1067
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1068
        return self.bpod.session.current_trial.export()
×
1069

1070

1071
class SoundMixin(BaseSession, HasBpod):
2✔
1072
    """Sound interface methods for state machine."""
1073

1074
    def init_mixin_sound(self):
2✔
1075
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
2✔
1076
        sound_output = self.hardware_settings.device_sound['OUTPUT']
2✔
1077

1078
        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
1079
        # TODO: this needs proper calibration and refactoring
1080
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
2✔
UNCOV
1081
            amp_gain_factor = 0.25
×
1082
        else:
1083
            amp_gain_factor = 1.0
2✔
1084
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
2✔
1085
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor
2✔
1086

1087
        # the sound device 'sd' is actually the sounddevice module, as returned by sound_device_factory.
1088
        # not sure how this plays out when referenced outside of this python file
1089
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
2✔
1090
        # Create sounds and output actions of state machine
1091
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
2✔
1092
            rate=self.sound['samplerate'],
1093
            frequency=self.task_params.GO_TONE_FREQUENCY,
1094
            duration=self.task_params.GO_TONE_DURATION,
1095
            amplitude=self.task_params.GO_TONE_AMPLITUDE * amp_gain_factor,
1096
            fade=0.01,
1097
            chans=self.sound['channels'],
1098
        )
1099
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
2✔
1100
            rate=self.sound['samplerate'],
1101
            frequency=-1,
1102
            duration=self.task_params.WHITE_NOISE_DURATION,
1103
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE * amp_gain_factor,
1104
            fade=0.01,
1105
            chans=self.sound['channels'],
1106
        )
1107

1108
    def start_mixin_sound(self):
2✔
1109
        """
1110
        Depends on the bpod mixin being started first (required for the harp sound card).
1111
        :return:
1112
        """
UNCOV
1113
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
×
1114
        # SoundCard config params
UNCOV
1115
        match self.hardware_settings.device_sound['OUTPUT']:
×
UNCOV
1116
            case 'harp':
×
UNCOV
1117
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
×
UNCOV
1118
                sound.configure_sound_card(
×
1119
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
1120
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
1121
                    sample_rate=self.sound['samplerate'],
1122
                )
UNCOV
1123
                self.bpod.define_harp_sounds_actions(
×
1124
                    module=self.bpod.sound_card,
1125
                    go_tone_index=self.task_params.GO_TONE_IDX,
1126
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1127
                )
UNCOV
1128
            case 'hifi':
×
UNCOV
1129
                module = self.bpod.get_module('^HiFi')
×
UNCOV
1130
                assert module is not None, 'No HiFi module connected to Bpod'
×
UNCOV
1131
                assert self.hardware_settings.device_sound.COM_SOUND is not None
×
UNCOV
1132
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
×
UNCOV
1133
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
×
UNCOV
1134
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
×
UNCOV
1135
                hifi.push()
×
UNCOV
1136
                hifi.close()
×
UNCOV
1137
                self.bpod.define_harp_sounds_actions(
×
1138
                    module=module,
1139
                    go_tone_index=self.task_params.GO_TONE_IDX,
1140
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1141
                )
UNCOV
1142
            case _:
×
UNCOV
1143
                self.bpod.define_xonar_sounds_actions()
×
UNCOV
1144
        log.info(f"Sound module loaded: OK: {self.hardware_settings.device_sound['OUTPUT']}")
×
1145
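
    # The OUTPUT string matched above comes from the rig's hardware settings file. A minimal sketch
    # of the corresponding section of hardware_settings.yaml (field names taken from the code above;
    # values and surrounding layout are illustrative and may differ per rig):
    #
    #     device_sound:
    #       OUTPUT: hifi        # 'harp', 'hifi', or a system sound card (e.g. 'xonar')
    #       COM_SOUND: COM3     # serial port; required for the 'hifi' output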

    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
2✔
        """
        Play the noise sound for error feedback using the bpod state machine.
        :return: bpod current trial export
        """
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_noise], state_timer=state_timer)
×

    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
2✔
        """
        Play the ready tone beep using the bpod state machine.
        :return: bpod current trial export
        """
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×

    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
2✔
        """Play a sound using the bpod state machine.

        The sound must have been defined in the init_mixin_sound method.
        """
        assert state_timer is not None, 'state_timer must be defined'
×
        assert output_actions is not None, 'output_actions must be defined'
×
        sma = StateMachine(self.bpod)
×
        sma.add_state(
×
            state_name=state_name,
            state_timer=state_timer,
            output_actions=output_actions,
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
        )
        self.bpod.send_state_machine(sma)
×
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
        return self.bpod.session.current_trial.export()
×

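# Illustrative sketch of how the sound mixin above may be used: once start_mixin_sound has mapped the
# go tone and noise onto Bpod output actions, a task can call the blocking helpers directly or reuse
# the same actions inside its own trial state machine (action names as defined by the bpod mixin):
#
#     self.sound_play_tone()  # one-state state machine, returns the bpod trial export
#     sma.add_state(
#         state_name='error',
#         state_timer=self.task_params.WHITE_NOISE_DURATION,
#         output_actions=[self.bpod.actions.play_noise],
#         state_change_conditions={'Tup': 'exit'},
#     )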

class NetworkSession(BaseSession):
2✔
    """A mixin for communicating with an auxiliary acquisition PC over a network."""

    remote_rigs = None
2✔
    """net.Auxiliaries: An auxiliary services object for communicating with remote devices."""
2✔
    exp_ref = None
2✔
    """dict: The experiment reference (i.e. subject, date, sequence) as returned by the main remote service."""
2✔

    def __init__(self, *_, remote_rigs=None, **kwargs):
2✔
        """
        A mixin for communicating with an auxiliary acquisition PC over a network.

        This should retrieve the services list, i.e. the list of available auxiliary rigs,
        and determine which is the main sync. The main sync is the rig that determines the
        experiment.

        The services list lives in a 'remote_rigs.yaml' file and should be a map of device name
        to URI. These devices are then selectable in the GUI and the URIs of those selected are
        added to the experiment description.

        Subclasses should add their callbacks within init by calling :meth:`self.remote_rigs.services.assign_callback`.

        Parameters
        ----------
        remote_rigs : list, dict
            Either a list of remote device names (in which case the URI is looked up from the remote
            devices file), or a map of device name to URI.
        kwargs
            Optional args such as 'file_iblrig_settings' for defining the location of the remote data
            folder when loading the remote devices file.
        """
        if isinstance(remote_rigs, list):
2✔
            # For now we flatten to list of remote rig names but could permit list of (name, URI) tuples
            remote_rigs = list(filter(None, flatten(remote_rigs)))
2✔
            all_remote_rigs = net.get_remote_devices(iblrig_settings=kwargs.get('iblrig_settings'))
2✔
            if not set(remote_rigs).issubset(all_remote_rigs.keys()):
2✔
                raise ValueError('Selected remote rigs not in remote rigs list')
2✔
            remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in remote_rigs}
×
        # Load and connect to remote services
        self.connect(remote_rigs)
2✔
        self.exp_ref = {}
2✔
        try:
2✔
            super().__init__(**kwargs)
2✔
        except Exception as ex:
2✔
            self.cleanup_mixin_network()
2✔
            raise ex
2✔

    @property
2✔
    def one(self):
2✔
        """Return ONE instance.

        Unlike the superclass getter, this method will always instantiate ONE, allowing subclasses to update with an
        Alyx token from a remotely connected rig. This instance is used for formatting the experiment reference string.

        Returns
        -------
        one.api.One
            An instance of ONE.
        """
        if super().one is None:
2✔
            self._one = OneAlyx(silent=True, mode='local')
×
        return self._one
2✔

    def connect(self, remote_rigs):
2✔
        """
        Connect to remote services.

        Instantiates the Communicator objects that establish connections with each remote device.
        This also creates the thread that handles the asynchronous callbacks.

        Parameters
        ----------
        remote_rigs : dict
            A map of device name to URI.
        """
        self.remote_rigs = net.Auxiliaries(remote_rigs or {})
2✔
        assert not remote_rigs or self.remote_rigs.is_connected
2✔
        # Handle the termination event by gracefully completing the thread
        signal.signal(signal.SIGTERM, lambda sig, frame: self.cleanup_mixin_network())
2✔

    def _init_paths(self, append: bool = False):
2✔
        """
        Determine session paths.

        Unlike :meth:`BaseSession._init_paths`, this method determines the session number from the remote main sync if
        connected.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        iblutil.util.Bunch
            A bunch of paths.
        """
        if self.hardware_settings.MAIN_SYNC:
2✔
            return BaseSession._init_paths(self, append)
2✔
        # Check if we have rigs connected
        if not self.remote_rigs.is_connected:
2✔
            log.warning('No remote rigs; experiment reference may not match the main sync.')
2✔
            return BaseSession._init_paths(self, append)
2✔
        # Set paths in a similar way to the super class
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
2✔
            local_path=self.iblrig_settings['iblrig_local_data_path'],
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
            lab=self.iblrig_settings['ALYX_LAB'],
            iblrig_settings=self.iblrig_settings,
        )
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
        paths.BONSAI = BONSAI_EXE
2✔
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
2✔
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
        )
        assert self.exp_ref
2✔
        paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
2✔
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
        if append == paths.TASK_COLLECTION.endswith('00'):
2✔
            raise ValueError(
2✔
                f'Append value incorrect. Either remove previous task collections from '
                f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
            )

        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
        return paths
2✔
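
    # Sketch of the resulting layout under the local subjects folder, assuming an exp_ref sequence
    # of 1 (the collection name is whatever iterate_collection returns, typically 'raw_task_data_NN'):
    #
    #     <LOCAL_SUBJECT_FOLDER>/<SUBJECT_NAME>/<yyyy-mm-dd>/001/
    #         raw_task_data_00/
    #             _iblrig_taskData.raw.jsonable
    #             _iblrig_taskSettings.raw.json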

    def run(self):
2✔
        """Run session and report exceptions to remote services."""
        self.start_mixin_network()
2✔
        try:
2✔
            return super().run()
2✔
        except Exception as e:
2✔
            # Communicate error to services
            if self.remote_rigs.is_connected:
2✔
                tb = e.__traceback__  # TODO maybe go one level down with tb_next?
2✔
                details = {
2✔
                    'error': e.__class__.__name__,  # exception name str
                    'message': str(e),  # error str
                    'traceback': traceback.format_exc(),  # stack str
                    'file': tb.tb_frame.f_code.co_filename,  # filename str
                    'line_no': (tb.tb_lineno, tb.tb_lasti),  # (int, int)
                }
                self.remote_rigs.push(ExpMessage.EXPINTERRUPT, details, wait=True)
2✔
                self.cleanup_mixin_network()
2✔
            raise e
2✔
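
    # Sketch of the EXPINTERRUPT payload pushed above, as a remote service would receive it
    # (values are illustrative only):
    #
    #     {'error': 'ValueError',
    #      'message': "Subject name doesn't match remote session on ephys_pc",
    #      'traceback': 'Traceback (most recent call last): ...',
    #      'file': '.../iblrig/base_tasks.py',
    #      'line_no': (1397, 24)}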

    def communicate(self, message, *args, raise_on_exception=True):
2✔
        """
        Communicate a message to remote services.

        This method is blocking and by default will raise if not all responses are received in time.

        Parameters
        ----------
        message : iblutil.io.net.base.ExpMessage, str, int
            An experiment message to send to remote services.
        args
            One or more optional variables to send.
        raise_on_exception : bool
            If true, exceptions arising from message timeouts will be re-raised in the main thread and
            services will be cleaned up. Only applies when wait is true.

        Returns
        -------
        Exception | dict
            If raise_on_exception is False, returns an exception if not all responses were received
            in time, otherwise a map of service name to response is returned.
        """
        r = self.remote_rigs.push(message, *args, wait=True)
2✔
        if isinstance(r, Exception):
2✔
            log.error('Error on %s network mixin: %s', message, r)
2✔
            if raise_on_exception:
2✔
                self.cleanup_mixin_network()
2✔
                raise r
2✔
        return r
2✔
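
    # Illustrative usage sketch (message names are the ones used elsewhere in this class; the payload
    # of each response depends entirely on the remote service):
    #
    #     responses = self.communicate('EXPSTART', exp_ref)  # blocks until all services reply
    #     r = self.communicate('EXPEND', raise_on_exception=False)
    #     if isinstance(r, Exception):
    #         log.warning('A remote service did not acknowledge EXPEND in time')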

    def get_exp_info(self):
2✔
        """Return the experiment information map sent to remote services in the EXPINFO message."""
        ref = self.exp_ref or None
2✔
        if isinstance(ref, dict) and self.one:
2✔
            ref = self.one.dict2ref(ref)
×
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
2✔
        info = net.ExpInfo(ref, is_main_sync, self.experiment_description, master=is_main_sync)
2✔
        return info.to_dict()
2✔

    def init_mixin_network(self):
2✔
        """Initialize remote services.

        This method sends an EXPINFO message to all services, expecting exactly one of the responses
        to contain main_sync: True, along with the experiment reference to use. It then sends an
        EXPINIT message to all services.
        """
        if not self.remote_rigs.is_connected:
2✔
            return
2✔
        # Determine experiment reference from main sync
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
2✔
        if is_main_sync:
2✔
            raise NotImplementedError
2✔
        assert self.one
2✔

        expinfo = self.get_exp_info() | {'subject': self.session_info['SUBJECT_NAME']}
2✔
        r = self.communicate('EXPINFO', 'CONNECTED', expinfo)
2✔
        assert sum(x[-1]['main_sync'] for x in r.values()) == 1, 'one main sync expected'
2✔
        main_rig_name, (status, info) = next((k, v) for k, v in r.items() if v[-1]['main_sync'])
2✔
        self.exp_ref = self.one.ref2dict(info['exp_ref']) if isinstance(info['exp_ref'], str) else info['exp_ref']
2✔
        if self.exp_ref['subject'] != self.session_info['SUBJECT_NAME']:
2✔
            log.error(
2✔
                'Running task for "%s" but main sync returned exp ref for "%s".',
                self.session_info['SUBJECT_NAME'],
                self.exp_ref['subject'],
            )
            raise ValueError("Subject name doesn't match remote session on " + main_rig_name)
2✔
        if str(self.exp_ref['date']) != self.session_info['SESSION_START_TIME'][:10]:
2✔
            raise RuntimeError(
2✔
                f'Session dates do not match between this rig and {main_rig_name}. \n'
                f'Running past or future sessions not currently supported. \n'
                f'Please check the system date time settings on each rig.'
            )

        # exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
        exp_ref = self.one.dict2ref(self.exp_ref)
2✔
        self.communicate('EXPINIT', {'exp_ref': exp_ref})
2✔
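
    # Sketch of the EXPINFO response handled above: a map of service name to a (status, info)
    # sequence whose last element holds 'main_sync' and 'exp_ref'. Names and values are illustrative:
    #
    #     {'ephys_pc': (status, {'main_sync': True, 'exp_ref': '2024-12-11_1_SW_023'}),
    #      'cameras': (status, {'main_sync': False, 'exp_ref': None})}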

    def start_mixin_network(self):
2✔
        """Start remote services.

        This method sends an EXPSTART message to all services, along with an exp_ref string.
        Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
2✔
            return
2✔
        self.communicate('EXPSTART', self.exp_ref)
2✔

    def stop_mixin_network(self):
2✔
        """Stop remote services.

        This method sends an EXPEND message to all services. Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
2✔
            return
2✔
        self.communicate('EXPEND')
2✔

    def cleanup_mixin_network(self):
2✔
        """Clean up services."""
        self.remote_rigs.close()
2✔
        if self.remote_rigs.is_connected:
2✔
            log.warning('Failed to properly clean up network mixin')
2✔


class SpontaneousSession(BaseSession):
2✔
    """
    A spontaneous task has no trials; it simply runs until the user stops it.

    It is used to get the extraction structure for data streams.
    """

    def __init__(self, duration_secs=None, **kwargs):
2✔
        super().__init__(**kwargs)
2✔
        self.duration_secs = duration_secs
2✔

    def start_hardware(self):
2✔
        pass  # no mixin here, life is but a dream
2✔

    def _run(self):
2✔
        """Run the acquisition loop (no state machine; wait until stopped or timed out)."""
        log.info('Starting spontaneous acquisition')
2✔
        while True:
2✔
            time.sleep(1.5)
2✔
            if self.duration_secs is not None and self.time_elapsed.seconds > self.duration_secs:
2✔
                break
2✔
            if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
×
                self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
                break
×
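
# Usage note for SpontaneousSession._run above: besides the optional duration_secs timeout, the loop
# can be stopped externally by dropping a '.stop' file into the session folder, e.g. (illustrative;
# 'session' stands for a running SpontaneousSession instance):
#
#     from pathlib import Path
#     Path(session.paths.SESSION_FOLDER, '.stop').touch()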