• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 11407201950

18 Oct 2024 04:12PM UTC coverage: 47.898% (+1.1%) from 46.79%
11407201950

Pull #730

github

86ab26
web-flow
Merge 9801a3e94 into 0f4a57326
Pull Request #730: 8.24.4

47 of 68 new or added lines in 8 files covered. (69.12%)

1013 existing lines in 22 files now uncovered.

4170 of 8706 relevant lines covered (47.9%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.97
/iblrig/base_tasks.py
1
"""
2
Commonalities for all tasks.
3

4
This module provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task related logic.
6
"""
7

8
import argparse
2✔
9
import contextlib
2✔
10
import datetime
2✔
11
import importlib.metadata
2✔
12
import inspect
2✔
13
import json
2✔
14
import logging
2✔
15
import signal
2✔
16
import sys
2✔
17
import time
2✔
18
import traceback
2✔
19
from abc import ABC, abstractmethod
2✔
20
from collections import OrderedDict
2✔
21
from collections.abc import Callable
2✔
22
from pathlib import Path
2✔
23
from typing import Protocol, final
2✔
24

25
import numpy as np
2✔
26
import pandas as pd
2✔
27
import yaml
2✔
28
from pythonosc import udp_client
2✔
29

30
import ibllib.io.session_params as ses_params
2✔
31
import iblrig.graphic as graph
2✔
32
import iblrig.path_helper
2✔
33
import pybpodapi
2✔
34
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
35
from iblrig import net, path_helper, sound
2✔
36
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
37
from iblrig.frame2ttl import Frame2TTL
2✔
38
from iblrig.hardware import SOFTCODE, Bpod, RotaryEncoderModule, sound_device_factory
2✔
39
from iblrig.hifi import HiFi
2✔
40
from iblrig.path_helper import load_pydantic_yaml
2✔
41
from iblrig.pydantic_definitions import HardwareSettings, RigSettings, TrialDataModel
2✔
42
from iblrig.tools import call_bonsai
2✔
43
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
44
from iblrig.valve import Valve
2✔
45
from iblutil.io.net.base import ExpMessage
2✔
46
from iblutil.spacer import Spacer
2✔
47
from iblutil.util import Bunch, flatten, setup_logger
2✔
48
from one.alf.io import next_num_folder
2✔
49
from one.api import ONE, OneAlyx
2✔
50
from pybpodapi.protocol import StateMachine
2✔
51

52
OSC_CLIENT_IP = '127.0.0.1'
2✔
53

54
log = logging.getLogger(__name__)
2✔
55

56

57
class HasBpod(Protocol):
2✔
58
    bpod: Bpod
2✔
59

60

61
class BaseSession(ABC):
2✔
62
    version = None
2✔
63
    """str: !!CURRENTLY UNUSED!! task version string."""
2✔
64
    # protocol_name: str | None = None
65
    """str: The name of the task protocol (NB: avoid spaces)."""
2✔
66
    base_parameters_file: Path | None = None
2✔
67
    """Path: A YAML file containing base, default task parameters."""
2✔
68
    is_mock = False
2✔
69
    """list of str: One or more ibllib.pipes.tasks.Task names for task extraction."""
2✔
70
    logger: logging.Logger = None
2✔
71
    """logging.Logger: Log instance used solely to keep track of log level passed to constructor."""
2✔
72
    experiment_description: dict = {}
2✔
73
    """dict: The experiment description."""
2✔
74
    extractor_tasks: list | None = None
2✔
75
    """list of str: An optional list of pipeline task class names to instantiate when preprocessing task data."""
2✔
76

77
    TrialDataModel: type[TrialDataModel]
2✔
78

79
    @property
2✔
80
    @abstractmethod
2✔
81
    def protocol_name(self) -> str: ...
2✔
82

83
    def __init__(
2✔
84
        self,
85
        subject=None,
86
        task_parameter_file=None,
87
        file_hardware_settings=None,
88
        hardware_settings: HardwareSettings = None,
89
        file_iblrig_settings=None,
90
        iblrig_settings: RigSettings = None,
91
        one=None,
92
        interactive=True,
93
        projects=None,
94
        procedures=None,
95
        stub=None,
96
        subject_weight_grams=None,
97
        append=False,
98
        wizard=False,
99
        log_level='INFO',
100
        **kwargs,
101
    ):
102
        """
103
        :param subject: The subject nickname. Required.
104
        :param task_parameter_file: an optional path to the task_parameters.yaml file
105
        :param file_hardware_settings: name of the hardware file in the settings folder, or full file path
106
        :param hardware_settings: an optional dictionary of hardware settings. Keys will override any keys in the file
107
        :param file_iblrig_settings: name of the iblrig file in the settings folder, or full file path
108
        :param iblrig_settings: an optional dictionary of iblrig settings. Keys will override any keys in the file
109
        :param one: an optional instance of ONE
110
        :param interactive:
111
        :param projects: An optional list of Alyx protocols.
112
        :param procedures: An optional list of Alyx procedures.
113
        :param subject_weight_grams: weight of the subject
114
        :param stub: A full path to an experiment description file containing experiment information.
115
        :param append: bool, if True, append to the latest existing session of the same subject for the same day
116
        """
117
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
2✔
118
        self._logger = None
2✔
119
        self._setup_loggers(level=log_level)
2✔
120
        if not isinstance(self, EmptySession):
2✔
121
            log.info(f'iblrig version {iblrig.__version__}')
2✔
122
            log.info(f'pybpod version {pybpodapi.__version__}')
2✔
123
            log.info(
2✔
124
                f'Session protocol: {self.protocol_name} '
125
                f'({f"version {self.version})" if self.version is not None else "undefined version"})'
126
            )
127

128
        log.info(f'Session call: {" ".join(sys.argv)}')
2✔
129
        self.interactive = interactive
2✔
130
        self._one = one
2✔
131
        self.init_datetime = datetime.datetime.now()
2✔
132

133
        # loads in the settings: first load the files, then update with the input argument if provided
134
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
2✔
135
        if hardware_settings is not None:
2✔
136
            self.hardware_settings.update(hardware_settings)
2✔
137
            HardwareSettings.model_validate(self.hardware_settings)
2✔
138
        self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
2✔
139
        if iblrig_settings is not None:
2✔
140
            self.iblrig_settings.update(iblrig_settings)
2✔
141
            RigSettings.model_validate(self.iblrig_settings)
2✔
142

143
        self.wizard = wizard
2✔
144

145
        # Load the tasks settings, from the task folder or override with the input argument
146
        self.task_params = self.read_task_parameter_files(task_parameter_file)
2✔
147

148
        self.session_info = Bunch(
2✔
149
            {
150
                'NTRIALS': 0,
151
                'NTRIALS_CORRECT': 0,
152
                'PROCEDURES': procedures,
153
                'PROJECTS': projects,
154
                'SESSION_START_TIME': self.init_datetime.isoformat(),
155
                'SESSION_END_TIME': None,
156
                'SESSION_NUMBER': 0,
157
                'SUBJECT_NAME': subject,
158
                'SUBJECT_WEIGHT': subject_weight_grams,
159
                'TOTAL_WATER_DELIVERED': 0,
160
            }
161
        )
162
        # Executes mixins init methods
163
        self._execute_mixins_shared_function('init_mixin')
2✔
164
        self.paths = self._init_paths(append=append)
2✔
165
        if not isinstance(self, EmptySession):
2✔
166
            log.info(f'Session raw data: {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
167
        # Prepare the experiment description dictionary
168
        self.experiment_description = self.make_experiment_description_dict(
2✔
169
            self.protocol_name,
170
            self.paths.get('TASK_COLLECTION'),
171
            procedures,
172
            projects,
173
            self.hardware_settings,
174
            stub,
175
            extractors=self.extractor_tasks,
176
        )
177

178
    @classmethod
2✔
179
    def get_task_file(cls) -> Path:
2✔
180
        """
181
        Get the path to the task's python file.
182

183
        Returns
184
        -------
185
        Path
186
            The path to the task file.
187
        """
188
        return Path(inspect.getfile(cls))
2✔
189

190
    @classmethod
2✔
191
    def get_task_directory(cls) -> Path:
2✔
192
        """
193
        Get the path to the task's directory.
194

195
        Returns
196
        -------
197
        Path
198
            The path to the task's directory.
199
        """
200
        return cls.get_task_file().parent
2✔
201

202
    @classmethod
2✔
203
    def read_task_parameter_files(cls, task_parameter_file: str | Path | None = None) -> Bunch:
2✔
204
        """
205
        Get the task's parameters from the various YAML files in the hierarchy.
206

207
        Parameters
208
        ----------
209
        task_parameter_file : str or Path, optional
210
            Path to override the task parameter file
211

212
        Returns
213
        -------
214
        Bunch
215
            Task parameters
216
        """
217
        # Load the tasks settings, from the task folder or override with the input argument
218
        base_parameters_files = [task_parameter_file or cls.get_task_directory().joinpath('task_parameters.yaml')]
2✔
219

220
        # loop through the task hierarchy to gather parameter files
221
        for c in cls.__mro__:
2✔
222
            base_file = getattr(c, 'base_parameters_file', None)
2✔
223
            if base_file is not None:
2✔
224
                base_parameters_files.append(base_file)
2✔
225

226
        # remove list duplicates while preserving order, we want the highest order first
227
        base_parameters_files = list(reversed(list(dict.fromkeys(base_parameters_files))))
2✔
228

229
        # loop through files and update the dictionary, the latest files in the hierarchy have precedence
230
        task_params = dict()
2✔
231
        for param_file in base_parameters_files:
2✔
232
            if Path(param_file).exists():
2✔
233
                with open(param_file) as fp:
2✔
234
                    params = yaml.safe_load(fp)
2✔
235
                if params is not None:
2✔
236
                    task_params.update(params)
2✔
237

238
        # at last sort the dictionary so itÅ› easier for a human to navigate the many keys, return as a Bunch
239
        return Bunch(sorted(task_params.items()))
2✔
240

241
    def _init_paths(self, append: bool = False) -> Bunch:
2✔
242
        r"""
243
        Initialize session paths.
244

245
        Parameters
246
        ----------
247
        append : bool
248
            Iterate task collection within today's most recent session folder for the selected subject, instead of
249
            iterating session number.
250

251
        Returns
252
        -------
253
        Bunch
254
            Bunch with keys:
255

256
            *   BONSAI: full path to the bonsai executable
257
                `C:\iblrigv8\Bonsai\Bonsai.exe`
258
            *   VISUAL_STIM_FOLDER: full path to the visual stimulus folder
259
                `C:\iblrigv8\visual_stim`
260
            *   LOCAL_SUBJECT_FOLDER: full path to the local subject folder
261
                `C:\iblrigv8_data\mainenlab\Subjects`
262
            *   REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
263
                `Y:\Subjects`
264
            *   SESSION_FOLDER: full path to the current session:
265
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001`
266
            *   TASK_COLLECTION: folder name of the current task
267
                `raw_task_data_00`
268
            *   SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection.
269
                This is where the task data gets written
270
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00`
271
            *   DATA_FILE_PATH: contains the bpod trials
272
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable`
273
            *   SETTINGS_FILE_PATH: contains the task settings
274
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json`
275
        """
276
        rig_computer_paths = path_helper.get_local_and_remote_paths(
2✔
277
            local_path=self.iblrig_settings.iblrig_local_data_path,
278
            remote_path=self.iblrig_settings.iblrig_remote_data_path,
279
            lab=self.iblrig_settings.ALYX_LAB,
280
            iblrig_settings=self.iblrig_settings,
281
        )
282
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
283
        paths.BONSAI = BONSAI_EXE
2✔
284
        paths.VISUAL_STIM_FOLDER = BASE_PATH.joinpath('visual_stim')
2✔
285
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
286
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
287
        # initialize the session path
288
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
289
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
290
        )
291
        if append:
2✔
292
            # this is the case where we append a new protocol to an existing session
293
            todays_sessions = sorted(filter(Path.is_dir, date_folder.glob('*')), reverse=True)
2✔
294
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
2✔
295
            paths.SESSION_FOLDER = todays_sessions[0]
2✔
296
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
297
            if self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
2✔
UNCOV
298
                """
×
299
                Chained protocols make little sense when Bpod is the main sync as there is no
300
                continuous acquisition between protocols.  Only one sync collection can be defined in
301
                the experiment description file.
302
                If you are running experiments with an ephys rig (nidq) or an external daq, you should
303
                correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
304
                """
UNCOV
305
                raise RuntimeError('Chained protocols not supported for bpod-only sessions')
×
306
        else:
307
            # in this case the session path is created from scratch
308
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
2✔
309
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
310

311
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
312
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
313
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
314
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
315
        return paths
2✔
316

317
    @property
2✔
318
    def exp_ref(self):
2✔
319
        """Construct an experiment reference string from the session info attribute."""
320
        subject, date, number = (self.session_info[k] for k in ('SUBJECT_NAME', 'SESSION_START_TIME', 'SESSION_NUMBER'))
2✔
321
        if not all([subject, date, number]):
2✔
322
            return None
2✔
323
        return self.one.dict2ref(dict(subject=subject, date=date[:10], sequence=str(number)))
2✔
324

325
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
2✔
326
        self._logger = setup_logger('iblrig', level=level, file=file)  # logger attr used by create_session to determine log level
2✔
327
        setup_logger('pybpodapi', level=level_bpod, file=file)
2✔
328

329
    @staticmethod
2✔
330
    def _remove_file_loggers():
2✔
331
        for logger_name in ['iblrig', 'pybpodapi']:
2✔
332
            logger = logging.getLogger(logger_name)
2✔
333
            file_handlers = [fh for fh in logger.handlers if isinstance(fh, logging.FileHandler)]
2✔
334
            for fh in file_handlers:
2✔
335
                fh.close()
2✔
336
                logger.removeHandler(fh)
2✔
337

338
    @staticmethod
2✔
339
    def make_experiment_description_dict(
2✔
340
        task_protocol: str,
341
        task_collection: str,
342
        procedures: list = None,
343
        projects: list = None,
344
        hardware_settings: dict | HardwareSettings = None,
345
        stub: Path = None,
346
        extractors: list = None,
347
        camera_config: str = None,
348
    ):
349
        """
350
        Construct an experiment description dictionary.
351

352
        Parameters
353
        ----------
354
        task_protocol : str
355
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
356
        task_collection : str
357
            The task collection name, e.g. raw_task_data_00.
358
        procedures : list
359
            An optional list of Alyx procedures.
360
        projects : list
361
            An optional list of Alyx protocols.
362
        hardware_settings : dict
363
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
364
        stub : dict
365
            An optional experiment description stub to update.
366
        extractors: list
367
            An optional list of extractor names for the task.
368
        camera_config : str
369
            The camera configuration name in the hardware settings. Defaults to the first key in
370
            'device_cameras'.
371

372
        Returns
373
        -------
374
        dict
375
            The experiment description.
376
        """
377
        description = ses_params.read_params(stub) if stub else {}
2✔
378

379
        # Add hardware devices
380
        if hardware_settings is not None:
2✔
381
            if isinstance(hardware_settings, HardwareSettings):
2✔
382
                hardware_settings = hardware_settings.model_dump()
2✔
383
            devices = {}
2✔
384
            cams = hardware_settings.get('device_cameras', None)
2✔
385
            if cams:
2✔
386
                devices['cameras'] = {}
2✔
387
                camera_config = camera_config or next((k for k in cams), {})
2✔
388
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
2✔
389
            if hardware_settings.get('device_microphone', None):
2✔
390
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
2✔
391
            ses_params.merge_params(description, {'devices': devices})
2✔
392

393
        # Add projects and procedures
394
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
2✔
395
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
2✔
396
        is_main_sync = (hardware_settings or {}).get('MAIN_SYNC', False)
2✔
397
        # Add sync key if required
398
        if is_main_sync and 'sync' not in description:
2✔
399
            description['sync'] = {
2✔
400
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
401
            }
402
        # Add task
403
        task = {task_protocol: {'collection': task_collection}}
2✔
404
        if not is_main_sync:
2✔
405
            task[task_protocol]['sync_label'] = 'bpod'
2✔
406
        if extractors:
2✔
407
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
2✔
408
            task[task_protocol].update({'extractors': extractors})
2✔
409
        if 'tasks' not in description:
2✔
410
            description['tasks'] = [task]
2✔
411
        else:
412
            description['tasks'].append(task)
2✔
413
        return description
2✔
414

415
    def _make_task_parameters_dict(self):
2✔
416
        """
417
        Create dictionary that will be saved to the settings json file for extraction.
418

419
        Returns
420
        -------
421
        dict
422
            A dictionary that will be saved to the settings json file for extraction.
423
        """
424
        output_dict = dict(self.task_params)  # Grab parameters from task_params session
2✔
425
        output_dict.update(self.hardware_settings.model_dump())  # Update dict with hardware settings from session
2✔
426
        output_dict.update(dict(self.session_info))  # Update dict with session_info (subject, procedure, projects)
2✔
427
        patch_dict = {  # Various values added to ease transition from iblrig v7 to v8, different home may be desired
2✔
428
            'IBLRIG_VERSION': iblrig.__version__,
429
            'PYBPOD_PROTOCOL': self.protocol_name,
430
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
431
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
432
        }
433
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
2✔
434
            patch_dict['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
2✔
435
        output_dict.update(patch_dict)
2✔
436
        return output_dict
2✔
437

438
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
2✔
439
        """
440
        Collects the various settings and parameters of the session and outputs them to a JSON file.
441

442
        Returns
443
        -------
444
        Path
445
            Path to the resultant JSON file
446
        """
447
        output_dict = self._make_task_parameters_dict()
2✔
448
        if destination_folder:
2✔
UNCOV
449
            json_file = destination_folder.joinpath('_iblrig_taskSettings.raw.json')
×
450
        else:
451
            json_file = self.paths['SETTINGS_FILE_PATH']
2✔
452
        json_file.parent.mkdir(parents=True, exist_ok=True)
2✔
453
        with open(json_file, 'w') as outfile:
2✔
454
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)  # converts datetime objects to string
2✔
455
        return json_file  # PosixPath
2✔
456

457
    @final
2✔
458
    def save_trial_data_to_json(self, bpod_data: dict):
2✔
459
        """Validate and save trial data.
460

461
        This method retrieve's the current trial's data from the trial_table and validates it using a Pydantic model
462
        (self.TrialDataDefinition). In merges in the trial's bpod_data dict and appends everything to the session's
463
        JSON data file.
464

465
        Parameters
466
        ----------
467
        bpod_data : dict
468
            Trial data returned from pybpod.
469
        """
470
        # get trial's data as a dict
471
        trial_data = self.trials_table.iloc[self.trial_num].to_dict()
2✔
472

473
        # warn about entries not covered by pydantic model
474
        if trial_data.get('trial_num', 1) == 0:
2✔
475
            for key in set(trial_data.keys()) - set(self.TrialDataModel.model_fields) - {'index'}:
2✔
476
                log.warning(
2✔
477
                    f'Key "{key}" in trial_data is missing from TrialDataModel - '
478
                    f'its value ({trial_data[key]}) will not be validated.'
479
                )
480

481
        # validate by passing through pydantic model
482
        trial_data = self.TrialDataModel.model_validate(trial_data).model_dump()
2✔
483

484
        # add bpod_data as 'behavior_data'
485
        trial_data['behavior_data'] = bpod_data
2✔
486

487
        # write json data to file
488
        with open(self.paths['DATA_FILE_PATH'], 'a') as fp:
2✔
489
            fp.write(json.dumps(trial_data) + '\n')
2✔
490

491
    @property
2✔
492
    def one(self):
2✔
493
        """ONE getter."""
494
        if self._one is None:
2✔
495
            if self.iblrig_settings['ALYX_URL'] is None:
2✔
496
                return
2✔
UNCOV
497
            info_str = (
×
498
                f"alyx client with user name {self.iblrig_settings['ALYX_USER']} "
499
                + f"and url: {self.iblrig_settings['ALYX_URL']}"
500
            )
501
            try:
×
502
                self._one = ONE(
×
503
                    base_url=str(self.iblrig_settings['ALYX_URL']),
504
                    username=self.iblrig_settings['ALYX_USER'],
505
                    mode='remote',
506
                    cache_rest=None,
507
                )
UNCOV
508
                log.info('instantiated ' + info_str)
×
UNCOV
509
            except Exception:
×
UNCOV
510
                log.error(traceback.format_exc())
×
UNCOV
511
                log.error('could not connect to ' + info_str)
×
512
        return self._one
2✔
513

514
    def register_to_alyx(self):
2✔
515
        """
516
        Registers the session to Alyx.
517

518
        This registers the session using the IBLRegistrationClient class.  This uses the settings
519
        file(s) and experiment description file to extract the session data.  This may be called
520
        any number of times and if the session record already exists in Alyx it will be updated.
521
        If session registration fails, it will be done before extraction in the ibllib pipeline.
522

523
        Note that currently the subject weight is registered once and only once.  The recorded
524
        weight of the first protocol run is used.
525

526
        Water administrations are added separately by this method: it is expected that
527
        `register_session` is first called with no recorded total water. This method will then add
528
        a water administration each time it is called, and should therefore be called only once
529
        after each protocol is run. If water administration registration fails for all protocols,
530
        this will be done before extraction in the ibllib pipline, however, if a water
531
        administration is successfully registered for one protocol and subsequent ones fail to
532
        register, these will not be added before extraction in ibllib and therefore must be
533
        manually added to Alyx.
534

535
        Returns
536
        -------
537
        dict
538
            The registered session record.
539

540
        See Also
541
        --------
542
        :external+iblenv:meth:`ibllib.oneibl.registration.IBLRegistrationClient.register_session` - The registration method.
543
        """
544
        if self.session_info['SUBJECT_NAME'] in ('iblrig_test_subject', 'test', 'test_subject'):
2✔
545
            log.warning('Not registering test subject to Alyx')
2✔
546
            return
2✔
547
        if not self.one or self.one.offline:
2✔
548
            return
2✔
549
        try:
2✔
550
            client = IBLRegistrationClient(self.one)
2✔
551
            ses, _ = client.register_session(self.paths.SESSION_FOLDER, register_reward=False)
2✔
552
        except Exception:
2✔
553
            log.error(traceback.format_exc())
2✔
554
            log.error('Could not register session to Alyx')
2✔
555
            return
2✔
556
        # add the water administration if there was water administered
557
        try:
2✔
558
            if self.session_info['TOTAL_WATER_DELIVERED']:
2✔
559
                wa = client.register_water_administration(
2✔
560
                    self.session_info.SUBJECT_NAME,
561
                    self.session_info['TOTAL_WATER_DELIVERED'] / 1000,
562
                    session=ses['url'][-36:],
563
                    water_type=self.task_params.get('REWARD_TYPE', None),
564
                )
565
                log.info(f"Water administered registered in Alyx database: {ses['subject']}, " f"{wa['water_administered']}mL")
2✔
566
        except Exception:
2✔
567
            log.error(traceback.format_exc())
2✔
568
            log.error('Could not register water administration to Alyx')
2✔
569
            return
2✔
570
        return ses
2✔
571

572
    def _execute_mixins_shared_function(self, pattern):
2✔
573
        """
574
        Loop over all methods of the class that start with pattern and execute them.
575

576
        Parameters
577
        ----------
578
        pattern : str
579
            'init_mixin', 'start_mixin', 'stop_mixin', or 'cleanup_mixin'
580
        """
581
        method_names = [method for method in dir(self) if method.startswith(pattern)]
2✔
582
        methods = [getattr(self, method) for method in method_names if inspect.ismethod(getattr(self, method))]
2✔
583
        for meth in methods:
2✔
584
            meth()
2✔
585

586
    @property
2✔
587
    def time_elapsed(self):
2✔
588
        return datetime.datetime.now() - self.init_datetime
2✔
589

590
    def mock(self):
2✔
591
        self.is_mock = True
2✔
592

593
    def create_session(self):
2✔
594
        """
595
        Create the session path and save json parameters in the task collection folder.
596

597
        This will also create the protocol folder.
598
        """
599
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
2✔
600
        # enable file logging
601
        logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
2✔
602
        self._setup_loggers(level=self._logger.level, file=logfile)
2✔
603
        # copy the acquisition stub to the remote session folder
604
        sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
2✔
605
        sc.initialize_experiment(self.experiment_description, overwrite=False)
2✔
606
        self.register_to_alyx()
2✔
607

608
    def run(self):
2✔
609
        """
610
        Common pre-run instructions for all tasks.
611

612
        Defines sigint handler for a graceful exit.
613
        """
614
        # here we make sure we connect to the hardware before writing the session to disk
615
        # this prevents from incrementing endlessly the session number if the hardware fails to connect
616
        self.start_hardware()
2✔
617
        self.create_session()
2✔
618
        # When not running the first chained protocol, we can skip the weighing dialog
619
        first_protocol = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1]) == 0
2✔
620
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
2✔
621
            self.session_info.SUBJECT_WEIGHT = graph.numinput(
2✔
622
                'Subject weighing (gr)', f'{self.session_info.SUBJECT_NAME} weight (gr):', nullable=False
623
            )
624

625
        def sigint_handler(*args, **kwargs):
2✔
626
            # create a signal handler for a graceful exit: create a stop flag in the session folder
UNCOV
627
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
×
UNCOV
628
            log.critical('SIGINT signal detected, will exit at the end of the trial')
×
629

630
        # if upon starting there is a flag just remove it, this is to prevent killing a session in the egg
631
        if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
2✔
UNCOV
632
            self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
633

634
        signal.signal(signal.SIGINT, sigint_handler)
2✔
635
        self._run()  # runs the specific task logic i.e. trial loop etc...
2✔
636
        # post task instructions
637
        log.critical('Graceful exit')
2✔
638
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
639
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
2✔
640
        if self.interactive and not self.wizard:
2✔
641
            self.session_info.POOP_COUNT = graph.numinput(
2✔
642
                'Poop count', f'{self.session_info.SUBJECT_NAME} droppings count:', nullable=True, askint=True
643
            )
644
        self.save_task_parameters_to_json_file()
2✔
645
        self.register_to_alyx()
2✔
646
        self._execute_mixins_shared_function('stop_mixin')
2✔
647
        self._execute_mixins_shared_function('cleanup_mixin')
2✔
648

649
    @abstractmethod
2✔
650
    def start_hardware(self):
2✔
651
        """
652
        Start the hardware.
653

654
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
655
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
656
        """
UNCOV
657
        ...
×
658

659
    @abstractmethod
2✔
660
    def _run(self): ...
2✔
661

662
    @staticmethod
2✔
663
    def extra_parser():
2✔
664
        """
665
        Specify extra kwargs arguments to expose to the user prior running the task.
666

667
        Make sure you instantiate the parser.
668

669
        Returns
670
        -------
671
        argparse.ArgumentParser
672
            The extra parser instance.
673
        """
674
        parser = argparse.ArgumentParser(add_help=False)
2✔
675
        return parser
2✔
676

677

678
# this class gets called to get the path constructor utility to predict the session path
679
class EmptySession(BaseSession):
2✔
680
    protocol_name = 'empty'
2✔
681

682
    def _run(self):
2✔
683
        pass
2✔
684

685
    def start_hardware(self):
2✔
686
        pass
2✔
687

688

689
class OSCClient(udp_client.SimpleUDPClient):
2✔
690
    """
691
    Handles communication to Bonsai using a UDP Client
692
    OSC channels:
693
        USED:
694
        /t  -> (int)    trial number current
695
        /p  -> (int)    position of stimulus init for current trial
696
        /h  -> (float)  phase of gabor for current trial
697
        /c  -> (float)  contrast of stimulus for current trial
698
        /f  -> (float)  frequency of gabor patch for current trial
699
        /a  -> (float)  angle of gabor patch for current trial
700
        /g  -> (float)  gain of RE to visual stim displacement
701
        /s  -> (float)  sigma of the 2D gaussian of gabor
702
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
703
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
704
    """
705

706
    OSC_PROTOCOL = {
2✔
707
        'trial_num': dict(mess='/t', type=int),
708
        'position': dict(mess='/p', type=int),
709
        'stim_phase': dict(mess='/h', type=float),
710
        'contrast': dict(mess='/c', type=float),
711
        'stim_freq': dict(mess='/f', type=float),
712
        'stim_angle': dict(mess='/a', type=float),
713
        'stim_gain': dict(mess='/g', type=float),
714
        'stim_sigma': dict(mess='/s', type=float),
715
        # 'stim_reverse': dict(mess='/r', type=int),  # this is not handled by Bonsai
716
    }
717

718
    def __init__(self, port, ip='127.0.0.1'):
2✔
719
        super().__init__(ip, port)
2✔
720

721
    def __del__(self):
2✔
722
        self._sock.close()
2✔
723

724
    def send2bonsai(self, **kwargs):
2✔
725
        """
726
        :param see list of keys in OSC_PROTOCOL
727
        :example: client.send2bonsai(trial_num=6, sim_freq=50)
728
        :return:
729
        """
730
        for k in kwargs:
2✔
731
            if k in self.OSC_PROTOCOL:
2✔
732
                # need to convert basic numpy types to low-level python types for
733
                # punch card generation OSC module, I might as well have written C code
734
                value = kwargs[k].item() if isinstance(kwargs[k], np.generic) else kwargs[k]
2✔
735
                self.send_message(self.OSC_PROTOCOL[k]['mess'], self.OSC_PROTOCOL[k]['type'](value))
2✔
736

737
    def exit(self):
2✔
738
        self.send_message('/x', 1)
2✔
739

740

741
class BonsaiRecordingMixin(BaseSession):
2✔
742
    config: dict
2✔
743

744
    def init_mixin_bonsai_recordings(self, *args, **kwargs):
2✔
745
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
2✔
746
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
2✔
747
        self.config = None  # the name of the configuration to run
2✔
748

749
    def stop_mixin_bonsai_recordings(self):
2✔
750
        log.info('Stopping Bonsai recordings')
2✔
751
        self.bonsai_camera.udp_client.exit()
2✔
752
        self.bonsai_microphone.udp_client.exit()
2✔
753

754
    def start_mixin_bonsai_microphone(self):
2✔
755
        if not self.config:
2✔
756
            # Use the first key in the device_cameras map
UNCOV
757
            self.config = next((k for k in self.hardware_settings.device_cameras), None)
×
758
        # The camera workflow on the behaviour computer already contains the microphone recording
759
        # so the device camera workflow and the microphone one are exclusive
760
        if self.config:
2✔
761
            return  # Camera workflow defined; so no need to separately start microphone.
2✔
762
        if not self.task_params.RECORD_SOUND:
×
UNCOV
763
            return  # Sound should not be recorded
×
UNCOV
764
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
×
UNCOV
765
        parameters = {
×
766
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
767
            'RecordSound': self.task_params.RECORD_SOUND,
768
        }
769
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
×
UNCOV
770
        log.info('Bonsai microphone recording module loaded: OK')
×
771

772
    @staticmethod
2✔
773
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
2✔
774
        """
775
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.
776

777
        Parameters
778
        ----------
779
        cameras : dict
780
            The hardware settings configuration.
781
        name : {'setup', 'recording'} str
782
            The workflow type.
783

784
        Returns
785
        -------
786
        Path
787
            The workflow path.
788
        """
789
        if cameras is None:
2✔
790
            return None
2✔
791
        return cameras['BONSAI_WORKFLOW'][name]
2✔
792

793
    def start_mixin_bonsai_cameras(self):
2✔
794
        """
795
        Prepare the cameras.
796

797
        Starts the pipeline that aligns the camera focus with the desired borders of rig features.
798
        The actual triggering of the cameras is done in the trigger_bonsai_cameras method.
799
        """
800
        if not self.config:
2✔
801
            # Use the first key in the device_cameras map
802
            try:
2✔
803
                self.config = next(k for k in self.hardware_settings.device_cameras)
2✔
UNCOV
804
            except StopIteration:
×
805
                return
×
806
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
807
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
2✔
UNCOV
808
            return
×
809

810
        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
811
        if PYSPIN_AVAILABLE:
2✔
812
            from iblrig.video_pyspin import enable_camera_trigger
×
813

UNCOV
814
            enable_camera_trigger(True)
×
815

816
        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
2✔
817
        log.info('Bonsai cameras setup module loaded: OK')
2✔
818

819
    def trigger_bonsai_cameras(self):
2✔
820
        if not self.config:
2✔
821
            # Use the first key in the device_cameras map
UNCOV
822
            try:
×
UNCOV
823
                self.config = next(k for k in self.hardware_settings.device_cameras)
×
UNCOV
824
            except StopIteration:
×
UNCOV
825
                return
×
826
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
827
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
2✔
828
            raise NotImplementedError
×
829
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
2✔
830
        if workflow_file is None:
2✔
UNCOV
831
            return
×
832
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)
2✔
833
        # FIXME Use parameters in configuration map
834
        parameters = {
2✔
835
            'FileNameLeft': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.raw.avi'),
836
            'FileNameLeftData': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.frameData.bin'),
837
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
838
            'RecordSound': self.task_params.RECORD_SOUND,
839
        }
840
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
2✔
841
        log.info('Bonsai camera recording process started')
2✔
842

843

844
class BonsaiVisualStimulusMixin(BaseSession):
2✔
845
    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
2✔
846
        # camera 7111, microphone 7112
847
        self.bonsai_visual_udp_client = OSCClient(port=7110)
2✔
848

849
    def start_mixin_bonsai_visual_stimulus(self):
2✔
850
        self.choice_world_visual_stimulus()
2✔
851

852
    def stop_mixin_bonsai_visual_stimulus(self):
2✔
853
        log.info('Stopping Bonsai visual stimulus')
2✔
854
        self.bonsai_visual_udp_client.exit()
2✔
855

856
    def send_trial_info_to_bonsai(self):
2✔
857
        """
858
        Send the trial information to Bonsai via UDP.
859

860
        The OSC protocol is documented in iblrig.base_tasks.BonsaiVisualStimulusMixin
861
        """
862
        bonsai_dict = {
2✔
863
            k: self.trials_table[k][self.trial_num]
864
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
865
            if k in self.trials_table.columns
866
        }
867

868
        # reverse wheel contingency: if stim_reverse is True we invert stim_gain
869
        if self.trials_table.get('stim_reverse', {}).get(self.trial_num, False):
2✔
870
            bonsai_dict['stim_gain'] = -bonsai_dict['stim_gain']
×
871

872
        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
2✔
873
        log.debug(bonsai_dict)
2✔
874

875
    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
2✔
876
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
2✔
877
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
2✔
878
        parameters = {
2✔
879
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
880
            'Stim.SpontaneousActivity0.DueTime': sa_time,
881
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
882
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
883
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
884
        }
885
        map_seconds = pd.to_timedelta(map_time).seconds
2✔
886
        sa_seconds = pd.to_timedelta(sa_time).seconds
2✔
887
        log.info(f'Starting spontaneous activity ({sa_seconds} s) and RF mapping stims ({map_seconds} s)')
2✔
888
        s = call_bonsai(workflow_file, parameters, editor=False)
2✔
889
        log.info('Spontaneous activity and RF mapping stims finished')
2✔
890
        return s
2✔
891

892
    def choice_world_visual_stimulus(self):
2✔
893
        if self.task_params.VISUAL_STIMULUS is None:
2✔
UNCOV
894
            return
×
895
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
2✔
896
        parameters = {
2✔
897
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
898
            'Stim.FileNameStimPositionScreen': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_stimPositionScreen.raw.csv'),
899
            'Stim.FileNameSyncSquareUpdate': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
900
            'Stim.FileNamePositions': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderPositions.raw.ssv'),
901
            'Stim.FileNameEvents': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderEvents.raw.ssv'),
902
            'Stim.FileNameTrialInfo': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
903
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
904
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
905
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
906
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
907
        }
908
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
2✔
909
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
910

911

912
class BpodMixin(BaseSession):
2✔
913
    def _raise_on_undefined_softcode_handler(self, byte: int):
2✔
914
        raise ValueError(f'No handler defined for softcode #{byte}')
2✔
915

916
    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
2✔
917
        """
918
        Returns a softcode handler dict where each key corresponds to the softcode and each value to the
919
        function to be called.
920

921
        This needs to be wrapped this way because
922
            1) we want to be able to inherit this and dynamically add softcode to the dictionry
923
            2) we need to provide the Task object (self) at run time to have the functions with static args
924
        This is tricky as it is unclear if the task object is a copy or a reference when passed here.
925

926

927
        Returns
928
        -------
929
        OrderedDict[int, Callable]
930
            Softcode dictionary
931
        """
932
        softcode_dict = OrderedDict(
2✔
933
            {
934
                SOFTCODE.STOP_SOUND: self.sound['sd'].stop,
935
                SOFTCODE.PLAY_TONE: lambda: self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate']),
936
                SOFTCODE.PLAY_NOISE: lambda: self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate']),
937
                SOFTCODE.TRIGGER_CAMERA: getattr(
938
                    self, 'trigger_bonsai_cameras', lambda: self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)
939
                ),
940
            }
941
        )
942
        return softcode_dict
2✔
943

944
    def init_mixin_bpod(self, *args, **kwargs):
2✔
945
        self.bpod = Bpod()
2✔
946

947
    def stop_mixin_bpod(self):
2✔
948
        self.bpod.close()
2✔
949

950
    def start_mixin_bpod(self):
2✔
951
        if self.hardware_settings['device_bpod']['COM_BPOD'] is None:
2✔
952
            raise ValueError(
2✔
953
                'The value for device_bpod:COM_BPOD in '
954
                'settings/hardware_settings.yaml is null. Please '
955
                'provide a valid port name.'
956
            )
UNCOV
957
        disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
×
UNCOV
958
        self.bpod = Bpod(self.hardware_settings['device_bpod']['COM_BPOD'], disable_behavior_ports=disabled_ports)
×
UNCOV
959
        self.bpod.define_rotary_encoder_actions()
×
UNCOV
960
        self.bpod.set_status_led(False)
×
UNCOV
961
        assert self.bpod.is_connected
×
UNCOV
962
        log.info('Bpod hardware module loaded: OK')
×
963
        # make the bpod send spacer signals to the main sync clock for protocol discovery
UNCOV
964
        self.send_spacers()
×
965

966
    def send_spacers(self):
2✔
UNCOV
967
        log.info('Starting task by sending a spacer signal on BNC1')
×
UNCOV
968
        sma = StateMachine(self.bpod)
×
UNCOV
969
        Spacer().add_spacer_states(sma, next_state='exit')
×
UNCOV
970
        self.bpod.send_state_machine(sma)
×
UNCOV
971
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
972
        return self.bpod.session.current_trial.export()
×
973

974

975
class Frame2TTLMixin(BaseSession):
2✔
976
    """Frame 2 TTL interface for state machine."""
977

978
    def init_mixin_frame2ttl(self, *args, **kwargs):
2✔
979
        pass
2✔
980

981
    def start_mixin_frame2ttl(self):
2✔
982
        # todo assert calibration
983
        if self.hardware_settings['device_frame2ttl']['COM_F2TTL'] is None:
2✔
984
            raise ValueError(
2✔
985
                'The value for device_frame2ttl:COM_F2TTL in '
986
                'settings/hardware_settings.yaml is null. Please '
987
                'provide a valid port name.'
988
            )
UNCOV
989
        Frame2TTL(
×
990
            port=self.hardware_settings['device_frame2ttl']['COM_F2TTL'],
991
            threshold_dark=self.hardware_settings['device_frame2ttl']['F2TTL_DARK_THRESH'],
992
            threshold_light=self.hardware_settings['device_frame2ttl']['F2TTL_LIGHT_THRESH'],
993
        ).close()
UNCOV
994
        log.info('Frame2TTL: Thresholds set.')
×
995

996

997
class RotaryEncoderMixin(BaseSession, HasBpod):
2✔
998
    """Rotary encoder interface for state machine."""
999

1000
    device_rotary_encoder: RotaryEncoderModule
2✔
1001

1002
    @property
2✔
1003
    def stimulus_gain(self) -> float:
2✔
1004
        return self.task_params.STIM_GAIN
2✔
1005

1006
    def init_mixin_rotary_encoder(self):
2✔
1007
        thresholds_deg = self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS
2✔
1008
        self.device_rotary_encoder = RotaryEncoderModule(
2✔
1009
            self.hardware_settings.device_rotary_encoder, thresholds_deg, self.stimulus_gain
1010
        )
1011

1012
    def start_mixin_rotary_encoder(self):
2✔
1013
        self.device_rotary_encoder.gain = self.stimulus_gain
2✔
1014
        self.device_rotary_encoder.open()
2✔
NEW
1015
        self.device_rotary_encoder.write_parameters()
×
NEW
1016
        self.device_rotary_encoder.close()
×
NEW
1017
        log.info('Rotary Encoder Module loaded: OK')
×
1018

1019

1020
class ValveMixin(BaseSession, HasBpod):
2✔
1021
    def init_mixin_valve(self: object):
2✔
1022
        self.valve = Valve(self.hardware_settings.device_valve)
2✔
1023

1024
    def start_mixin_valve(self):
2✔
1025
        # assert that valve has been calibrated
UNCOV
1026
        assert self.valve.is_calibrated, """VALVE IS NOT CALIBRATED - PLEASE CALIBRATE THE VALVE"""
×
1027

1028
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
UNCOV
1029
        assert self.compute_reward_time(amount_ul=1.5) < 1, """VALVE IS NOT PROPERLY CALIBRATED - PLEASE RECALIBRATE"""
×
UNCOV
1030
        log.info('Water valve module loaded: OK')
×
1031

1032
    def compute_reward_time(self, amount_ul: float | None = None) -> float:
2✔
1033
        """
1034
        Converts the valve opening time from a given volume.
1035

1036
        Parameters
1037
        ----------
1038
        amount_ul : float, optional
1039
            The volume of liquid (μl) to be dispensed from the valve. Defaults to task_params.REWARD_AMOUNT_UL.
1040

1041
        Returns
1042
        -------
1043
        float
1044
            Valve opening time in seconds.
1045
        """
1046
        amount_ul = self.task_params.REWARD_AMOUNT_UL if amount_ul is None else amount_ul
2✔
1047
        return self.valve.values.ul2ms(amount_ul) / 1e3
2✔
1048

1049
    def valve_open(self, reward_valve_time):
2✔
1050
        """
1051
        Open the reward valve for a given amount of time and return bpod data.
1052

1053
        Parameters
1054
        ----------
1055
        reward_valve_time : float
1056
            Amount of time in seconds to open the reward valve.
1057
        """
UNCOV
1058
        sma = StateMachine(self.bpod)
×
UNCOV
1059
        sma.add_state(
×
1060
            state_name='valve_open',
1061
            state_timer=reward_valve_time,
1062
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
1063
            state_change_conditions={'Tup': 'exit'},
1064
        )
UNCOV
1065
        self.bpod.send_state_machine(sma)
×
UNCOV
1066
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1067
        return self.bpod.session.current_trial.export()
×
1068

1069

1070
class SoundMixin(BaseSession, HasBpod):
2✔
1071
    """Sound interface methods for state machine."""
1072

1073
    def init_mixin_sound(self):
2✔
1074
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
2✔
1075
        sound_output = self.hardware_settings.device_sound['OUTPUT']
2✔
1076

1077
        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
1078
        # TODO: this needs proper calibration and refactoring
1079
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
2✔
UNCOV
1080
            amp_gain_factor = 0.25
×
1081
        else:
1082
            amp_gain_factor = 1.0
2✔
1083
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
2✔
1084
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor
2✔
1085

1086
        # sound device sd is actually the module soundevice imported above.
1087
        # not sure how this plays out when referenced outside of this python file
1088
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
2✔
1089
        # Create sounds and output actions of state machine
1090
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
2✔
1091
            rate=self.sound['samplerate'],
1092
            frequency=self.task_params.GO_TONE_FREQUENCY,
1093
            duration=self.task_params.GO_TONE_DURATION,
1094
            amplitude=self.task_params.GO_TONE_AMPLITUDE * amp_gain_factor,
1095
            fade=0.01,
1096
            chans=self.sound['channels'],
1097
        )
1098
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
2✔
1099
            rate=self.sound['samplerate'],
1100
            frequency=-1,
1101
            duration=self.task_params.WHITE_NOISE_DURATION,
1102
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE * amp_gain_factor,
1103
            fade=0.01,
1104
            chans=self.sound['channels'],
1105
        )
1106

1107
    def start_mixin_sound(self):
2✔
1108
        """
1109
        Depends on bpod mixin start for hard sound card
1110
        :return:
1111
        """
UNCOV
1112
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
×
1113
        # SoundCard config params
UNCOV
1114
        match self.hardware_settings.device_sound['OUTPUT']:
×
UNCOV
1115
            case 'harp':
×
UNCOV
1116
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
×
UNCOV
1117
                sound.configure_sound_card(
×
1118
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
1119
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
1120
                    sample_rate=self.sound['samplerate'],
1121
                )
UNCOV
1122
                self.bpod.define_harp_sounds_actions(
×
1123
                    module=self.bpod.sound_card,
1124
                    go_tone_index=self.task_params.GO_TONE_IDX,
1125
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1126
                )
UNCOV
1127
            case 'hifi':
×
UNCOV
1128
                module = self.bpod.get_module('^HiFi')
×
UNCOV
1129
                assert module is not None, 'No HiFi module connected to Bpod'
×
UNCOV
1130
                assert self.hardware_settings.device_sound.COM_SOUND is not None
×
UNCOV
1131
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
×
UNCOV
1132
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
×
UNCOV
1133
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
×
UNCOV
1134
                hifi.push()
×
UNCOV
1135
                hifi.close()
×
UNCOV
1136
                self.bpod.define_harp_sounds_actions(
×
1137
                    module=module,
1138
                    go_tone_index=self.task_params.GO_TONE_IDX,
1139
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1140
                )
UNCOV
1141
            case _:
×
UNCOV
1142
                self.bpod.define_xonar_sounds_actions()
×
UNCOV
1143
        log.info(f"Sound module loaded: OK: {self.hardware_settings.device_sound['OUTPUT']}")
×
1144

1145
    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
2✔
1146
        """
1147
        Play the noise sound for the error feedback using bpod state machine.
1148
        :return: bpod current trial export
1149
        """
UNCOV
1150
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1151

1152
    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
2✔
1153
        """
1154
        Play the ready tone beep using bpod state machine.
1155
        :return: bpod current trial export
1156
        """
UNCOV
1157
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1158

1159
    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
2✔
1160
        """Plays a sound using bpod state machine.
1161

1162
        The sound must be defined in the init_mixin_sound method.
1163
        """
UNCOV
1164
        assert state_timer is not None, 'state_timer must be defined'
×
UNCOV
1165
        assert output_actions is not None, 'output_actions must be defined'
×
UNCOV
1166
        sma = StateMachine(self.bpod)
×
UNCOV
1167
        sma.add_state(
×
1168
            state_name=state_name,
1169
            state_timer=state_timer,
1170
            output_actions=[self.bpod.actions.play_tone],
1171
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
1172
        )
UNCOV
1173
        self.bpod.send_state_machine(sma)
×
UNCOV
1174
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1175
        return self.bpod.session.current_trial.export()
×
1176

1177

1178
class NetworkSession(BaseSession):
2✔
1179
    """A mixin for communicating to auxiliary acquisition PC over a network."""
1180

1181
    remote_rigs = None
2✔
1182
    """net.Auxiliaries: An auxiliary services object for communicating with remote devices."""
2✔
1183
    exp_ref = None
2✔
1184
    """dict: The experiment reference (i.e. subject, date, sequence) as returned by main remote service."""
2✔
1185

1186
    def __init__(self, *_, remote_rigs=None, **kwargs):
2✔
1187
        """
1188
        A mixin for communicating to auxiliary acquisition PC over a network.
1189

1190
        This should retrieve the services list, i.e. the list of available auxiliary rigs,
1191
        and determine which is the main sync. The main sync is the rig that determines the
1192
        experiment.
1193

1194
        The services list is in a yaml file somewhere, called 'remote_rigs.yaml' and should
1195
        be a map of device name to URI. These are then selectable in the GUI and the URI of
1196
        those selected are added to the experiment description.
1197

1198
        Subclasses should add their callbacks within init by calling :meth:`self.remote_rigs.services.assign_callback`.
1199

1200
        Parameters
1201
        ----------
1202
        remote_rigs : list, dict
1203
            Either a list of remote device names (in which case URI is looked up from remote devices
1204
            file), or a map of device name to URI.
1205
        kwargs
1206
            Optional args such as 'file_iblrig_settings' for defining location of remote data folder
1207
            when loading remote devices file.
1208
        """
1209
        if isinstance(remote_rigs, list):
2✔
1210
            # For now we flatten to list of remote rig names but could permit list of (name, URI) tuples
1211
            remote_rigs = list(filter(None, flatten(remote_rigs)))
2✔
1212
            all_remote_rigs = net.get_remote_devices(iblrig_settings=kwargs.get('iblrig_settings'))
2✔
1213
            if not set(remote_rigs).issubset(all_remote_rigs.keys()):
2✔
1214
                raise ValueError('Selected remote rigs not in remote rigs list')
2✔
UNCOV
1215
            remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in remote_rigs}
×
1216
        # Load and connect to remote services
1217
        self.connect(remote_rigs)
2✔
1218
        self.exp_ref = {}
2✔
1219
        try:
2✔
1220
            super().__init__(**kwargs)
2✔
1221
        except Exception as ex:
2✔
1222
            self.cleanup_mixin_network()
2✔
1223
            raise ex
2✔
1224

1225
    @property
2✔
1226
    def one(self):
2✔
1227
        """Return ONE instance.
1228

1229
        Unlike super class getter, this method will always instantiate ONE, allowing subclasses to update with an Alyx
1230
        token from a remotely connected rig.  This instance is used for formatting the experiment reference string.
1231

1232
        Returns
1233
        -------
1234
        one.api.One
1235
            An instance of ONE.
1236
        """
1237
        if super().one is None:
2✔
UNCOV
1238
            self._one = OneAlyx(silent=True, mode='local')
×
1239
        return self._one
2✔
1240

1241
    def connect(self, remote_rigs):
2✔
1242
        """
1243
        Connect to remote services.
1244

1245
        Instantiates the Communicator objects that establish connections with each remote device.
1246
        This also creates the thread that uses asynchronous callbacks.
1247

1248
        Parameters
1249
        ----------
1250
        remote_rigs : dict
1251
            A map of name to URI.
1252
        """
1253
        self.remote_rigs = net.Auxiliaries(remote_rigs or {})
2✔
1254
        assert not remote_rigs or self.remote_rigs.is_connected
2✔
1255
        # Handle termination event by graciously completing thread
1256
        signal.signal(signal.SIGTERM, lambda sig, frame: self.cleanup_mixin_network())
2✔
1257

1258
    def _init_paths(self, append: bool = False):
2✔
1259
        """
1260
        Determine session paths.
1261

1262
        Unlike :meth:`BaseSession._init_paths`, this method determines the session number from the remote main sync if
1263
        connected.
1264

1265
        Parameters
1266
        ----------
1267
        append : bool
1268
            Iterate task collection within today's most recent session folder for the selected subject, instead of
1269
            iterating session number.
1270

1271
        Returns
1272
        -------
1273
        iblutil.util.Bunch
1274
            A bunch of paths.
1275
        """
1276
        if self.hardware_settings.MAIN_SYNC:
2✔
1277
            return BaseSession._init_paths(self, append)
2✔
1278
        # Check if we have rigs connected
1279
        if not self.remote_rigs.is_connected:
2✔
1280
            log.warning('No remote rigs; experiment reference may not match the main sync.')
2✔
1281
            return BaseSession._init_paths(self, append)
2✔
1282
        # Set paths in a similar way to the super class
1283
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
2✔
1284
            local_path=self.iblrig_settings['iblrig_local_data_path'],
1285
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
1286
            lab=self.iblrig_settings['ALYX_LAB'],
1287
            iblrig_settings=self.iblrig_settings,
1288
        )
1289
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
1290
        paths.BONSAI = BONSAI_EXE
2✔
1291
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
2✔
1292
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
1293
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
1294
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
1295
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
1296
        )
1297
        assert self.exp_ref
2✔
1298
        paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
2✔
1299
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
1300
        if append == paths.TASK_COLLECTION.endswith('00'):
2✔
1301
            raise ValueError(
2✔
1302
                f'Append value incorrect. Either remove previous task collections from '
1303
                f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
1304
            )
1305

1306
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
1307
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
1308
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
1309
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
1310
        return paths
2✔
1311

1312
    def run(self):
2✔
1313
        """Run session and report exceptions to remote services."""
1314
        self.start_mixin_network()
2✔
1315
        try:
2✔
1316
            return super().run()
2✔
1317
        except Exception as e:
2✔
1318
            # Communicate error to services
1319
            if self.remote_rigs.is_connected:
2✔
1320
                tb = e.__traceback__  # TODO maybe go one level down with tb_next?
2✔
1321
                details = {
2✔
1322
                    'error': e.__class__.__name__,  # exception name str
1323
                    'message': str(e),  # error str
1324
                    'traceback': traceback.format_exc(),  # stack str
1325
                    'file': tb.tb_frame.f_code.co_filename,  # filename str
1326
                    'line_no': (tb.tb_lineno, tb.tb_lasti),  # (int, int)
1327
                }
1328
                self.remote_rigs.push(ExpMessage.EXPINTERRUPT, details, wait=True)
2✔
1329
                self.cleanup_mixin_network()
2✔
1330
            raise e
2✔
1331

1332
    def communicate(self, message, *args, raise_on_exception=True):
2✔
1333
        """
1334
        Communicate message to remote services.
1335

1336
        This method is blocking and by default will raise if not all responses received in time.
1337

1338
        Parameters
1339
        ----------
1340
        message : iblutil.io.net.base.ExpMessage, str, int
1341
            An experiment message to send to remote services.
1342
        args
1343
            One or more optional variables to send.
1344
        raise_on_exception : bool
1345
            If true, exceptions arising from message timeouts will be re-raised in main thread and
1346
            services will be cleaned up. Only applies when wait is true.
1347

1348
        Returns
1349
        -------
1350
        Exception | dict
1351
            If raise_on_exception is False, returns an exception if failed to receive all responses
1352
            in time, otherwise a map of service name to response is returned.
1353
        """
1354
        r = self.remote_rigs.push(message, *args, wait=True)
2✔
1355
        if isinstance(r, Exception):
2✔
1356
            log.error('Error on %s network mixin: %s', message, r)
2✔
1357
            if raise_on_exception:
2✔
1358
                self.cleanup_mixin_network()
2✔
1359
                raise r
2✔
1360
        return r
2✔
1361

1362
    def get_exp_info(self):
2✔
1363
        ref = self.exp_ref or None
2✔
1364
        if isinstance(ref, dict) and self.one:
2✔
UNCOV
1365
            ref = self.one.dict2ref(ref)
×
1366
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
2✔
1367
        info = net.ExpInfo(ref, is_main_sync, self.experiment_description, master=is_main_sync)
2✔
1368
        return info.to_dict()
2✔
1369

1370
    def init_mixin_network(self):
2✔
1371
        """Initialize remote services.
1372

1373
        This method sends an EXPINFO message to all services, expecting exactly one of the responses
1374
        to contain main_sync: True, along with the experiment reference to use. It then sends an
1375
        EXPINIT message to all services.
1376
        """
1377
        if not self.remote_rigs.is_connected:
2✔
1378
            return
2✔
1379
        # Determine experiment reference from main sync
1380
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
2✔
1381
        if is_main_sync:
2✔
1382
            raise NotImplementedError
2✔
1383
        assert self.one
2✔
1384

1385
        expinfo = self.get_exp_info() | {'subject': self.session_info['SUBJECT_NAME']}
2✔
1386
        r = self.communicate('EXPINFO', 'CONNECTED', expinfo)
2✔
1387
        assert sum(x[-1]['main_sync'] for x in r.values()) == 1, 'one main sync expected'
2✔
1388
        main_rig_name, (status, info) = next((k, v) for k, v in r.items() if v[-1]['main_sync'])
2✔
1389
        self.exp_ref = self.one.ref2dict(info['exp_ref']) if isinstance(info['exp_ref'], str) else info['exp_ref']
2✔
1390
        if self.exp_ref['subject'] != self.session_info['SUBJECT_NAME']:
2✔
1391
            log.error(
2✔
1392
                'Running task for "%s" but main sync returned exp ref for "%s".',
1393
                self.session_info['SUBJECT_NAME'],
1394
                self.exp_ref['subject'],
1395
            )
1396
            raise ValueError("Subject name doesn't match remote session on " + main_rig_name)
2✔
1397
        if str(self.exp_ref['date']) != self.session_info['SESSION_START_TIME'][:10]:
2✔
1398
            raise RuntimeError(
2✔
1399
                f'Session dates do not match between this rig and {main_rig_name}. \n'
1400
                f'Running past or future sessions not currently supported. \n'
1401
                f'Please check the system date time settings on each rig.'
1402
            )
1403

1404
        # exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
1405
        exp_ref = self.one.dict2ref(self.exp_ref)
2✔
1406
        self.communicate('EXPINIT', {'exp_ref': exp_ref})
2✔
1407

1408
    def start_mixin_network(self):
2✔
1409
        """Start remote services.
1410

1411
        This method sends an EXPSTART message to all services, along with an exp_ref string.
1412
        Responses are required but ignored.
1413
        """
1414
        if not self.remote_rigs.is_connected:
2✔
1415
            return
2✔
1416
        self.communicate('EXPSTART', self.exp_ref)
2✔
1417

1418
    def stop_mixin_network(self):
2✔
1419
        """Start remote services.
1420

1421
        This method sends an EXPEND message to all services. Responses are required but ignored.
1422
        """
1423
        if not self.remote_rigs.is_connected:
2✔
1424
            return
2✔
1425
        self.communicate('EXPEND')
2✔
1426

1427
    def cleanup_mixin_network(self):
2✔
1428
        """Clean up services."""
1429
        self.remote_rigs.close()
2✔
1430
        if self.remote_rigs.is_connected:
2✔
1431
            log.warning('Failed to properly clean up network mixin')
2✔
1432

1433

1434
class SpontaneousSession(BaseSession):
2✔
1435
    """
1436
    A Spontaneous task doesn't have trials, it just runs until the user stops it.
1437

1438
    It is used to get extraction structure for data streams
1439
    """
1440

1441
    def __init__(self, duration_secs=None, **kwargs):
2✔
1442
        super().__init__(**kwargs)
2✔
1443
        self.duration_secs = duration_secs
2✔
1444

1445
    def start_hardware(self):
2✔
1446
        pass  # no mixin here, life is but a dream
2✔
1447

1448
    def _run(self):
2✔
1449
        """Run the task with the actual state machine."""
1450
        log.info('Starting spontaneous acquisition')
2✔
1451
        while True:
2✔
1452
            time.sleep(1.5)
2✔
1453
            if self.duration_secs is not None and self.time_elapsed.seconds > self.duration_secs:
2✔
1454
                break
2✔
UNCOV
1455
            if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
×
UNCOV
1456
                self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
UNCOV
1457
                break
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc