• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 9031936551

10 May 2024 12:05PM UTC coverage: 48.538% (+1.7%) from 46.79%
9031936551

Pull #643

github

53c3e3
web-flow
Merge 3c8214f78 into ec2d8e4fe
Pull Request #643: 8.19.0

377 of 1073 new or added lines in 38 files covered. (35.14%)

977 existing lines in 19 files now uncovered.

3253 of 6702 relevant lines covered (48.54%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.49
/iblrig/base_tasks.py
1
"""
2
This module is intended to provide commonalities for all tasks.
3

4
It provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task related logic.
6
"""
7

8
import abc
2✔
9
import argparse
2✔
10
import contextlib
2✔
11
import datetime
2✔
12
import importlib.metadata
2✔
13
import inspect
2✔
14
import json
2✔
15
import logging
2✔
16
import signal
2✔
17
import time
2✔
18
import traceback
2✔
19
from abc import ABC
2✔
20
from collections import OrderedDict
2✔
21
from collections.abc import Callable
2✔
22
from pathlib import Path
2✔
23

24
import numpy as np
2✔
25
import scipy.interpolate
2✔
26
import serial
2✔
27
import yaml
2✔
28
from pythonosc import udp_client
2✔
29

30
import ibllib.io.session_params as ses_params
2✔
31
import iblrig
2✔
32
import iblrig.graphic as graph
2✔
33
import iblrig.path_helper
2✔
34
import pybpodapi
2✔
35
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
36
from iblrig import sound
2✔
37
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
38
from iblrig.frame2ttl import Frame2TTL
2✔
39
from iblrig.hardware import SOFTCODE, Bpod, MyRotaryEncoder, sound_device_factory
2✔
40
from iblrig.hifi import HiFi
2✔
41
from iblrig.path_helper import load_pydantic_yaml
2✔
42
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
2✔
43
from iblrig.tools import call_bonsai
2✔
44
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
45
from iblutil.spacer import Spacer
2✔
46
from iblutil.util import Bunch, setup_logger
2✔
47
from one.alf.io import next_num_folder
2✔
48
from one.api import ONE
2✔
49
from pybpodapi.protocol import StateMachine
2✔
50

51
OSC_CLIENT_IP = '127.0.0.1'
2✔
52

53
log = logging.getLogger(__name__)
2✔
54

55

56
class BaseSession(ABC):
2✔
57
    version = None
2✔
58
    """str: !!CURRENTLY UNUSED!! task version string."""
2✔
59
    protocol_name: str | None = None
2✔
60
    base_parameters_file: Path | None = None
2✔
61
    is_mock = False
2✔
62
    """list of str: One or more ibllib.pipes.tasks.Task names for task extraction."""
2✔
63
    logger: logging.Logger = None
2✔
64
    """logging.Logger: Log instance used solely to keep track of log level passed to constructor."""
2✔
65

66
    def __init__(
2✔
67
        self,
68
        subject=None,
69
        task_parameter_file=None,
70
        file_hardware_settings=None,
71
        hardware_settings: HardwareSettings = None,
72
        file_iblrig_settings=None,
73
        iblrig_settings: RigSettings = None,
74
        one=None,
75
        interactive=True,
76
        projects=None,
77
        procedures=None,
78
        stub=None,
79
        subject_weight_grams=None,
80
        append=False,
81
        wizard=False,
82
        log_level='INFO',
83
        **kwargs,
84
    ):
85
        """
86
        :param subject: The subject nickname. Required.
87
        :param task_parameter_file: an optional path to the task_parameters.yaml file
88
        :param file_hardware_settings: name of the hardware file in the settings folder, or full file path
89
        :param hardware_settings: an optional dictionary of hardware settings. Keys will override any keys in the file
90
        :param file_iblrig_settings: name of the iblrig file in the settings folder, or full file path
91
        :param iblrig_settings: an optional dictionary of iblrig settings. Keys will override any keys in the file
92
        :param one: an optional instance of ONE
93
        :param interactive:
94
        :param projects: An optional list of Alyx protocols.
95
        :param procedures: An optional list of Alyx procedures.
96
        :param subject_weight_grams: weight of the subject
97
        :param stub: A full path to an experiment description file containing experiment information.
98
        :param append: bool, if True, append to the latest existing session of the same subject for the same day
99
        """
100
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
2✔
101
        assert self.protocol_name is not None, 'Protocol name must be defined by the child class'
2✔
102
        self._logger = None
2✔
103
        self._setup_loggers(level=log_level)
2✔
104
        if not isinstance(self, EmptySession):
2✔
105
            log.info(f'Running iblrig {iblrig.__version__}, pybpod version {pybpodapi.__version__}')
2✔
106
        self.interactive = interactive
2✔
107
        self._one = one
2✔
108
        self.init_datetime = datetime.datetime.now()
2✔
109

110
        # loads in the settings: first load the files, then update with the input argument if provided
111
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
2✔
112
        if hardware_settings is not None:
2✔
113
            self.hardware_settings.update(hardware_settings)
2✔
114
            HardwareSettings.model_validate(self.hardware_settings)
2✔
115
        self.iblrig_settings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
2✔
116
        if iblrig_settings is not None:
2✔
117
            self.iblrig_settings.update(iblrig_settings)
2✔
118
            RigSettings.model_validate(self.iblrig_settings)
2✔
119

120
        self.wizard = wizard
2✔
121
        # Load the tasks settings, from the task folder or override with the input argument
122
        base_parameters_files = [
2✔
123
            task_parameter_file or Path(inspect.getfile(self.__class__)).parent.joinpath('task_parameters.yaml')
124
        ]
125
        # loop through the task hierarchy to gather parameter files
126
        for cls in self.__class__.__mro__:
2✔
127
            base_file = getattr(cls, 'base_parameters_file', None)
2✔
128
            if base_file is not None:
2✔
129
                base_parameters_files.append(base_file)
2✔
130
        # this is a trick to remove list duplicates while preserving order, we want the highest order first
131
        base_parameters_files = list(reversed(list(dict.fromkeys(base_parameters_files))))
2✔
132
        # now we loop into the files and update the dictionary, the latest files in the hierarchy have precedence
133
        self.task_params = Bunch({})
2✔
134
        for param_file in base_parameters_files:
2✔
135
            if Path(param_file).exists():
2✔
136
                with open(param_file) as fp:
2✔
137
                    params = yaml.safe_load(fp)
2✔
138
                if params is not None:
2✔
139
                    self.task_params.update(Bunch(params))
2✔
140
        # at last sort the dictionary so itÅ› easier for a human to navigate the many keys
141
        self.task_params = Bunch(dict(sorted(self.task_params.items())))
2✔
142
        self.session_info = Bunch(
2✔
143
            {
144
                'NTRIALS': 0,
145
                'NTRIALS_CORRECT': 0,
146
                'PROCEDURES': procedures,
147
                'PROJECTS': projects,
148
                'SESSION_START_TIME': self.init_datetime.isoformat(),
149
                'SESSION_END_TIME': None,
150
                'SESSION_NUMBER': 0,
151
                'SUBJECT_NAME': subject,
152
                'SUBJECT_WEIGHT': subject_weight_grams,
153
                'TOTAL_WATER_DELIVERED': 0,
154
            }
155
        )
156
        # Executes mixins init methods
157
        self._execute_mixins_shared_function('init_mixin')
2✔
158
        self.paths = self._init_paths(append=append)
2✔
159
        if not isinstance(self, EmptySession):
2✔
160
            log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
161
        # Prepare the experiment description dictionary
162
        self.experiment_description = self.make_experiment_description_dict(
2✔
163
            self.protocol_name,
164
            self.paths.TASK_COLLECTION,
165
            procedures,
166
            projects,
167
            self.hardware_settings,
168
            stub,
169
            extractors=self.extractor_tasks,
170
        )
171

172
    def _init_paths(self, append: bool = False):
2✔
173
        """
174
        :param existing_session_path: if we append a protocol to an existing session, this is the path
175
        of the session in the form of /path/to/./lab/Subjects/[subject]/[date]/[number]
176
        :return: Bunch with keys:
177
        BONSAI: full path to the bonsai executable
178
            >>> C:\iblrigv8\Bonsai\Bonsai.exe  # noqa
179
        VISUAL_STIM_FOLDER: full path to the visual stim
180
            >>> C:\iblrigv8\visual_stim  # noqa
181
        LOCAL_SUBJECT_FOLDER: full path to the local subject folder
182
            >>> C:\iblrigv8_data\mainenlab\Subjects  # noqa
183
        REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
184
            >>> Y:\Subjects  # noqa
185
        SESSION_FOLDER: full path to the current session:
186
            >>> C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001  # noqa
187
        TASK_COLLECTION: folder name of the current task
188
            >>> raw_task_data_00  # noqa
189
        SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection. This is where
190
        the task data gets written
191
            >>> C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00  # noqa
192
        DATA_FILE_PATH: contains the bpod trials
193
            >>> C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable  # noqa
194
        SETTINGS_FILE_PATH: contains the task settings
195
            >>>C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json  # noqa
196
        """
197
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
2✔
198
            local_path=self.iblrig_settings['iblrig_local_data_path'],
199
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
200
            lab=self.iblrig_settings['ALYX_LAB'],
201
            iblrig_settings=self.iblrig_settings,
202
        )
203
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
2✔
204
        paths.BONSAI = BONSAI_EXE
2✔
205
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
2✔
206
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
2✔
207
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
2✔
208
        # initialize the session path
209
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
2✔
210
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
211
        )
212
        if append:
2✔
213
            # this is the case where we append a new protocol to an existing session
214
            todays_sessions = sorted(filter(Path.is_dir, date_folder.glob('*')), reverse=True)
2✔
215
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
2✔
216
            paths.SESSION_FOLDER = todays_sessions[0]
2✔
217
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
218
            if self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
2✔
219
                """
220
                Chained protocols make little sense when Bpod is the main sync as there is no
221
                continuous acquisition between protocols.  Only one sync collection can be defined in
222
                the experiment description file.
223
                If you are running experiments with an ephys rig (nidq) or an external daq, you should
224
                correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
225
                """
UNCOV
226
                raise RuntimeError('Chained protocols not supported for bpod-only sessions')
×
227
        else:
228
            # in this case the session path is created from scratch
229
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
2✔
230
            paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
2✔
231

232
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
2✔
233
        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
2✔
234
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
2✔
235
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
2✔
236
        return paths
2✔
237

238
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
2✔
239
        self._logger = setup_logger('iblrig', level=level, file=file)  # logger attr used by create_session to determine log level
2✔
240
        setup_logger('pybpodapi', level=level_bpod, file=file)
2✔
241

242
    def _remove_file_loggers(self):
2✔
243
        for logger_name in ['iblrig', 'pybpodapi']:
2✔
244
            logger = logging.getLogger(logger_name)
2✔
245
            file_handlers = [fh for fh in logger.handlers if isinstance(fh, logging.FileHandler)]
2✔
246
            for fh in file_handlers:
2✔
247
                logger.removeHandler(fh)
2✔
248

249
    @staticmethod
2✔
250
    def make_experiment_description_dict(
2✔
251
        task_protocol: str,
252
        task_collection: str,
253
        procedures: list = None,
254
        projects: list = None,
255
        hardware_settings: dict | HardwareSettings = None,
256
        stub: Path = None,
257
        extractors: list = None,
258
        camera_config: str = None,
259
    ):
260
        """
261
        Construct an experiment description dictionary.
262

263
        Parameters
264
        ----------
265
        task_protocol : str
266
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
267
        task_collection : str
268
            The task collection name, e.g. raw_task_data_00.
269
        procedures : list
270
            An optional list of Alyx procedures.
271
        projects : list
272
            An optional list of Alyx protocols.
273
        hardware_settings : dict
274
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
275
        stub : dict
276
            An optional experiment description stub to update.
277
        extractors: list
278
            An optional list of extractor names for the task.
279
        camera_config : str
280
            The camera configuration name in the hardware settings. Defaults to the first key in
281
            'device_cameras'.
282

283
        Returns
284
        -------
285
        dict
286
            The experiment description.
287
        """
288
        description = ses_params.read_params(stub) if stub else {}
2✔
289

290
        # Add hardware devices
291
        if hardware_settings is not None:
2✔
292
            if isinstance(hardware_settings, HardwareSettings):
2✔
293
                hardware_settings = hardware_settings.model_dump()
2✔
294
            devices = {}
2✔
295
            cams = hardware_settings.get('device_cameras', None)
2✔
296
            if cams:
2✔
297
                devices['cameras'] = {}
2✔
298
                camera_config = camera_config or next((k for k in cams), {})
2✔
299
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
2✔
300
            if hardware_settings.get('device_microphone', None):
2✔
301
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
2✔
302
            ses_params.merge_params(description, {'devices': devices})
2✔
303

304
        # Add projects and procedures
305
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
2✔
306
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
2✔
307
        # Add sync key if required
308
        if (hardware_settings or {}).get('MAIN_SYNC', False) and 'sync' not in description:
2✔
309
            description['sync'] = {
2✔
310
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
311
            }
312
        # Add task
313
        task = {task_protocol: {'collection': task_collection, 'sync_label': 'bpod'}}
2✔
314
        if extractors:
2✔
315
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
2✔
316
            task[task_protocol].update({'extractors': extractors})
2✔
317
        if 'tasks' not in description:
2✔
318
            description['tasks'] = [task]
2✔
319
        else:
320
            description['tasks'].append(task)
2✔
321
        return description
2✔
322

323
    def _make_task_parameters_dict(self):
2✔
324
        """
325
        This makes the dictionary that will be saved to the settings json file for extraction
326
        :return:
327
        """
328
        output_dict = dict(self.task_params)  # Grab parameters from task_params session
2✔
329
        output_dict.update(self.hardware_settings.model_dump())  # Update dict with hardware settings from session
2✔
330
        output_dict.update(dict(self.session_info))  # Update dict with session_info (subject, procedure, projects)
2✔
331
        patch_dict = {  # Various values added to ease transition from iblrig v7 to v8, different home may be desired
2✔
332
            'IBLRIG_VERSION': iblrig.__version__,
333
            'PYBPOD_PROTOCOL': self.protocol_name,
334
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
335
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
336
        }
337
        with contextlib.suppress(importlib.metadata.PackageNotFoundError):
2✔
338
            patch_dict['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
2✔
339
        output_dict.update(patch_dict)
2✔
340
        return output_dict
2✔
341

342
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
2✔
343
        """
344
        Collects the various settings and parameters of the session and outputs them to a JSON file
345

346
        Returns
347
        -------
348
        Path to the resultant JSON file
349
        """
350
        output_dict = self._make_task_parameters_dict()
2✔
351
        if destination_folder:
2✔
NEW
352
            json_file = destination_folder.joinpath('_iblrig_taskSettings.raw.json')
×
353
        else:
354
            json_file = self.paths['SETTINGS_FILE_PATH']
2✔
355
        json_file.parent.mkdir(parents=True, exist_ok=True)
2✔
356
        with open(json_file, 'w') as outfile:
2✔
357
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)  # converts datetime objects to string
2✔
358
        return json_file  # PosixPath
2✔
359

360
    @property
2✔
361
    def one(self):
2✔
362
        """
363
        One getter
364
        :return:
365
        """
366
        if self._one is None:
2✔
367
            if self.iblrig_settings['ALYX_URL'] is None:
2✔
368
                return
2✔
UNCOV
369
            info_str = (
×
370
                f"alyx client with user name {self.iblrig_settings['ALYX_USER']} "
371
                + f"and url: {self.iblrig_settings['ALYX_URL']}"
372
            )
UNCOV
373
            try:
×
UNCOV
374
                self._one = ONE(
×
375
                    base_url=str(self.iblrig_settings['ALYX_URL']),
376
                    username=self.iblrig_settings['ALYX_USER'],
377
                    mode='remote',
378
                    cache_rest=None,
379
                )
UNCOV
380
                log.info('instantiated ' + info_str)
×
UNCOV
381
            except Exception:
×
UNCOV
382
                log.error(traceback.format_exc())
×
UNCOV
383
                log.error('could not connect to ' + info_str)
×
384
        return self._one
2✔
385

386
    def register_to_alyx(self):
2✔
387
        """
388
        Registers the session to Alyx.
389

390
        This registers the session using the IBLRegistrationClient class.  This uses the settings
391
        file(s) and experiment description file to extract the session data.  This may be called
392
        any number of times and if the session record already exists in Alyx it will be updated.
393
        If session registration fails, it will be done before extraction in the ibllib pipeline.
394

395
        Note that currently the subject weight is registered once and only once.  The recorded
396
        weight of the first protocol run is used.
397

398
        Water administrations are added separately by this method: it is expected that
399
        `register_session` is first called with no recorded total water. This method will then add
400
        a water administration each time it is called, and should therefore be called only once
401
        after protocol is run. If water administration registration fails for all protocols, this
402
        will be done before extraction in the ibllib pipline, however, if a water administration is
403
        successfully registered for one protocol and subsequent ones fail to register, these will
404
        not be added before extraction in ibllib and therefore must be manually added to Alyx.
405

406
        Returns
407
        -------
408
        dict
409
            The registered session record.
410

411
        See Also
412
        --------
413
        ibllib.oneibl.IBLRegistrationClient.register_session - The registration method.
414
        """
415
        if not self.one or self.one.offline:
2✔
416
            return
2✔
417
        try:
2✔
418
            ses, _ = IBLRegistrationClient(self.one).register_session(self.paths.SESSION_FOLDER)
2✔
419
        except Exception:
2✔
420
            log.error(traceback.format_exc())
2✔
421
            log.error('Could not register session to Alyx')
2✔
422
            return
2✔
423
        # add the water administration if there was water administered
424
        try:
2✔
425
            if self.session_info['TOTAL_WATER_DELIVERED']:
2✔
426
                wa_data = dict(
2✔
427
                    session=ses['url'][-36:],
428
                    subject=self.session_info.SUBJECT_NAME,
429
                    water_type=self.task_params.get('REWARD_TYPE', None),
430
                    water_administered=self.session_info['TOTAL_WATER_DELIVERED'] / 1000,
431
                )
432
                self.one.alyx.rest('water-administrations', 'create', data=wa_data)
2✔
433
                log.info(
2✔
434
                    f"Water administered registered in Alyx database: {ses['subject']}," f"{wa_data['water_administered']}mL"
435
                )
436
        except Exception:
2✔
437
            log.error(traceback.format_exc())
2✔
438
            log.error('Could not register water administration to Alyx')
2✔
439
            return
2✔
440
        return ses
2✔
441

442
    def _execute_mixins_shared_function(self, pattern):
2✔
443
        """
444
        Loop over all methods of the class that start with pattern and execute them
445
        :param pattern:'init_mixin', 'start_mixin' or 'stop_mixin'
446
        :return:
447
        """
448
        method_names = [method for method in dir(self) if method.startswith(pattern)]
2✔
449
        methods = [getattr(self, method) for method in method_names if inspect.ismethod(getattr(self, method))]
2✔
450
        for meth in methods:
2✔
451
            meth()
2✔
452

453
    @property
2✔
454
    def time_elapsed(self):
2✔
455
        return datetime.datetime.now() - self.init_datetime
2✔
456

457
    def mock(self):
2✔
458
        self.is_mock = True
2✔
459

460
    def create_session(self):
2✔
461
        # create the session path and save json parameters in the task collection folder
462
        # this will also create the protocol folder
463
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
2✔
464
        # enable file logging
465
        logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
2✔
466
        self._setup_loggers(level=self._logger.level, file=logfile)
2✔
467
        # copy the acquisition stub to the remote session folder
468
        sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
2✔
469
        sc.initialize_experiment(self.experiment_description, overwrite=False)
2✔
470
        self.register_to_alyx()
2✔
471

472
    def run(self):
2✔
473
        """
474
        Common pre-run instructions for all tasks: sigint handler for a graceful exit
475
        :return:
476
        """
477
        # here we make sure we connect to the hardware before writing the session to disk
478
        # this prevents from incrementing endlessly the session number if the hardware fails to connect
479
        self.start_hardware()
2✔
480
        self.create_session()
2✔
481
        # When not running the first chained protocol, we can skip the weighing dialog
482
        first_protocol = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1]) == 0
2✔
483
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
2✔
484
            self.session_info.SUBJECT_WEIGHT = graph.numinput(
2✔
485
                'Subject weighing (gr)', f'{self.session_info.SUBJECT_NAME} weight (gr):', nullable=False
486
            )
487

488
        def sigint_handler(*args, **kwargs):
2✔
489
            # create a signal handler for a graceful exit: create a stop flag in the session folder
UNCOV
490
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
×
UNCOV
491
            log.critical('SIGINT signal detected, will exit at the end of the trial')
×
492

493
        # if upon starting there is a flag just remove it, this is to prevent killing a session in the egg
494
        if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
2✔
UNCOV
495
            self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
496

497
        signal.signal(signal.SIGINT, sigint_handler)
2✔
498
        self._run()  # runs the specific task logic ie. trial loop etc...
2✔
499
        # post task instructions
500
        log.critical('Graceful exit')
2✔
501
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
2✔
502
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
2✔
503
        if self.interactive and not self.wizard:
2✔
504
            self.session_info.POOP_COUNT = graph.numinput(
2✔
505
                'Poop count', f'{self.session_info.SUBJECT_NAME} droppings count:', nullable=True, askint=True
506
            )
507
        self.save_task_parameters_to_json_file()
2✔
508
        self.register_to_alyx()
2✔
509
        self._execute_mixins_shared_function('stop_mixin')
2✔
510

511
    @abc.abstractmethod
2✔
512
    def start_hardware(self):
2✔
513
        """
514
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
515
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
516
        """
UNCOV
517
        pass
×
518

519
    @abc.abstractmethod
2✔
520
    def _run(self):
2✔
UNCOV
521
        pass
×
522

523
    @staticmethod
2✔
524
    def extra_parser():
2✔
525
        """
526
        Optional method that specifies extra kwargs arguments to expose to the user prior running the task.
527
        Make sure you instantiate the parser
528
        :return: argparse.parser()
529
        """
530
        parser = argparse.ArgumentParser(add_help=False)
2✔
531
        return parser
2✔
532

533

534
# this class gets called to get the path constructor utility to predict the session path
535
class EmptySession(BaseSession):
2✔
536
    protocol_name = 'empty'
2✔
537

538
    def _run(self):
2✔
UNCOV
539
        pass
×
540

541
    def start_hardware(self):
2✔
UNCOV
542
        pass
×
543

544

545
class OSCClient(udp_client.SimpleUDPClient):
2✔
546
    """
547
    Handles communication to Bonsai using an UDP Client
548
    OSC channels:
549
        USED:
550
        /t  -> (int)    trial number current
551
        /p  -> (int)    position of stimulus init for current trial
552
        /h  -> (float)  phase of gabor for current trial
553
        /c  -> (float)  contrast of stimulus for current trial
554
        /f  -> (float)  frequency of gabor patch for current trial
555
        /a  -> (float)  angle of gabor patch for current trial
556
        /g  -> (float)  gain of RE to visual stim displacement
557
        /s  -> (float)  sigma of the 2D gaussian of gabor
558
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
559
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
560
    """
561

562
    OSC_PROTOCOL = {
2✔
563
        'trial_num': dict(mess='/t', type=int),
564
        'position': dict(mess='/p', type=int),
565
        'stim_phase': dict(mess='/h', type=float),
566
        'contrast': dict(mess='/c', type=float),
567
        'stim_freq': dict(mess='/f', type=float),
568
        'stim_angle': dict(mess='/a', type=float),
569
        'stim_gain': dict(mess='/g', type=float),
570
        'stim_sigma': dict(mess='/s', type=float),
571
        'stim_reverse': dict(mess='/r', type=int),
572
    }
573

574
    def __init__(self, port, ip='127.0.0.1'):
2✔
575
        super().__init__(ip, port)
2✔
576

577
    def __del__(self):
2✔
578
        self._sock.close()
2✔
579

580
    def send2bonsai(self, **kwargs):
2✔
581
        """
582
        :param see list of keys in OSC_PROTOCOL
583
        :example: client.send2bonsai(trial_num=6, sim_freq=50)
584
        :return:
585
        """
586
        for k in kwargs:
2✔
587
            if k in self.OSC_PROTOCOL:
2✔
588
                # need to convert basic numpy types to low-level python types for
589
                # punch card generation OSC module, I might as well have written C code
590
                value = kwargs[k].item() if isinstance(kwargs[k], np.generic) else kwargs[k]
2✔
591
                self.send_message(self.OSC_PROTOCOL[k]['mess'], self.OSC_PROTOCOL[k]['type'](value))
2✔
592

593
    def exit(self):
2✔
594
        self.send_message('/x', 1)
2✔
595

596

597
class BonsaiRecordingMixin(BaseSession):
2✔
598
    def init_mixin_bonsai_recordings(self, *args, **kwargs):
2✔
599
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
2✔
600
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
2✔
601
        self.config = None  # the name of the configuration to run
2✔
602

603
    def stop_mixin_bonsai_recordings(self):
2✔
604
        log.info('Stopping Bonsai recordings')
2✔
605
        self.bonsai_camera.udp_client.exit()
2✔
606
        self.bonsai_microphone.udp_client.exit()
2✔
607

608
    def start_mixin_bonsai_microphone(self):
2✔
609
        if not self.config:
2✔
610
            # Use the first key in the device_cameras map
UNCOV
611
            self.config = next((k for k in self.hardware_settings.device_cameras), None)
×
612
        # The camera workflow on the behaviour computer already contains the microphone recording
613
        # so the device camera workflow and the microphone one are exclusive
614
        if self.config:
2✔
615
            return  # Camera workflow defined; so no need to separately start microphone.
2✔
UNCOV
616
        if not self.task_params.RECORD_SOUND:
×
617
            return  # Sound should not be recorded
×
618
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
×
619
        parameters = {
×
620
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
621
            'RecordSound': self.task_params.RECORD_SOUND,
622
        }
UNCOV
623
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
×
UNCOV
624
        log.info('Bonsai microphone recording module loaded: OK')
×
625

626
    @staticmethod
2✔
627
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
2✔
628
        """
629
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.
630

631
        Parameters
632
        ----------
633
        cameras : dict
634
            The hardware settings configuration.
635
        name : {'setup', 'recording'} str
636
            The workflow type.
637

638
        Returns
639
        -------
640
        Path
641
            The workflow path.
642
        """
643
        if cameras is None:
2✔
644
            return None
2✔
645
        return cameras['BONSAI_WORKFLOW'][name]
2✔
646

647
    def start_mixin_bonsai_cameras(self):
2✔
648
        """
649
        This prepares the cameras by starting the pipeline that aligns the camera focus with the
650
        desired borders of rig features, the actual triggering of the cameras is done in the trigger_bonsai_cameras method.
651
        """
652
        if not self.config:
2✔
653
            # Use the first key in the device_cameras map
654
            try:
2✔
655
                self.config = next(k for k in self.hardware_settings.device_cameras)
2✔
UNCOV
656
            except StopIteration:
×
UNCOV
657
                return
×
658
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
659
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
2✔
UNCOV
660
            return
×
661

662
        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
663
        if PYSPIN_AVAILABLE:
2✔
NEW
664
            from iblrig.video_pyspin import enable_camera_trigger
×
665

NEW
666
            enable_camera_trigger(True)
×
667

668
        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
2✔
669
        log.info('Bonsai cameras setup module loaded: OK')
2✔
670

671
    def trigger_bonsai_cameras(self):
2✔
672
        if not self.config:
2✔
673
            # Use the first key in the device_cameras map
UNCOV
674
            try:
×
675
                self.config = next(k for k in self.hardware_settings.device_cameras)
×
UNCOV
676
            except StopIteration:
×
677
                return
×
678
        configuration = self.hardware_settings.device_cameras[self.config]
2✔
679
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
2✔
680
            raise NotImplementedError
×
681
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
2✔
682
        if workflow_file is None:
2✔
683
            return
×
684
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)  # FIXME What does this do?
2✔
685
        # FIXME Use parameters in configuration map
686
        parameters = {
2✔
687
            'FileNameLeft': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.raw.avi'),
688
            'FileNameLeftData': self.paths.SESSION_FOLDER.joinpath('raw_video_data', '_iblrig_leftCamera.frameData.bin'),
689
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
690
            'RecordSound': self.task_params.RECORD_SOUND,
691
        }
692
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
2✔
693
        log.info('Bonsai camera recording process started')
2✔
694

695

696
class BonsaiVisualStimulusMixin(BaseSession):
2✔
697
    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
2✔
698
        # camera 7111, microphone 7112
699
        self.bonsai_visual_udp_client = OSCClient(port=7110)
2✔
700

701
    def start_mixin_bonsai_visual_stimulus(self):
2✔
702
        self.choice_world_visual_stimulus()
2✔
703

704
    def stop_mixin_bonsai_visual_stimulus(self):
2✔
705
        log.info('Stopping Bonsai visual stimulus')
2✔
706
        self.bonsai_visual_udp_client.exit()
2✔
707

708
    def send_trial_info_to_bonsai(self):
2✔
709
        """
710
        This sends the trial information to the Bonsai UDP port for the stimulus
711
        The OSC protocol is documented in iblrig.base_tasks.BonsaiVisualStimulusMixin
712
        """
713
        bonsai_dict = {
2✔
714
            k: self.trials_table[k][self.trial_num]
715
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
716
            if k in self.trials_table.columns
717
        }
718
        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
2✔
719
        log.debug(bonsai_dict)
2✔
720

721
    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
2✔
722
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
2✔
723
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
2✔
724
        parameters = {
2✔
725
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
726
            'Stim.SpontaneousActivity0.DueTime': sa_time,
727
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
728
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
729
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
730
        }
731
        log.info('Starting spontaneous activity and RF mapping stims')
2✔
732
        s = call_bonsai(workflow_file, parameters, editor=False)
2✔
733
        log.info('Spontaneous activity and RF mapping stims finished')
2✔
734
        return s
2✔
735

736
    def choice_world_visual_stimulus(self):
2✔
737
        if self.task_params.VISUAL_STIMULUS is None:
2✔
738
            return
×
739
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
2✔
740
        parameters = {
2✔
741
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
742
            'Stim.FileNameStimPositionScreen': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_stimPositionScreen.raw.csv'),
743
            'Stim.FileNameSyncSquareUpdate': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
744
            'Stim.FileNamePositions': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderPositions.raw.ssv'),
745
            'Stim.FileNameEvents': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderEvents.raw.ssv'),
746
            'Stim.FileNameTrialInfo': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
747
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
748
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
749
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
750
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
751
        }
752
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
2✔
753
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
754

755

756
class BpodMixin(BaseSession):
2✔
757
    def _raise_on_undefined_softcode_handler(self, byte: int):
2✔
758
        raise ValueError(f'No handler defined for softcode #{byte}')
2✔
759

760
    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
2✔
761
        """
762
        Returns a softcode handler dict where each key corresponds to the softcode and each value to the
763
        function to be called.
764

765
        This needs to be wrapped this way because
766
            1) we want to be able to inherit this and dynamically add softcode to the dictionry
767
            2) we need to provide the Task object (self) at run time to have the functions with static args
768
        This is tricky as it is unclear if the task object is a copy or a reference when passed here.
769

770

771
        Returns
772
        -------
773
        OrderedDict[int, Callable]
774
            Softcode dictionary
775
        """
776
        softcode_dict = OrderedDict(
2✔
777
            {
778
                SOFTCODE.STOP_SOUND: self.sound['sd'].stop,
779
                SOFTCODE.PLAY_TONE: lambda: self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate']),
780
                SOFTCODE.PLAY_NOISE: lambda: self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate']),
781
                SOFTCODE.TRIGGER_CAMERA: getattr(
782
                    self, 'trigger_bonsai_cameras', lambda: self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)
783
                ),
784
            }
785
        )
786
        return softcode_dict
2✔
787

788
    def init_mixin_bpod(self, *args, **kwargs):
2✔
789
        self.bpod = Bpod()
2✔
790

791
    def stop_mixin_bpod(self):
2✔
792
        self.bpod.close()
2✔
793

794
    def start_mixin_bpod(self):
2✔
795
        if self.hardware_settings['device_bpod']['COM_BPOD'] is None:
2✔
796
            raise ValueError(
2✔
797
                'The value for device_bpod:COM_BPOD in '
798
                'settings/hardware_settings.yaml is null. Please '
799
                'provide a valid port name.'
800
            )
UNCOV
801
        self.bpod = Bpod(self.hardware_settings['device_bpod']['COM_BPOD'], disable_behavior_ports=[1, 2, 3])
×
UNCOV
802
        self.bpod.define_rotary_encoder_actions()
×
UNCOV
803
        self.bpod.set_status_led(False)
×
UNCOV
804
        assert self.bpod.is_connected
×
805
        log.info('Bpod hardware module loaded: OK')
×
806
        # make the bpod send spacer signals to the main sync clock for protocol discovery
NEW
807
        self.send_spacers()
×
808

809
    def send_spacers(self):
2✔
UNCOV
810
        log.info('Starting task by sending a spacer signal on BNC1')
×
UNCOV
811
        sma = StateMachine(self.bpod)
×
UNCOV
812
        Spacer().add_spacer_states(sma, next_state='exit')
×
813
        self.bpod.send_state_machine(sma)
×
814
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
815
        return self.bpod.session.current_trial.export()
×
816

817

818
class Frame2TTLMixin(BaseSession):
2✔
819
    """
820
    Frame 2 TTL interface for state machine
821
    """
822

823
    def init_mixin_frame2ttl(self, *args, **kwargs):
2✔
824
        pass
2✔
825

826
    def start_mixin_frame2ttl(self):
2✔
827
        # todo assert calibration
828
        if self.hardware_settings['device_frame2ttl']['COM_F2TTL'] is None:
2✔
829
            raise ValueError(
2✔
830
                'The value for device_frame2ttl:COM_F2TTL in '
831
                'settings/hardware_settings.yaml is null. Please '
832
                'provide a valid port name.'
833
            )
UNCOV
834
        Frame2TTL(
×
835
            port=self.hardware_settings['device_frame2ttl']['COM_F2TTL'],
836
            threshold_dark=self.hardware_settings['device_frame2ttl']['F2TTL_DARK_THRESH'],
837
            threshold_light=self.hardware_settings['device_frame2ttl']['F2TTL_LIGHT_THRESH'],
838
        ).close()
UNCOV
839
        log.info('Frame2TTL: Thresholds set.')
×
840

841

842
class RotaryEncoderMixin(BaseSession):
2✔
843
    """
844
    Rotary encoder interface for state machine
845
    """
846

847
    def init_mixin_rotary_encoder(self, *args, **kwargs):
2✔
848
        self.device_rotary_encoder = MyRotaryEncoder(
2✔
849
            all_thresholds=self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS,
850
            gain=self.task_params.STIM_GAIN,
851
            com=self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
852
            connect=False,
853
        )
854

855
    def start_mixin_rotary_encoder(self):
2✔
856
        if self.hardware_settings['device_rotary_encoder']['COM_ROTARY_ENCODER'] is None:
2✔
857
            raise ValueError(
2✔
858
                'The value for device_rotary_encoder:COM_ROTARY_ENCODER in '
859
                'settings/hardware_settings.yaml is null. Please '
860
                'provide a valid port name.'
861
            )
UNCOV
862
        try:
×
UNCOV
863
            self.device_rotary_encoder.connect()
×
UNCOV
864
        except serial.serialutil.SerialException as e:
×
UNCOV
865
            raise serial.serialutil.SerialException(
×
866
                f'The rotary encoder COM port {self.device_rotary_encoder.RE_PORT} is already in use. This is usually'
867
                f' due to a Bonsai process currently running on the computer. Make sure all Bonsai windows are'
868
                f' closed prior to running the task'
869
            ) from e
UNCOV
870
        except Exception as e:
×
UNCOV
871
            raise Exception(
×
872
                "The rotary encoder couldn't connect. If the bpod is glowing in green,"
873
                'disconnect and reconnect bpod from the computer'
874
            ) from e
875
        log.info('Rotary encoder module loaded: OK')
×
876

877

878
class ValveMixin(BaseSession):
2✔
879
    def init_mixin_valve(self: object):
2✔
880
        self.valve = Bunch({})
2✔
881
        # the template settings files have a date in 2099, so assume that the rig is not calibrated if that is the case
882
        # the assertion on calibration is thrown when starting the device
883
        self.valve['is_calibrated'] = datetime.date.today() >= self.hardware_settings['device_valve']['WATER_CALIBRATION_DATE']
2✔
884
        self.valve['fcn_vol2time'] = scipy.interpolate.pchip(
2✔
885
            self.hardware_settings['device_valve']['WATER_CALIBRATION_WEIGHT_PERDROP'],
886
            self.hardware_settings['device_valve']['WATER_CALIBRATION_OPEN_TIMES'],
887
        )
888

889
    def start_mixin_valve(self):
2✔
890
        # if the rig is not on manual settings, then the reward valve has to be calibrated to run the experiment
UNCOV
891
        assert self.task_params.AUTOMATIC_CALIBRATION is False or self.valve['is_calibrated'], """
×
892
            ##########################################
893
            NO CALIBRATION INFORMATION FOUND IN HARDWARE SETTINGS:
894
            Calibrate the rig or use a manual calibration
895
            PLEASE GO TO the task settings yaml file and set:
896
                'AUTOMATIC_CALIBRATION': false
897
                'CALIBRATION_VALUE' = <MANUAL_CALIBRATION>
898
            ##########################################"""
899
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
UNCOV
900
        assert self.compute_reward_time(amount_ul=1.5) < 1, """
×
901
            ##########################################
902
                REWARD VALVE TIME IS TOO HIGH!
903
            Probably because of a BAD calibration file
904
            Calibrate the rig or use a manual calibration
905
            PLEASE GO TO the task settings yaml file and set:
906
                AUTOMATIC_CALIBRATION = False
907
                CALIBRATION_VALUE = <MANUAL_CALIBRATION>
908
            ##########################################"""
UNCOV
909
        log.info('Water valve module loaded: OK')
×
910

911
    def compute_reward_time(self, amount_ul=None):
2✔
912
        amount_ul = self.task_params.REWARD_AMOUNT_UL if amount_ul is None else amount_ul
2✔
913
        if self.task_params.AUTOMATIC_CALIBRATION:
2✔
914
            return self.valve['fcn_vol2time'](amount_ul) / 1e3
2✔
915
        else:  # this is the manual manual calibration value
916
            return self.task_params.CALIBRATION_VALUE / 3 * amount_ul
×
917

918
    def valve_open(self, reward_valve_time):
2✔
919
        """
920
        Opens the reward valve for a given amount of time and return bpod data
921
        :param reward_valve_time:
922
        :return:
923
        """
924
        sma = StateMachine(self.bpod)
×
925
        sma.add_state(
×
926
            state_name='valve_open',
927
            state_timer=reward_valve_time,
928
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
929
            state_change_conditions={'Tup': 'exit'},
930
        )
UNCOV
931
        self.bpod.send_state_machine(sma)
×
932
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
933
        return self.bpod.session.current_trial.export()
×
934

935

936
class SoundMixin(BaseSession):
2✔
937
    """
938
    Sound interface methods for state machine
939
    """
940

941
    def init_mixin_sound(self):
2✔
942
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
2✔
943
        sound_output = self.hardware_settings.device_sound['OUTPUT']
2✔
944

945
        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
946
        # TODO: this needs proper calibration and refactoring
947
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
2✔
UNCOV
948
            amp_gain_factor = 0.25
×
949
        else:
950
            amp_gain_factor = 1.0
2✔
951
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
2✔
952
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor
2✔
953

954
        # sound device sd is actually the module soundevice imported above.
955
        # not sure how this plays out when referenced outside of this python file
956
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
2✔
957
        # Create sounds and output actions of state machine
958
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
2✔
959
            rate=self.sound['samplerate'],
960
            frequency=self.task_params.GO_TONE_FREQUENCY,
961
            duration=self.task_params.GO_TONE_DURATION,
962
            amplitude=self.task_params.GO_TONE_AMPLITUDE * amp_gain_factor,
963
            fade=0.01,
964
            chans=self.sound['channels'],
965
        )
966
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
2✔
967
            rate=self.sound['samplerate'],
968
            frequency=-1,
969
            duration=self.task_params.WHITE_NOISE_DURATION,
970
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE * amp_gain_factor,
971
            fade=0.01,
972
            chans=self.sound['channels'],
973
        )
974

975
    def start_mixin_sound(self):
2✔
976
        """
977
        Depends on bpod mixin start for hard sound card
978
        :return:
979
        """
UNCOV
980
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
×
981
        # SoundCard config params
UNCOV
982
        match self.hardware_settings.device_sound['OUTPUT']:
×
UNCOV
983
            case 'harp':
×
UNCOV
984
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
×
UNCOV
985
                sound.configure_sound_card(
×
986
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
987
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
988
                    sample_rate=self.sound['samplerate'],
989
                )
UNCOV
990
                self.bpod.define_harp_sounds_actions(
×
991
                    module=self.bpod.sound_card,
992
                    go_tone_index=self.task_params.GO_TONE_IDX,
993
                    noise_index=self.task_params.WHITE_NOISE_IDX,
994
                )
UNCOV
995
            case 'hifi':
×
UNCOV
996
                module = self.bpod.get_module('^HiFi')
×
UNCOV
997
                assert module is not None, 'No HiFi module connected to Bpod'
×
UNCOV
998
                assert self.hardware_settings.device_sound.COM_SOUND is not None
×
UNCOV
999
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
×
UNCOV
1000
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
×
UNCOV
1001
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
×
UNCOV
1002
                hifi.push()
×
UNCOV
1003
                hifi.close()
×
UNCOV
1004
                self.bpod.define_harp_sounds_actions(
×
1005
                    module=module,
1006
                    go_tone_index=self.task_params.GO_TONE_IDX,
1007
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1008
                )
UNCOV
1009
            case _:
×
UNCOV
1010
                self.bpod.define_xonar_sounds_actions()
×
UNCOV
1011
        log.info(f"Sound module loaded: OK: {self.hardware_settings.device_sound['OUTPUT']}")
×
1012

1013
    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
2✔
1014
        """
1015
        Plays the noise sound for the error feedback using bpod state machine
1016
        :return: bpod current trial export
1017
        """
UNCOV
1018
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1019

1020
    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
2✔
1021
        """
1022
        Plays the ready tone beep using bpod state machine
1023
        :return: bpod current trial export
1024
        """
UNCOV
1025
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1026

1027
    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
2✔
1028
        """
1029
        Plays a sound using bpod state machine - the sound must be defined in the init_mixin_sound method
1030
        """
UNCOV
1031
        assert state_timer is not None, 'state_timer must be defined'
×
UNCOV
1032
        assert output_actions is not None, 'output_actions must be defined'
×
UNCOV
1033
        sma = StateMachine(self.bpod)
×
UNCOV
1034
        sma.add_state(
×
1035
            state_name=state_name,
1036
            state_timer=state_timer,
1037
            output_actions=[self.bpod.actions.play_tone],
1038
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
1039
        )
UNCOV
1040
        self.bpod.send_state_machine(sma)
×
UNCOV
1041
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1042
        return self.bpod.session.current_trial.export()
×
1043

1044

1045
class SpontaneousSession(BaseSession):
2✔
1046
    """
1047
    A Spontaneous task doesn't have trials, it just runs until the user stops it
1048
    It is used to get extraction structure for data streams
1049
    """
1050

1051
    def __init__(self, duration_secs=None, **kwargs):
2✔
1052
        super().__init__(**kwargs)
2✔
1053
        self.duration_secs = duration_secs
2✔
1054

1055
    def start_hardware(self):
2✔
1056
        pass  # no mixin here, life is but a dream
2✔
1057

1058
    def _run(self):
2✔
1059
        """
1060
        This is the method that runs the task with the actual state machine
1061
        :return:
1062
        """
1063
        log.info('Starting spontaneous acquisition')
2✔
1064
        while True:
2✔
1065
            time.sleep(1.5)
2✔
1066
            if self.duration_secs is not None and self.time_elapsed.seconds > self.duration_secs:
2✔
1067
                break
2✔
UNCOV
1068
            if self.paths.SESSION_FOLDER.joinpath('.stop').exists():
×
UNCOV
1069
                self.paths.SESSION_FOLDER.joinpath('.stop').unlink()
×
UNCOV
1070
                break
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc