• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 11177497220

04 Oct 2024 09:27AM UTC coverage: 48.017% (+1.2%) from 46.79%
11177497220

Pull #724

github

76a728
web-flow
Merge d585cdc2b into 646ec9386
Pull Request #724: 8.24.3

5 of 6 new or added lines in 4 files covered. (83.33%)

1030 existing lines in 22 files now uncovered.

4188 of 8722 relevant lines covered (48.02%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.29
/iblrig/base_tasks.py
1
"""
2
Commonalities for all tasks.
3

4
This module provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task related logic.
6
"""
7

8
import argparse
2✔
9
import contextlib
2✔
10
import datetime
2✔
11
import importlib.metadata
2✔
12
import inspect
2✔
13
import json
2✔
14
import logging
2✔
15
import signal
2✔
16
import sys
2✔
17
import time
2✔
18
import traceback
2✔
19
from abc import ABC, abstractmethod
2✔
20
from collections import OrderedDict
2✔
21
from collections.abc import Callable
2✔
22
from pathlib import Path
2✔
23
from typing import Protocol, final
2✔
24

25
import numpy as np
2✔
26
import pandas as pd
2✔
27
import serial
2✔
28
import yaml
2✔
29
from pythonosc import udp_client
2✔
30

31
import ibllib.io.session_params as ses_params
2✔
32
import iblrig.graphic as graph
2✔
33
import iblrig.path_helper
2✔
34
import pybpodapi
2✔
35
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
36
from iblrig import net, path_helper, sound
2✔
37
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
38
from iblrig.frame2ttl import Frame2TTL
2✔
39
from iblrig.hardware import SOFTCODE, Bpod, MyRotaryEncoder, sound_device_factory
2✔
40
from iblrig.hifi import HiFi
2✔
41
from iblrig.path_helper import load_pydantic_yaml
2✔
42
from iblrig.pydantic_definitions import HardwareSettings, RigSettings, TrialDataModel
2✔
43
from iblrig.tools import call_bonsai
2✔
44
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
45
from iblrig.valve import Valve
2✔
46
from iblutil.io.net.base import ExpMessage
2✔
47
from iblutil.spacer import Spacer
2✔
48
from iblutil.util import Bunch, flatten, setup_logger
2✔
49
from one.alf.io import next_num_folder
2✔
50
from one.api import ONE, OneAlyx
2✔
51
from pybpodapi.protocol import StateMachine
2✔
52

53
# Loopback address for the OSC UDP client used to talk to Bonsai running on the same machine.
OSC_CLIENT_IP = '127.0.0.1'

# Module-level logger; file handlers are attached/detached at runtime by
# BaseSession._setup_loggers / BaseSession._remove_file_loggers.
log = logging.getLogger(__name__)
56

57

58
class HasBpod(Protocol):
    """Structural type for objects (e.g. hardware mixins' host class) that expose a Bpod instance."""

    bpod: Bpod
60

61

62
class BaseSession(ABC):
    """
    Abstract base for all iblrig task sessions.

    Gathers the behaviour common to every task: loading of hardware/rig settings and task
    parameters, logger setup, session path construction, experiment description assembly and
    Alyx registration. Hardware mixins compose with this class to build concrete tasks.
    """

    version = None
    """str: !!CURRENTLY UNUSED!! task version string."""
    base_parameters_file: Path | None = None
    """Path: A YAML file containing base, default task parameters."""
    is_mock = False
    """bool: Whether the session is run against mocked hardware (set via :meth:`mock`)."""
    logger: logging.Logger | None = None
    """logging.Logger: Log instance used solely to keep track of log level passed to constructor."""
    experiment_description: dict = {}
    """dict: The experiment description."""
    extractor_tasks: list | None = None
    """list of str: An optional list of pipeline task class names to instantiate when preprocessing task data."""

    # pydantic model used by save_trial_data_to_json to validate each trial before writing to disk
    TrialDataModel: type[TrialDataModel]

    @property
    @abstractmethod
    def protocol_name(self) -> str:
        """str: The name of the task protocol (NB: avoid spaces)."""
83

84
    def __init__(
        self,
        subject=None,
        task_parameter_file=None,
        file_hardware_settings=None,
        hardware_settings: HardwareSettings = None,
        file_iblrig_settings=None,
        iblrig_settings: RigSettings = None,
        one=None,
        interactive=True,
        projects=None,
        procedures=None,
        stub=None,
        subject_weight_grams=None,
        append=False,
        wizard=False,
        log_level='INFO',
        **kwargs,
    ):
        """
        Initialize the session: set up loggers, load settings and task parameters, build the
        session paths and assemble the experiment description dictionary.

        Parameters
        ----------
        subject : str
            The subject nickname. Required.
        task_parameter_file : str or Path, optional
            An optional path to the task_parameters.yaml file.
        file_hardware_settings : str or Path, optional
            Name of the hardware file in the settings folder, or full file path.
        hardware_settings : HardwareSettings, optional
            An optional dictionary of hardware settings. Keys will override any keys in the file.
        file_iblrig_settings : str or Path, optional
            Name of the iblrig file in the settings folder, or full file path.
        iblrig_settings : RigSettings, optional
            An optional dictionary of iblrig settings. Keys will override any keys in the file.
        one : ONE, optional
            An optional instance of ONE.
        interactive : bool
            When True the session may prompt the user with dialogs (e.g. subject weight in run()).
        projects : list, optional
            An optional list of Alyx projects.
        procedures : list, optional
            An optional list of Alyx procedures.
        stub : Path, optional
            A full path to an experiment description file containing experiment information.
        subject_weight_grams : float, optional
            Weight of the subject.
        append : bool
            If True, append to the latest existing session of the same subject for the same day.
        wizard : bool
            When True, the end-of-session droppings-count dialog in run() is skipped.
        log_level : str
            Level passed to the 'iblrig' logger (e.g. 'INFO', 'DEBUG').
        **kwargs
            Additional keyword arguments are accepted but silently ignored.
        """
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
        self._logger = None
        self._setup_loggers(level=log_level)
        if not isinstance(self, EmptySession):
            log.info(f'Running iblrig {iblrig.__version__}, pybpod version {pybpodapi.__version__}')
        log.info(f'Session call: {" ".join(sys.argv)}')
        self.interactive = interactive
        self._one = one
        self.init_datetime = datetime.datetime.now()

        # loads in the settings: first load the files, then update with the input argument if provided
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
        if hardware_settings is not None:
            self.hardware_settings.update(hardware_settings)
            HardwareSettings.model_validate(self.hardware_settings)
        self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
        if iblrig_settings is not None:
            self.iblrig_settings.update(iblrig_settings)
            RigSettings.model_validate(self.iblrig_settings)

        self.wizard = wizard

        # Load the tasks settings, from the task folder or override with the input argument
        self.task_params = self.read_task_parameter_files(task_parameter_file)

        self.session_info = Bunch(
            {
                'NTRIALS': 0,
                'NTRIALS_CORRECT': 0,
                'PROCEDURES': procedures,
                'PROJECTS': projects,
                'SESSION_START_TIME': self.init_datetime.isoformat(),
                'SESSION_END_TIME': None,
                'SESSION_NUMBER': 0,
                'SUBJECT_NAME': subject,
                'SUBJECT_WEIGHT': subject_weight_grams,
                'TOTAL_WATER_DELIVERED': 0,
            }
        )
        # Executes mixins init methods
        self._execute_mixins_shared_function('init_mixin')
        self.paths = self._init_paths(append=append)
        if not isinstance(self, EmptySession):
            log.info(f'Session raw data: {self.paths.SESSION_RAW_DATA_FOLDER}')
        # Prepare the experiment description dictionary
        self.experiment_description = self.make_experiment_description_dict(
            self.protocol_name,
            self.paths.get('TASK_COLLECTION'),
            procedures,
            projects,
            self.hardware_settings,
            stub,
            extractors=self.extractor_tasks,
        )
172

173
    @classmethod
    def get_task_file(cls) -> Path:
        """
        Return the path of the python file in which this task class is defined.

        Returns
        -------
        Path
            The path to the task file.
        """
        source_file = inspect.getfile(cls)
        return Path(source_file)
184

185
    @classmethod
    def get_task_directory(cls) -> Path:
        """
        Return the directory containing the task's python file.

        Returns
        -------
        Path
            The path to the task's directory.
        """
        task_file = cls.get_task_file()
        return task_file.parent
196

197
    @classmethod
    def read_task_parameter_files(cls, task_parameter_file: str | Path | None = None) -> Bunch:
        """
        Get the task's parameters from the various YAML files in the hierarchy.

        Parameters
        ----------
        task_parameter_file : str or Path, optional
            Path to override the task parameter file

        Returns
        -------
        Bunch
            Task parameters, sorted by key.
        """
        # Load the tasks settings, from the task folder or override with the input argument
        base_parameters_files = [task_parameter_file or cls.get_task_directory().joinpath('task_parameters.yaml')]

        # loop through the task hierarchy to gather parameter files
        for klass in cls.__mro__:
            base_file = getattr(klass, 'base_parameters_file', None)
            if base_file is not None:
                base_parameters_files.append(base_file)

        # remove list duplicates while preserving order, we want the highest order first
        base_parameters_files = list(reversed(list(dict.fromkeys(base_parameters_files))))

        # loop through files and update the dictionary, the latest files in the hierarchy have precedence
        task_params = {}
        for param_file in map(Path, base_parameters_files):
            if not param_file.exists():
                continue
            with open(param_file) as fp:
                params = yaml.safe_load(fp)
            if params is not None:
                task_params.update(params)

        # finally sort the dictionary so it's easier for a human to navigate the many keys, return as a Bunch
        return Bunch(sorted(task_params.items()))
235

236
    def _init_paths(self, append: bool = False) -> Bunch:
        r"""
        Initialize session paths.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        Bunch
            Bunch with keys:

            *   BONSAI: full path to the bonsai executable
                `C:\iblrigv8\Bonsai\Bonsai.exe`
            *   VISUAL_STIM_FOLDER: full path to the visual stimulus folder
                `C:\iblrigv8\visual_stim`
            *   LOCAL_SUBJECT_FOLDER: full path to the local subject folder
                `C:\iblrigv8_data\mainenlab\Subjects`
            *   REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
                `Y:\Subjects`
            *   SESSION_FOLDER: full path to the current session:
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001`
            *   TASK_COLLECTION: folder name of the current task
                `raw_task_data_00`
            *   SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection.
                This is where the task data gets written
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00`
            *   DATA_FILE_PATH: contains the bpod trials
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable`
            *   SETTINGS_FILE_PATH: contains the task settings
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json`

        Raises
        ------
        RuntimeError
            When appending a protocol to an existing session while Bpod is the main sync (MAIN_SYNC).
        """
        rig_computer_paths = path_helper.get_local_and_remote_paths(
            local_path=self.iblrig_settings.iblrig_local_data_path,
            remote_path=self.iblrig_settings.iblrig_remote_data_path,
            lab=self.iblrig_settings.ALYX_LAB,
            iblrig_settings=self.iblrig_settings,
        )
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
        paths.BONSAI = BONSAI_EXE
        paths.VISUAL_STIM_FOLDER = BASE_PATH.joinpath('visual_stim')
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
        # initialize the session path: <subject>/<yyyy-mm-dd> under the local subjects folder
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
        )
        if append:
            # this is the case where we append a new protocol to an existing session
            todays_sessions = sorted(filter(Path.is_dir, date_folder.glob('*')), reverse=True)
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
            paths.SESSION_FOLDER = todays_sessions[0]
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
            if self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
                """
                Chained protocols make little sense when Bpod is the main sync as there is no
                continuous acquisition between protocols.  Only one sync collection can be defined in
                the experiment description file.
                If you are running experiments with an ephys rig (nidq) or an external daq, you should
                correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
                """
                raise RuntimeError('Chained protocols not supported for bpod-only sessions')
        else:
            # in this case the session path is created from scratch
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)

        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
        return paths
311

312
    @property
    def exp_ref(self):
        """Return an experiment reference string built from the session info, or None if incomplete."""
        subject = self.session_info['SUBJECT_NAME']
        start_time = self.session_info['SESSION_START_TIME']
        number = self.session_info['SESSION_NUMBER']
        if subject and start_time and number:
            ref = dict(subject=subject, date=start_time[:10], sequence=str(number))
            return self.one.dict2ref(ref)
        return None
319

320
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
        """
        Configure the 'iblrig' and 'pybpodapi' loggers, optionally mirroring output to a file.

        Parameters
        ----------
        level : str
            Log level for the 'iblrig' logger.
        level_bpod : str
            Log level for the 'pybpodapi' logger.
        file : str or Path, optional
            When given, a file handler writing to this path is attached to both loggers.
        """
        self._logger = setup_logger('iblrig', level=level, file=file)  # logger attr used by create_session to determine log level
        setup_logger('pybpodapi', level=level_bpod, file=file)
323

324
    @staticmethod
    def _remove_file_loggers():
        """Close and detach all file handlers from the 'iblrig' and 'pybpodapi' loggers."""
        for name in ('iblrig', 'pybpodapi'):
            target = logging.getLogger(name)
            # iterate over a snapshot since we mutate target.handlers while looping
            for handler in list(target.handlers):
                if isinstance(handler, logging.FileHandler):
                    handler.close()
                    target.removeHandler(handler)
332

333
    @staticmethod
    def make_experiment_description_dict(
        task_protocol: str,
        task_collection: str,
        procedures: list = None,
        projects: list = None,
        hardware_settings: dict | HardwareSettings = None,
        stub: Path = None,
        extractors: list = None,
        camera_config: str = None,
    ):
        """
        Construct an experiment description dictionary.

        Parameters
        ----------
        task_protocol : str
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
        task_collection : str
            The task collection name, e.g. raw_task_data_00.
        procedures : list
            An optional list of Alyx procedures.
        projects : list
            An optional list of Alyx projects.
        hardware_settings : dict
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
        stub : Path
            An optional path to an experiment description stub file to read and update.
        extractors: list
            An optional list of extractor names for the task.
        camera_config : str
            The camera configuration name in the hardware settings. Defaults to the first key in
            'device_cameras'.

        Returns
        -------
        dict
            The experiment description.
        """
        description = ses_params.read_params(stub) if stub else {}

        # Add hardware devices
        if hardware_settings is not None:
            if isinstance(hardware_settings, HardwareSettings):
                hardware_settings = hardware_settings.model_dump()
            devices = {}
            cams = hardware_settings.get('device_cameras', None)
            if cams:
                devices['cameras'] = {}
                # NOTE(review): the {} fallback is unreachable here since cams is truthy (guarded above);
                # it would fail as a dict key if it were ever returned — confirm intent
                camera_config = camera_config or next((k for k in cams), {})
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
            if hardware_settings.get('device_microphone', None):
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
            ses_params.merge_params(description, {'devices': devices})

        # Add projects and procedures
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
        is_main_sync = (hardware_settings or {}).get('MAIN_SYNC', False)
        # Add sync key if required
        if is_main_sync and 'sync' not in description:
            description['sync'] = {
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
            }
        # Add task
        task = {task_protocol: {'collection': task_collection}}
        if not is_main_sync:
            task[task_protocol]['sync_label'] = 'bpod'
        if extractors:
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
            task[task_protocol].update({'extractors': extractors})
        if 'tasks' not in description:
            description['tasks'] = [task]
        else:
            description['tasks'].append(task)
        return description
409

410
    def _make_task_parameters_dict(self):
        """
        Assemble the dictionary saved to the settings json file for extraction.

        Returns
        -------
        dict
            Task parameters merged with hardware settings, session info and v7-compatibility keys.
        """
        settings = dict(self.task_params)
        settings.update(self.hardware_settings.model_dump())
        settings.update(dict(self.session_info))
        # Various values added to ease transition from iblrig v7 to v8, different home may be desired
        legacy_keys = {
            'IBLRIG_VERSION': iblrig.__version__,
            'PYBPOD_PROTOCOL': self.protocol_name,
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
        }
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
            legacy_keys['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
        settings.update(legacy_keys)
        return settings
432

433
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
        """
        Collects the various settings and parameters of the session and outputs them to a JSON file.

        Parameters
        ----------
        destination_folder : Path, optional
            Folder in which to write '_iblrig_taskSettings.raw.json'; defaults to the session's
            SETTINGS_FILE_PATH.

        Returns
        -------
        Path
            Path to the resultant JSON file
        """
        output_dict = self._make_task_parameters_dict()
        if destination_folder:
            json_file = destination_folder.joinpath('_iblrig_taskSettings.raw.json')
        else:
            json_file = self.paths['SETTINGS_FILE_PATH']
        json_file.parent.mkdir(parents=True, exist_ok=True)
        with open(json_file, 'w') as outfile:
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)  # converts datetime objects to string
        return json_file  # PosixPath
451

452
    @final
    def save_trial_data_to_json(self, bpod_data: dict):
        """Validate and save trial data.

        This method retrieves the current trial's data from the trials_table and validates it using a Pydantic model
        (self.TrialDataModel). It merges in the trial's bpod_data dict and appends everything to the session's
        JSON data file.

        Parameters
        ----------
        bpod_data : dict
            Trial data returned from pybpod.
        """
        # get trial's data as a dict
        trial_data = self.trials_table.iloc[self.trial_num].to_dict()

        # warn about entries not covered by pydantic model (only on the first trial)
        if trial_data.get('trial_num', 1) == 0:
            for key in set(trial_data.keys()) - set(self.TrialDataModel.model_fields) - {'index'}:
                log.warning(
                    f'Key "{key}" in trial_data is missing from TrialDataModel - '
                    f'its value ({trial_data[key]}) will not be validated.'
                )

        # validate by passing through pydantic model
        trial_data = self.TrialDataModel.model_validate(trial_data).model_dump()

        # add bpod_data as 'behavior_data'
        trial_data['behavior_data'] = bpod_data

        # write json data to file
        with open(self.paths['DATA_FILE_PATH'], 'a') as fp:
            fp.write(json.dumps(trial_data) + '\n')
485

486
    @property
    def one(self):
        """
        ONE getter: lazily instantiate a remote ONE client from the iblrig settings.

        Returns None when ALYX_URL is unset, and logs (without raising) when the connection fails,
        leaving self._one as None so a later access retries.
        """
        if self._one is None:
            if self.iblrig_settings['ALYX_URL'] is None:
                return
            info_str = (
                f"alyx client with user name {self.iblrig_settings['ALYX_USER']} "
                + f"and url: {self.iblrig_settings['ALYX_URL']}"
            )
            try:
                self._one = ONE(
                    base_url=str(self.iblrig_settings['ALYX_URL']),
                    username=self.iblrig_settings['ALYX_USER'],
                    mode='remote',
                    cache_rest=None,
                )
                log.info('instantiated ' + info_str)
            except Exception:
                log.error(traceback.format_exc())
                log.error('could not connect to ' + info_str)
        return self._one
508

509
    def register_to_alyx(self):
        """
        Registers the session to Alyx.

        This registers the session using the IBLRegistrationClient class.  This uses the settings
        file(s) and experiment description file to extract the session data.  This may be called
        any number of times and if the session record already exists in Alyx it will be updated.
        If session registration fails, it will be done before extraction in the ibllib pipeline.

        Note that currently the subject weight is registered once and only once.  The recorded
        weight of the first protocol run is used.

        Water administrations are added separately by this method: it is expected that
        `register_session` is first called with no recorded total water. This method will then add
        a water administration each time it is called, and should therefore be called only once
        after each protocol is run. If water administration registration fails for all protocols,
        this will be done before extraction in the ibllib pipeline, however, if a water
        administration is successfully registered for one protocol and subsequent ones fail to
        register, these will not be added before extraction in ibllib and therefore must be
        manually added to Alyx.

        Returns
        -------
        dict
            The registered session record, or None when registration is skipped or fails.

        See Also
        --------
        :external+iblenv:meth:`ibllib.oneibl.registration.IBLRegistrationClient.register_session` - The registration method.
        """
        # test subjects are never registered
        if self.session_info['SUBJECT_NAME'] in ('iblrig_test_subject', 'test', 'test_subject'):
            log.warning('Not registering test subject to Alyx')
            return
        if not self.one or self.one.offline:
            return
        try:
            client = IBLRegistrationClient(self.one)
            ses, _ = client.register_session(self.paths.SESSION_FOLDER, register_reward=False)
        except Exception:
            # best-effort: failures are logged, registration is retried later in the ibllib pipeline
            log.error(traceback.format_exc())
            log.error('Could not register session to Alyx')
            return
        # add the water administration if there was water administered
        try:
            if self.session_info['TOTAL_WATER_DELIVERED']:
                wa = client.register_water_administration(
                    self.session_info.SUBJECT_NAME,
                    self.session_info['TOTAL_WATER_DELIVERED'] / 1000,
                    session=ses['url'][-36:],
                    water_type=self.task_params.get('REWARD_TYPE', None),
                )
                log.info(f"Water administered registered in Alyx database: {ses['subject']}, " f"{wa['water_administered']}mL")
        except Exception:
            log.error(traceback.format_exc())
            log.error('Could not register water administration to Alyx')
            return
        return ses
566

567
    def _execute_mixins_shared_function(self, pattern):
        """
        Execute every bound method of this object whose name starts with ``pattern``.

        Parameters
        ----------
        pattern : str
            'init_mixin', 'start_mixin', 'stop_mixin', or 'cleanup_mixin'
        """
        # collect all matching bound methods first, then call them
        matching_names = [name for name in dir(self) if name.startswith(pattern)]
        bound_methods = [getattr(self, name) for name in matching_names if inspect.ismethod(getattr(self, name))]
        for bound_method in bound_methods:
            bound_method()
580

581
    @property
    def time_elapsed(self):
        """datetime.timedelta: Time elapsed since the session object was instantiated."""
        return datetime.datetime.now() - self.init_datetime
584

585
    def mock(self):
        """Flag the session as a mock session (sets :attr:`is_mock`)."""
        self.is_mock = True
587

588
    def create_session(self):
        """
        Create the session path and save json parameters in the task collection folder.

        This will also create the protocol folder, enable file logging, copy the acquisition
        stub to the remote session folder and register the session to Alyx.
        """
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
        # enable file logging
        logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
        self._setup_loggers(level=self._logger.level, file=logfile)
        # copy the acquisition stub to the remote session folder
        sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
        sc.initialize_experiment(self.experiment_description, overwrite=False)
        self.register_to_alyx()
602

603
    def run(self):
        """
        Common pre- and post-run instructions for all tasks.

        Starts the hardware, creates the session on disk, optionally prompts for the subject
        weight, installs a SIGINT handler for a graceful exit, runs the task-specific `_run`,
        then saves settings, registers to Alyx and stops/cleans up the mixins.
        """
        # here we make sure we connect to the hardware before writing the session to disk
        # this prevents from incrementing endlessly the session number if the hardware fails to connect
        self.start_hardware()
        self.create_session()
        # When not running the first chained protocol, we can skip the weighing dialog
        first_protocol = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1]) == 0
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
            self.session_info.SUBJECT_WEIGHT = graph.numinput(
                'Subject weighing (gr)', f'{self.session_info.SUBJECT_NAME} weight (gr):', nullable=False
            )

        def sigint_handler(*args, **kwargs):
            # create a signal handler for a graceful exit: create a stop flag in the session folder
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
            log.critical('SIGINT signal detected, will exit at the end of the trial')

        # if upon starting there is a flag just remove it, this is to prevent killing a session in the egg
        if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
            self.paths.SESSION_FOLDER.joinpath('.stop').unlink()

        signal.signal(signal.SIGINT, sigint_handler)
        self._run()  # runs the specific task logic i.e. trial loop etc...
        # post task instructions
        log.critical('Graceful exit')
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
        if self.interactive and not self.wizard:
            self.session_info.POOP_COUNT = graph.numinput(
                'Poop count', f'{self.session_info.SUBJECT_NAME} droppings count:', nullable=True, askint=True
            )
        self.save_task_parameters_to_json_file()
        self.register_to_alyx()
        self._execute_mixins_shared_function('stop_mixin')
        self._execute_mixins_shared_function('cleanup_mixin')
643

644
    @abstractmethod
    def start_hardware(self):
        """
        Start the hardware.

        Called by run() before the session is written to disk (see create_session).
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
        """
        ...
653

654
    @abstractmethod
    def _run(self):
        """Task-specific logic (e.g. the trial loop); must be implemented by subclasses."""
        ...
656

657
    @staticmethod
    def extra_parser():
        """
        Specify extra kwargs arguments to expose to the user prior to running the task.

        Make sure you instantiate the parser.

        Returns
        -------
        argparse.ArgumentParser
            The extra parser instance.
        """
        return argparse.ArgumentParser(add_help=False)
671

672

673
# this class gets called to get the path constructor utility to predict the session path
class EmptySession(BaseSession):
    """Concrete no-op session used solely to construct paths (no hardware, no trial loop)."""

    protocol_name = 'empty'

    def _run(self):
        pass

    def start_hardware(self):
        pass
682

683

684
class OSCClient(udp_client.SimpleUDPClient):
2✔
685
    """
686
    Handles communication to Bonsai using a UDP Client
687
    OSC channels:
688
        USED:
689
        /t  -> (int)    trial number current
690
        /p  -> (int)    position of stimulus init for current trial
691
        /h  -> (float)  phase of gabor for current trial
692
        /c  -> (float)  contrast of stimulus for current trial
693
        /f  -> (float)  frequency of gabor patch for current trial
694
        /a  -> (float)  angle of gabor patch for current trial
695
        /g  -> (float)  gain of RE to visual stim displacement
696
        /s  -> (float)  sigma of the 2D gaussian of gabor
697
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
698
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
699
    """
700

701
    OSC_PROTOCOL = {
2✔
702
        'trial_num': dict(mess='/t', type=int),
703
        'position': dict(mess='/p', type=int),
704
        'stim_phase': dict(mess='/h', type=float),
705
        'contrast': dict(mess='/c', type=float),
706
        'stim_freq': dict(mess='/f', type=float),
707
        'stim_angle': dict(mess='/a', type=float),
708
        'stim_gain': dict(mess='/g', type=float),
709
        'stim_sigma': dict(mess='/s', type=float),
710
        # 'stim_reverse': dict(mess='/r', type=int),  # this is not handled by Bonsai
711
    }
712

713
    def __init__(self, port, ip='127.0.0.1'):
2✔
714
        super().__init__(ip, port)
2✔
715

716
    def __del__(self):
2✔
717
        self._sock.close()
2✔
718

719
    def send2bonsai(self, **kwargs):
2✔
720
        """
721
        :param see list of keys in OSC_PROTOCOL
722
        :example: client.send2bonsai(trial_num=6, sim_freq=50)
723
        :return:
724
        """
725
        for k in kwargs:
2✔
726
            if k in self.OSC_PROTOCOL:
2✔
727
                # need to convert basic numpy types to low-level python types for
728
                # punch card generation OSC module, I might as well have written C code
729
                value = kwargs[k].item() if isinstance(kwargs[k], np.generic) else kwargs[k]
2✔
730
                self.send_message(self.OSC_PROTOCOL[k]['mess'], self.OSC_PROTOCOL[k]['type'](value))
2✔
731

732
    def exit(self):
2✔
733
        self.send_message('/x', 1)
2✔
734

735

736
class BonsaiRecordingMixin(BaseSession):
2✔
737
    config: dict
2✔
738

739
    def init_mixin_bonsai_recordings(self, *args, **kwargs):
2✔
740
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
2✔
741
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
2✔
742
        self.config = None  # the name of the configuration to run
2✔
743

744
    def stop_mixin_bonsai_recordings(self):
2✔
745
        log.info('Stopping Bonsai recordings')
2✔
746
        self.bonsai_camera.udp_client.exit()
2✔
747
        self.bonsai_microphone.udp_client.exit()
2✔
748

749
    def start_mixin_bonsai_microphone(self):
2✔
750
        if not self.config:
2✔
751
            # Use the first key in the device_cameras map
UNCOV
752
            self.config = next((k for k in self.hardware_settings.device_cameras), None)
×
753
        # The camera workflow on the behaviour computer already contains the microphone recording
754
        # so the device camera workflow and the microphone one are exclusive
755
        if self.config:
2✔
756
            return  # Camera workflow defined; so no need to separately start microphone.
2✔
757
        if not self.task_params.RECORD_SOUND:
×
UNCOV
758
            return  # Sound should not be recorded
×
UNCOV
759
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
×
UNCOV
760
        parameters = {
×
761
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
762
            'RecordSound': self.task_params.RECORD_SOUND,
763
        }
764
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
×
UNCOV
765
        log.info('Bonsai microphone recording module loaded: OK')
×
766

767
    @staticmethod
2✔
768
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
2✔
769
        """
770
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.
771

772
        Parameters
773
        ----------
774
        cameras : dict
775
            The hardware settings configuration.
776
        name : {'setup', 'recording'} str
777
            The workflow type.
778

779
        Returns
780
        -------
781
        Path
782
            The workflow path.
783
        """
784
        if cameras is None:
2✔
785
            return None
2✔
786
        return cameras['BONSAI_WORKFLOW'][name]
2✔
787

788
    def start_mixin_bonsai_cameras(self):
2✔
789
        """
790
        Prepare the cameras.
791

792
        Starts the pipeline that aligns the camera focus with the desired borders of rig features.
793
        The actual triggering of the cameras is done in the trigger_bonsai_cameras method.
794
        """
795
        if not self.config:
2✔
796
            # Use the first key in the device_cameras map
797
            try:
2✔
798
                self.config = next(k for k in self.hardware_settings.device_cameras)
2✔
UNCOV
799
            except StopIteration:
×
800
                return
×
801
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
802
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
2✔
UNCOV
803
            return
×
804

805
        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
806
        if PYSPIN_AVAILABLE:
2✔
807
            from iblrig.video_pyspin import enable_camera_trigger
×
808

UNCOV
809
            enable_camera_trigger(True)
×
810

811
        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
2✔
812
        log.info('Bonsai cameras setup module loaded: OK')
2✔
813

814
    def trigger_bonsai_cameras(self):
2✔
815
        if not self.config:
2✔
816
            # Use the first key in the device_cameras map
UNCOV
817
            try:
×
UNCOV
818
                self.config = next(k for k in self.hardware_settings.device_cameras)
×
UNCOV
819
            except StopIteration:
×
UNCOV
820
                return
×
821
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
822
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
2✔
823
            raise NotImplementedError
×
824
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
2✔
825
        if workflow_file is None:
2✔
UNCOV
826
            return
×
827
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)
2✔
828
        # FIXME Use parameters in configuration map
829
        parameters = {
2✔
830
            'FileNameLeft': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.raw.avi'),
831
            'FileNameLeftData': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.frameData.bin'),
832
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
833
            'RecordSound': self.task_params.RECORD_SOUND,
834
        }
835
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
2✔
836
        log.info('Bonsai camera recording process started')
2✔
837

838

839
class BonsaiVisualStimulusMixin(BaseSession):
2✔
840
    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
2✔
841
        # camera 7111, microphone 7112
842
        self.bonsai_visual_udp_client = OSCClient(port=7110)
2✔
843

844
    def start_mixin_bonsai_visual_stimulus(self):
2✔
845
        self.choice_world_visual_stimulus()
2✔
846

847
    def stop_mixin_bonsai_visual_stimulus(self):
2✔
848
        log.info('Stopping Bonsai visual stimulus')
2✔
849
        self.bonsai_visual_udp_client.exit()
2✔
850

851
    def send_trial_info_to_bonsai(self):
2✔
852
        """
853
        Send the trial information to Bonsai via UDP.
854

855
        The OSC protocol is documented in iblrig.base_tasks.BonsaiVisualStimulusMixin
856
        """
857
        bonsai_dict = {
2✔
858
            k: self.trials_table[k][self.trial_num]
859
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
860
            if k in self.trials_table.columns
861
        }
862

863
        # reverse wheel contingency: if stim_reverse is True we invert stim_gain
864
        if self.trials_table.get('stim_reverse', {}).get(self.trial_num, False):
2✔
865
            bonsai_dict['stim_gain'] = -bonsai_dict['stim_gain']
×
866

867
        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
2✔
868
        log.debug(bonsai_dict)
2✔
869

870
    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
2✔
871
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
2✔
872
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
2✔
873
        parameters = {
2✔
874
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
875
            'Stim.SpontaneousActivity0.DueTime': sa_time,
876
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
877
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
878
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
879
        }
880
        map_seconds = pd.to_timedelta(map_time).seconds
2✔
881
        sa_seconds = pd.to_timedelta(sa_time).seconds
2✔
882
        log.info(f'Starting spontaneous activity ({sa_seconds} s) and RF mapping stims ({map_seconds} s)')
2✔
883
        s = call_bonsai(workflow_file, parameters, editor=False)
2✔
884
        log.info('Spontaneous activity and RF mapping stims finished')
2✔
885
        return s
2✔
886

887
    def choice_world_visual_stimulus(self):
2✔
888
        if self.task_params.VISUAL_STIMULUS is None:
2✔
UNCOV
889
            return
×
890
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
2✔
891
        parameters = {
2✔
892
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
893
            'Stim.FileNameStimPositionScreen': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_stimPositionScreen.raw.csv'),
894
            'Stim.FileNameSyncSquareUpdate': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
895
            'Stim.FileNamePositions': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderPositions.raw.ssv'),
896
            'Stim.FileNameEvents': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderEvents.raw.ssv'),
897
            'Stim.FileNameTrialInfo': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
898
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
899
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
900
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
901
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
902
        }
903
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
2✔
904
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
905

906

907
class BpodMixin(BaseSession):
2✔
908
    def _raise_on_undefined_softcode_handler(self, byte: int):
2✔
909
        raise ValueError(f'No handler defined for softcode #{byte}')
2✔
910

911
    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
2✔
912
        """
913
        Returns a softcode handler dict where each key corresponds to the softcode and each value to the
914
        function to be called.
915

916
        This needs to be wrapped this way because
917
            1) we want to be able to inherit this and dynamically add softcode to the dictionry
918
            2) we need to provide the Task object (self) at run time to have the functions with static args
919
        This is tricky as it is unclear if the task object is a copy or a reference when passed here.
920

921

922
        Returns
923
        -------
924
        OrderedDict[int, Callable]
925
            Softcode dictionary
926
        """
927
        softcode_dict = OrderedDict(
2✔
928
            {
929
                SOFTCODE.STOP_SOUND: self.sound['sd'].stop,
930
                SOFTCODE.PLAY_TONE: lambda: self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate']),
931
                SOFTCODE.PLAY_NOISE: lambda: self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate']),
932
                SOFTCODE.TRIGGER_CAMERA: getattr(
933
                    self, 'trigger_bonsai_cameras', lambda: self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)
934
                ),
935
            }
936
        )
937
        return softcode_dict
2✔
938

939
    def init_mixin_bpod(self, *args, **kwargs):
2✔
940
        self.bpod = Bpod()
2✔
941

942
    def stop_mixin_bpod(self):
2✔
943
        self.bpod.close()
2✔
944

945
    def start_mixin_bpod(self):
2✔
946
        if self.hardware_settings['device_bpod']['COM_BPOD'] is None:
2✔
947
            raise ValueError(
2✔
948
                'The value for device_bpod:COM_BPOD in '
949
                'settings/hardware_settings.yaml is null. Please '
950
                'provide a valid port name.'
951
            )
UNCOV
952
        disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
×
UNCOV
953
        self.bpod = Bpod(self.hardware_settings['device_bpod']['COM_BPOD'], disable_behavior_ports=disabled_ports)
×
UNCOV
954
        self.bpod.define_rotary_encoder_actions()
×
UNCOV
955
        self.bpod.set_status_led(False)
×
UNCOV
956
        assert self.bpod.is_connected
×
UNCOV
957
        log.info('Bpod hardware module loaded: OK')
×
958
        # make the bpod send spacer signals to the main sync clock for protocol discovery
UNCOV
959
        self.send_spacers()
×
960

961
    def send_spacers(self):
2✔
UNCOV
962
        log.info('Starting task by sending a spacer signal on BNC1')
×
UNCOV
963
        sma = StateMachine(self.bpod)
×
UNCOV
964
        Spacer().add_spacer_states(sma, next_state='exit')
×
UNCOV
965
        self.bpod.send_state_machine(sma)
×
UNCOV
966
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
967
        return self.bpod.session.current_trial.export()
×
968

969

970
class Frame2TTLMixin(BaseSession):
2✔
971
    """Frame 2 TTL interface for state machine."""
972

973
    def init_mixin_frame2ttl(self, *args, **kwargs):
2✔
974
        pass
2✔
975

976
    def start_mixin_frame2ttl(self):
2✔
977
        # todo assert calibration
978
        if self.hardware_settings['device_frame2ttl']['COM_F2TTL'] is None:
2✔
979
            raise ValueError(
2✔
980
                'The value for device_frame2ttl:COM_F2TTL in '
981
                'settings/hardware_settings.yaml is null. Please '
982
                'provide a valid port name.'
983
            )
UNCOV
984
        Frame2TTL(
×
985
            port=self.hardware_settings['device_frame2ttl']['COM_F2TTL'],
986
            threshold_dark=self.hardware_settings['device_frame2ttl']['F2TTL_DARK_THRESH'],
987
            threshold_light=self.hardware_settings['device_frame2ttl']['F2TTL_LIGHT_THRESH'],
988
        ).close()
UNCOV
989
        log.info('Frame2TTL: Thresholds set.')
×
990

991

992
class RotaryEncoderMixin(BaseSession):
2✔
993
    """Rotary encoder interface for state machine."""
994

995
    def init_mixin_rotary_encoder(self, *args, **kwargs):
2✔
996
        self.device_rotary_encoder = MyRotaryEncoder(
2✔
997
            all_thresholds=self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS,
998
            gain=self.task_params.STIM_GAIN,
999
            com=self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
1000
            connect=False,
1001
        )
1002

1003
    def start_mixin_rotary_encoder(self):
2✔
1004
        if self.hardware_settings['device_rotary_encoder']['COM_ROTARY_ENCODER'] is None:
2✔
1005
            raise ValueError(
2✔
1006
                'The value for device_rotary_encoder:COM_ROTARY_ENCODER in '
1007
                'settings/hardware_settings.yaml is null. Please '
1008
                'provide a valid port name.'
1009
            )
UNCOV
1010
        try:
×
UNCOV
1011
            self.device_rotary_encoder.connect()
×
UNCOV
1012
        except serial.serialutil.SerialException as e:
×
UNCOV
1013
            raise serial.serialutil.SerialException(
×
1014
                f'The rotary encoder COM port {self.device_rotary_encoder.RE_PORT} is already in use. This is usually'
1015
                f' due to a Bonsai process currently running on the computer. Make sure all Bonsai windows are'
1016
                f' closed prior to running the task'
1017
            ) from e
UNCOV
1018
        except Exception as e:
×
UNCOV
1019
            raise Exception(
×
1020
                "The rotary encoder couldn't connect. If the bpod is glowing in green,"
1021
                'disconnect and reconnect bpod from the computer'
1022
            ) from e
UNCOV
1023
        log.info('Rotary encoder module loaded: OK')
×
1024

1025

1026
class ValveMixin(BaseSession, HasBpod):
2✔
1027
    def init_mixin_valve(self: object):
2✔
1028
        self.valve = Valve(self.hardware_settings.device_valve)
2✔
1029

1030
    def start_mixin_valve(self):
2✔
1031
        # assert that valve has been calibrated
UNCOV
1032
        assert self.valve.is_calibrated, """VALVE IS NOT CALIBRATED - PLEASE CALIBRATE THE VALVE"""
×
1033

1034
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
UNCOV
1035
        assert self.compute_reward_time(amount_ul=1.5) < 1, """VALVE IS NOT PROPERLY CALIBRATED - PLEASE RECALIBRATE"""
×
UNCOV
1036
        log.info('Water valve module loaded: OK')
×
1037

1038
    def compute_reward_time(self, amount_ul: float | None = None) -> float:
2✔
1039
        """
1040
        Converts the valve opening time from a given volume.
1041

1042
        Parameters
1043
        ----------
1044
        amount_ul : float, optional
1045
            The volume of liquid (μl) to be dispensed from the valve. Defaults to task_params.REWARD_AMOUNT_UL.
1046

1047
        Returns
1048
        -------
1049
        float
1050
            Valve opening time in seconds.
1051
        """
1052
        amount_ul = self.task_params.REWARD_AMOUNT_UL if amount_ul is None else amount_ul
2✔
1053
        return self.valve.values.ul2ms(amount_ul) / 1e3
2✔
1054

1055
    def valve_open(self, reward_valve_time):
2✔
1056
        """
1057
        Open the reward valve for a given amount of time and return bpod data.
1058

1059
        Parameters
1060
        ----------
1061
        reward_valve_time : float
1062
            Amount of time in seconds to open the reward valve.
1063
        """
UNCOV
1064
        sma = StateMachine(self.bpod)
×
UNCOV
1065
        sma.add_state(
×
1066
            state_name='valve_open',
1067
            state_timer=reward_valve_time,
1068
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
1069
            state_change_conditions={'Tup': 'exit'},
1070
        )
UNCOV
1071
        self.bpod.send_state_machine(sma)
×
UNCOV
1072
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1073
        return self.bpod.session.current_trial.export()
×
1074

1075

1076
class SoundMixin(BaseSession, HasBpod):
2✔
1077
    """Sound interface methods for state machine."""
1078

1079
    def init_mixin_sound(self):
2✔
1080
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
2✔
1081
        sound_output = self.hardware_settings.device_sound['OUTPUT']
2✔
1082

1083
        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
1084
        # TODO: this needs proper calibration and refactoring
1085
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
2✔
UNCOV
1086
            amp_gain_factor = 0.25
×
1087
        else:
1088
            amp_gain_factor = 1.0
2✔
1089
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
2✔
1090
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor
2✔
1091

1092
        # sound device sd is actually the module soundevice imported above.
1093
        # not sure how this plays out when referenced outside of this python file
1094
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
2✔
1095
        # Create sounds and output actions of state machine
1096
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
2✔
1097
            rate=self.sound['samplerate'],
1098
            frequency=self.task_params.GO_TONE_FREQUENCY,
1099
            duration=self.task_params.GO_TONE_DURATION,
1100
            amplitude=self.task_params.GO_TONE_AMPLITUDE * amp_gain_factor,
1101
            fade=0.01,
1102
            chans=self.sound['channels'],
1103
        )
1104
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
2✔
1105
            rate=self.sound['samplerate'],
1106
            frequency=-1,
1107
            duration=self.task_params.WHITE_NOISE_DURATION,
1108
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE * amp_gain_factor,
1109
            fade=0.01,
1110
            chans=self.sound['channels'],
1111
        )
1112

1113
    def start_mixin_sound(self):
2✔
1114
        """
1115
        Depends on bpod mixin start for hard sound card
1116
        :return:
1117
        """
UNCOV
1118
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
×
1119
        # SoundCard config params
UNCOV
1120
        match self.hardware_settings.device_sound['OUTPUT']:
×
UNCOV
1121
            case 'harp':
×
UNCOV
1122
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
×
UNCOV
1123
                sound.configure_sound_card(
×
1124
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
1125
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
1126
                    sample_rate=self.sound['samplerate'],
1127
                )
UNCOV
1128
                self.bpod.define_harp_sounds_actions(
×
1129
                    module=self.bpod.sound_card,
1130
                    go_tone_index=self.task_params.GO_TONE_IDX,
1131
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1132
                )
UNCOV
1133
            case 'hifi':
×
UNCOV
1134
                module = self.bpod.get_module('^HiFi')
×
UNCOV
1135
                assert module is not None, 'No HiFi module connected to Bpod'
×
UNCOV
1136
                assert self.hardware_settings.device_sound.COM_SOUND is not None
×
UNCOV
1137
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
×
UNCOV
1138
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
×
UNCOV
1139
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
×
UNCOV
1140
                hifi.push()
×
UNCOV
1141
                hifi.close()
×
UNCOV
1142
                self.bpod.define_harp_sounds_actions(
×
1143
                    module=module,
1144
                    go_tone_index=self.task_params.GO_TONE_IDX,
1145
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1146
                )
UNCOV
1147
            case _:
×
UNCOV
1148
                self.bpod.define_xonar_sounds_actions()
×
UNCOV
1149
        log.info(f"Sound module loaded: OK: {self.hardware_settings.device_sound['OUTPUT']}")
×
1150

1151
    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
2✔
1152
        """
1153
        Play the noise sound for the error feedback using bpod state machine.
1154
        :return: bpod current trial export
1155
        """
UNCOV
1156
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1157

1158
    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
2✔
1159
        """
1160
        Play the ready tone beep using bpod state machine.
1161
        :return: bpod current trial export
1162
        """
UNCOV
1163
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1164

1165
    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
2✔
1166
        """Plays a sound using bpod state machine.
1167

1168
        The sound must be defined in the init_mixin_sound method.
1169
        """
UNCOV
1170
        assert state_timer is not None, 'state_timer must be defined'
×
UNCOV
1171
        assert output_actions is not None, 'output_actions must be defined'
×
UNCOV
1172
        sma = StateMachine(self.bpod)
×
UNCOV
1173
        sma.add_state(
×
1174
            state_name=state_name,
1175
            state_timer=state_timer,
1176
            output_actions=[self.bpod.actions.play_tone],
1177
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
1178
        )
UNCOV
1179
        self.bpod.send_state_machine(sma)
×
UNCOV
1180
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1181
        return self.bpod.session.current_trial.export()
×
1182

1183

1184
class NetworkSession(BaseSession):
    """A mixin for communicating to auxiliary acquisition PC over a network."""

    remote_rigs = None
    """net.Auxiliaries: An auxiliary services object for communicating with remote devices."""
    exp_ref = None
    """dict: The experiment reference (i.e. subject, date, sequence) as returned by main remote service."""

    def __init__(self, *_, remote_rigs=None, **kwargs):
        """
        A mixin for communicating to auxiliary acquisition PC over a network.

        This should retrieve the services list, i.e. the list of available auxiliary rigs,
        and determine which is the main sync. The main sync is the rig that determines the
        experiment.

        The services list is in a yaml file somewhere, called 'remote_rigs.yaml' and should
        be a map of device name to URI. These are then selectable in the GUI and the URI of
        those selected are added to the experiment description.

        Subclasses should add their callbacks within init by calling :meth:`self.remote_rigs.services.assign_callback`.

        Parameters
        ----------
        remote_rigs : list, dict
            Either a list of remote device names (in which case URI is looked up from remote devices
            file), or a map of device name to URI.
        kwargs
            Optional args such as 'file_iblrig_settings' for defining location of remote data folder
            when loading remote devices file.

        Raises
        ------
        ValueError
            If a requested remote rig name is not present in the remote devices file.
        """
        if isinstance(remote_rigs, list):
            # For now we flatten to list of remote rig names but could permit list of (name, URI) tuples
            remote_rigs = list(filter(None, flatten(remote_rigs)))
            all_remote_rigs = net.get_remote_devices(iblrig_settings=kwargs.get('iblrig_settings'))
            if not set(remote_rigs).issubset(all_remote_rigs.keys()):
                raise ValueError('Selected remote rigs not in remote rigs list')
            # Resolve the selected names to a name -> URI map
            remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in remote_rigs}
        # Load and connect to remote services
        self.connect(remote_rigs)
        self.exp_ref = {}
        try:
            super().__init__(**kwargs)
        except Exception as ex:
            # Ensure network threads/sockets are released if base-class init fails
            self.cleanup_mixin_network()
            raise ex

    @property
    def one(self):
        """Return ONE instance.

        Unlike super class getter, this method will always instantiate ONE, allowing subclasses to update with an Alyx
        token from a remotely connected rig.  This instance is used for formatting the experiment reference string.

        Returns
        -------
        one.api.One
            An instance of ONE.
        """
        if super().one is None:
            # Local, silent instance: used only for exp ref formatting, no Alyx queries needed
            self._one = OneAlyx(silent=True, mode='local')
        return self._one

    def connect(self, remote_rigs):
        """
        Connect to remote services.

        Instantiates the Communicator objects that establish connections with each remote device.
        This also creates the thread that uses asynchronous callbacks.

        Parameters
        ----------
        remote_rigs : dict
            A map of name to URI.
        """
        self.remote_rigs = net.Auxiliaries(remote_rigs or {})
        # With no rigs requested an unconnected Auxiliaries object is acceptable
        assert not remote_rigs or self.remote_rigs.is_connected
        # Handle termination event by graciously completing thread
        signal.signal(signal.SIGTERM, lambda sig, frame: self.cleanup_mixin_network())

    def _init_paths(self, append: bool = False):
        """
        Determine session paths.

        Unlike :meth:`BaseSession._init_paths`, this method determines the session number from the remote main sync if
        connected.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        iblutil.util.Bunch
            A bunch of paths.

        Raises
        ------
        ValueError
            If the append flag contradicts the task collections already present in the session folder.
        """
        # When this rig is itself the main sync, path logic is unchanged from the base class
        if self.hardware_settings.MAIN_SYNC:
            return BaseSession._init_paths(self, append)
        # Check if we have rigs connected
        if not self.remote_rigs.is_connected:
            log.warning('No remote rigs; experiment reference may not match the main sync.')
            return BaseSession._init_paths(self, append)
        # Set paths in a similar way to the super class
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
            local_path=self.iblrig_settings['iblrig_local_data_path'],
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
            lab=self.iblrig_settings['ALYX_LAB'],
            iblrig_settings=self.iblrig_settings,
        )
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
        paths.BONSAI = BONSAI_EXE
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
        )
        # exp_ref must already have been set from the main sync (see init_mixin_network)
        assert self.exp_ref
        # Session number comes from the main sync's exp ref, not from iterating local folders
        paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
        # Collection 'raw_task_data_00' means a fresh session: appending to a fresh session
        # or starting fresh in a non-empty session are both user errors
        if append == paths.TASK_COLLECTION.endswith('00'):
            raise ValueError(
                f'Append value incorrect. Either remove previous task collections from '
                f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
            )

        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
        return paths

    def run(self):
        """Run session and report exceptions to remote services."""
        self.start_mixin_network()
        try:
            return super().run()
        except Exception as e:
            # Communicate error to services
            if self.remote_rigs.is_connected:
                tb = e.__traceback__  # TODO maybe go one level down with tb_next?
                details = {
                    'error': e.__class__.__name__,  # exception name str
                    'message': str(e),  # error str
                    'traceback': traceback.format_exc(),  # stack str
                    'file': tb.tb_frame.f_code.co_filename,  # filename str
                    'line_no': (tb.tb_lineno, tb.tb_lasti),  # (int, int)
                }
                self.remote_rigs.push(ExpMessage.EXPINTERRUPT, details, wait=True)
                self.cleanup_mixin_network()
            raise e

    def communicate(self, message, *args, raise_on_exception=True):
        """
        Communicate message to remote services.

        This method is blocking and by default will raise if not all responses received in time.

        Parameters
        ----------
        message : iblutil.io.net.base.ExpMessage, str, int
            An experiment message to send to remote services.
        args
            One or more optional variables to send.
        raise_on_exception : bool
            If true, exceptions arising from message timeouts will be re-raised in main thread and
            services will be cleaned up. Only applies when wait is true.

        Returns
        -------
        Exception | dict
            If raise_on_exception is False, returns an exception if failed to receive all responses
            in time, otherwise a map of service name to response is returned.
        """
        r = self.remote_rigs.push(message, *args, wait=True)
        # A failed push returns the exception object rather than raising it in this thread
        if isinstance(r, Exception):
            log.error('Error on %s network mixin: %s', message, r)
            if raise_on_exception:
                self.cleanup_mixin_network()
                raise r
        return r

    def get_exp_info(self):
        """Return this rig's experiment info for broadcasting to remote services.

        Returns
        -------
        dict
            A dict form of a net.ExpInfo object, including the experiment reference (if known)
            and whether this rig is the main sync.
        """
        ref = self.exp_ref or None
        if isinstance(ref, dict) and self.one:
            # Convert the exp ref map to its string form for transmission
            ref = self.one.dict2ref(ref)
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        info = net.ExpInfo(ref, is_main_sync, self.experiment_description, master=is_main_sync)
        return info.to_dict()

    def init_mixin_network(self):
        """Initialize remote services.

        This method sends an EXPINFO message to all services, expecting exactly one of the responses
        to contain main_sync: True, along with the experiment reference to use. It then sends an
        EXPINIT message to all services.

        Raises
        ------
        NotImplementedError
            If this rig is itself the main sync (not currently supported with remote rigs).
        ValueError
            If the main sync's exp ref subject does not match this session's subject.
        RuntimeError
            If the main sync's exp ref date does not match this session's date.
        """
        if not self.remote_rigs.is_connected:
            return
        # Determine experiment reference from main sync
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        if is_main_sync:
            raise NotImplementedError
        assert self.one

        expinfo = self.get_exp_info() | {'subject': self.session_info['SUBJECT_NAME']}
        r = self.communicate('EXPINFO', 'CONNECTED', expinfo)
        # Exactly one remote service must be the main sync; it dictates the exp ref
        assert sum(x[-1]['main_sync'] for x in r.values()) == 1, 'one main sync expected'
        main_rig_name, (status, info) = next((k, v) for k, v in r.items() if v[-1]['main_sync'])
        self.exp_ref = self.one.ref2dict(info['exp_ref']) if isinstance(info['exp_ref'], str) else info['exp_ref']
        if self.exp_ref['subject'] != self.session_info['SUBJECT_NAME']:
            log.error(
                'Running task for "%s" but main sync returned exp ref for "%s".',
                self.session_info['SUBJECT_NAME'],
                self.exp_ref['subject'],
            )
            raise ValueError("Subject name doesn't match remote session on " + main_rig_name)
        if str(self.exp_ref['date']) != self.session_info['SESSION_START_TIME'][:10]:
            raise RuntimeError(
                f'Session dates do not match between this rig and {main_rig_name}. \n'
                f'Running past or future sessions not currently supported. \n'
                f'Please check the system date time settings on each rig.'
            )

        # exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
        exp_ref = self.one.dict2ref(self.exp_ref)
        self.communicate('EXPINIT', {'exp_ref': exp_ref})

    def start_mixin_network(self):
        """Start remote services.

        This method sends an EXPSTART message to all services, along with an exp_ref string.
        Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPSTART', self.exp_ref)

    def stop_mixin_network(self):
        """Stop remote services.

        This method sends an EXPEND message to all services. Responses are required but ignored.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPEND')

    def cleanup_mixin_network(self):
        """Clean up services."""
        self.remote_rigs.close()
        # close() should disconnect everything; a still-connected state indicates a cleanup failure
        if self.remote_rigs.is_connected:
            log.warning('Failed to properly clean up network mixin')
class SpontaneousSession(BaseSession):
    """
    A Spontaneous task doesn't have trials, it just runs until the user stops it.

    It is used to get extraction structure for data streams
    """

    def __init__(self, duration_secs=None, **kwargs):
        super().__init__(**kwargs)
        # Optional maximum run time in seconds; None means run until stopped by the user.
        self.duration_secs = duration_secs

    def start_hardware(self):
        pass  # no mixin here, life is but a dream

    def _run(self):
        """Run the task with the actual state machine."""
        log.info('Starting spontaneous acquisition')
        stop_file = self.paths.SESSION_FOLDER.joinpath('.stop')
        keep_running = True
        while keep_running:
            time.sleep(1.5)
            timed_out = self.duration_secs is not None and self.time_elapsed.seconds > self.duration_secs
            if timed_out:
                keep_running = False
            elif stop_file.exists():
                # Consume the sentinel file so a later session is not stopped immediately
                stop_file.unlink()
                keep_running = False
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc