• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

int-brain-lab / iblrig / 10568073180

26 Aug 2024 10:13PM UTC coverage: 47.538% (+0.7%) from 46.79%
10568073180

Pull #711

github

eeff82
web-flow
Merge 599c9edfb into ad41db25f
Pull Request #711: 8.23.2

121 of 135 new or added lines in 8 files covered. (89.63%)

1025 existing lines in 22 files now uncovered.

4084 of 8591 relevant lines covered (47.54%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.71
/iblrig/base_tasks.py
1
"""
2
Commonalities for all tasks.
3

4
This module provides hardware mixins that can be used together with BaseSession to compose tasks.
5
This module tries to exclude task related logic.
6
"""
7

8
import abc
2✔
9
import argparse
2✔
10
import contextlib
2✔
11
import datetime
2✔
12
import importlib.metadata
2✔
13
import inspect
2✔
14
import json
2✔
15
import logging
2✔
16
import signal
2✔
17
import sys
2✔
18
import time
2✔
19
import traceback
2✔
20
from abc import ABC
2✔
21
from collections import OrderedDict
2✔
22
from collections.abc import Callable
2✔
23
from pathlib import Path
2✔
24

25
import numpy as np
2✔
26
import pandas as pd
2✔
27
import scipy.interpolate
2✔
28
import serial
2✔
29
import yaml
2✔
30
from pythonosc import udp_client
2✔
31

32
import ibllib.io.session_params as ses_params
2✔
33
import iblrig
2✔
34
import iblrig.graphic as graph
2✔
35
import iblrig.path_helper
2✔
36
import pybpodapi
2✔
37
from ibllib.oneibl.registration import IBLRegistrationClient
2✔
38
from iblrig import net, sound
2✔
39
from iblrig.constants import BASE_PATH, BONSAI_EXE, PYSPIN_AVAILABLE
2✔
40
from iblrig.frame2ttl import Frame2TTL
2✔
41
from iblrig.hardware import SOFTCODE, Bpod, MyRotaryEncoder, sound_device_factory
2✔
42
from iblrig.hifi import HiFi
2✔
43
from iblrig.path_helper import load_pydantic_yaml
2✔
44
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
2✔
45
from iblrig.tools import call_bonsai
2✔
46
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
2✔
47
from iblutil.io.net.base import ExpMessage
2✔
48
from iblutil.spacer import Spacer
2✔
49
from iblutil.util import Bunch, flatten, setup_logger
2✔
50
from one.alf.io import next_num_folder
2✔
51
from one.api import ONE, OneAlyx
2✔
52
from pybpodapi.protocol import StateMachine
2✔
53

54
# Default (loopback) IP address for the OSC client.
OSC_CLIENT_IP = '127.0.0.1'

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
57

58

59
class BaseSession(ABC):
2✔
60
    version = None
2✔
61
    """str: !!CURRENTLY UNUSED!! task version string."""
2✔
62
    protocol_name: str | None = None
2✔
63
    """str: The name of the task protocol (NB: avoid spaces)."""
2✔
64
    base_parameters_file: Path | None = None
2✔
65
    """Path: A YAML file containing base, default task parameters."""
2✔
66
    is_mock = False
2✔
67
    """list of str: One or more ibllib.pipes.tasks.Task names for task extraction."""
2✔
68
    logger: logging.Logger = None
2✔
69
    """logging.Logger: Log instance used solely to keep track of log level passed to constructor."""
2✔
70
    experiment_description: dict = {}
2✔
71
    """dict: The experiment description."""
2✔
72
    extractor_tasks: list | None = None
2✔
73
    """list of str: An optional list of pipeline task class names to instantiate when preprocessing task data."""
2✔
74

75
    def __init__(
        self,
        subject=None,
        task_parameter_file=None,
        file_hardware_settings=None,
        hardware_settings: HardwareSettings = None,
        file_iblrig_settings=None,
        iblrig_settings: RigSettings = None,
        one=None,
        interactive=True,
        projects=None,
        procedures=None,
        stub=None,
        subject_weight_grams=None,
        append=False,
        wizard=False,
        log_level='INFO',
        **kwargs,
    ):
        """
        Set up loggers, settings, task parameters, session info, paths and the experiment description.

        :param subject: The subject nickname. Required.
        :param task_parameter_file: an optional path to the task_parameters.yaml file
        :param file_hardware_settings: name of the hardware file in the settings folder, or full file path
        :param hardware_settings: an optional dictionary of hardware settings. Keys will override any keys in the file
        :param file_iblrig_settings: name of the iblrig file in the settings folder, or full file path
        :param iblrig_settings: an optional dictionary of iblrig settings. Keys will override any keys in the file
        :param one: an optional instance of ONE
        :param interactive: stored as `self.interactive`
        :param projects: An optional list of Alyx projects.
        :param procedures: An optional list of Alyx procedures.
        :param stub: A full path to an experiment description file containing experiment information.
        :param subject_weight_grams: weight of the subject
        :param append: bool, if True, append to the latest existing session of the same subject for the same day
        :param wizard: stored as `self.wizard`
        :param log_level: logging level handed to the logger setup
        :param kwargs: accepted but not used by this constructor
        """
        self.extractor_tasks = getattr(self, 'extractor_tasks', None)
        assert self.protocol_name is not None, 'Protocol name must be defined by the child class'
        self._logger = None
        self._setup_loggers(level=log_level)
        is_empty_session = isinstance(self, EmptySession)
        if not is_empty_session:
            log.info(f'Running iblrig {iblrig.__version__}, pybpod version {pybpodapi.__version__}')
        log.info(f'Session call: {" ".join(sys.argv)}')
        self.interactive = interactive
        self._one = one
        self.init_datetime = datetime.datetime.now()

        # Settings come from the YAML files first; explicit arguments then override them and are re-validated
        self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
        if hardware_settings is not None:
            self.hardware_settings.update(hardware_settings)
            HardwareSettings.model_validate(self.hardware_settings)
        self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
        if iblrig_settings is not None:
            self.iblrig_settings.update(iblrig_settings)
            RigSettings.model_validate(self.iblrig_settings)

        self.wizard = wizard

        # Task parameters are read from the task folder unless a file is passed explicitly
        self.task_params = self.read_task_parameter_files(task_parameter_file)

        initial_session_info = {
            'NTRIALS': 0,
            'NTRIALS_CORRECT': 0,
            'PROCEDURES': procedures,
            'PROJECTS': projects,
            'SESSION_START_TIME': self.init_datetime.isoformat(),
            'SESSION_END_TIME': None,
            'SESSION_NUMBER': 0,
            'SUBJECT_NAME': subject,
            'SUBJECT_WEIGHT': subject_weight_grams,
            'TOTAL_WATER_DELIVERED': 0,
        }
        self.session_info = Bunch(initial_session_info)

        # Run every `init_mixin*` method before the session paths are computed
        self._execute_mixins_shared_function('init_mixin')
        self.paths = self._init_paths(append=append)
        if not is_empty_session:
            log.info(f'Session raw data: {self.paths.SESSION_RAW_DATA_FOLDER}')

        # Build the experiment description dictionary
        task_collection = self.paths.get('TASK_COLLECTION')
        self.experiment_description = self.make_experiment_description_dict(
            self.protocol_name,
            task_collection,
            procedures,
            projects,
            self.hardware_settings,
            stub,
            extractors=self.extractor_tasks,
        )
164

165
    @classmethod
2✔
166
    def get_task_file(cls) -> Path:
2✔
167
        """
168
        Get the path to the task's python file.
169

170
        Returns
171
        -------
172
        Path
173
            The path to the task file.
174
        """
175
        return Path(inspect.getfile(cls))
2✔
176

177
    @classmethod
2✔
178
    def get_task_directory(cls) -> Path:
2✔
179
        """
180
        Get the path to the task's directory.
181

182
        Returns
183
        -------
184
        Path
185
            The path to the task's directory.
186
        """
187
        return cls.get_task_file().parent
2✔
188

189
    @classmethod
    def read_task_parameter_files(cls, task_parameter_file: str | Path | None = None) -> Bunch:
        """
        Merge the task parameters found in the YAML files of the class hierarchy.

        Parameters
        ----------
        task_parameter_file : str or Path, optional
            Path used instead of the `task_parameters.yaml` located in the task directory.

        Returns
        -------
        Bunch
            The merged task parameters, sorted by key.
        """
        # the task's own parameter file, unless overridden by the input argument
        primary_file = task_parameter_file or cls.get_task_directory().joinpath('task_parameters.yaml')

        # walk the class hierarchy and collect every declared base parameter file
        inherited_files = (getattr(klass, 'base_parameters_file', None) for klass in cls.__mro__)
        candidates = [primary_file, *(f for f in inherited_files if f is not None)]

        # drop duplicates (first occurrence wins), then reverse: files are applied in this order and
        # later files overwrite earlier ones, so the primary file ends up with the highest precedence
        ordered_files = list(dict.fromkeys(candidates))
        ordered_files.reverse()

        merged = {}
        for yaml_path in ordered_files:
            if not Path(yaml_path).exists():
                continue
            with open(yaml_path) as fid:
                content = yaml.safe_load(fid)
            if content is not None:
                merged.update(content)

        # sort the keys so that the parameters are easier for a human to navigate
        return Bunch(sorted(merged.items()))
227

228
    def _init_paths(self, append: bool = False) -> Bunch:
        r"""
        Build the collection of paths used by the session.

        Also sets `self.session_info.SESSION_NUMBER` from the name of the session folder.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        Bunch
            Bunch with keys:

            *   IBLRIG_FOLDER: the `BASE_PATH` constant
            *   BONSAI: full path to the bonsai executable
                `C:\iblrigv8\Bonsai\Bonsai.exe`
            *   VISUAL_STIM_FOLDER: full path to the visual stimulus folder
                `C:\iblrigv8\visual_stim`
            *   LOCAL_SUBJECT_FOLDER: full path to the local subject folder
                `C:\iblrigv8_data\mainenlab\Subjects`
            *   REMOTE_SUBJECT_FOLDER: full path to the remote subject folder
                `Y:\Subjects`
            *   SESSION_FOLDER: full path to the current session:
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001`
            *   TASK_COLLECTION: folder name of the current task
                `raw_task_data_00`
            *   SESSION_RAW_DATA_FOLDER: concatenation of the session folder and the task collection.
                This is where the task data gets written
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00`
            *   DATA_FILE_PATH: contains the bpod trials
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskData.raw.jsonable`
            *   SETTINGS_FILE_PATH: contains the task settings
                `C:\iblrigv8_data\mainenlab\Subjects\SWC_043\2019-01-01\001\raw_task_data_00\_iblrig_taskSettings.raw.json`

        Raises
        ------
        AssertionError
            If `append` is True and no session folder exists yet for today.
        RuntimeError
            If `append` is True, MAIN_SYNC is set in the hardware settings and the task collection
            does not end with '00'.
        """
        rig_settings = self.iblrig_settings
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
            local_path=rig_settings.iblrig_local_data_path,
            remote_path=rig_settings.iblrig_remote_data_path,
            lab=rig_settings.ALYX_LAB,
            iblrig_settings=rig_settings,
        )
        paths = Bunch(
            {
                'IBLRIG_FOLDER': BASE_PATH,
                'BONSAI': BONSAI_EXE,
                'VISUAL_STIM_FOLDER': BASE_PATH.joinpath('visual_stim'),
                'LOCAL_SUBJECT_FOLDER': rig_computer_paths['local_subjects_folder'],
                'REMOTE_SUBJECT_FOLDER': rig_computer_paths['remote_subjects_folder'],
            }
        )

        # <local subjects>/<subject>/<YYYY-MM-DD>: the date is the first 10 characters of the ISO start time
        subject_name = self.session_info.SUBJECT_NAME
        session_date = self.session_info.SESSION_START_TIME[:10]
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(subject_name, session_date)

        if append:
            # a new protocol is appended to today's most recent session folder
            todays_sessions = [folder for folder in date_folder.glob('*') if folder.is_dir()]
            todays_sessions.sort(reverse=True)
            assert len(todays_sessions) > 0, f'Trying to chain a protocol, but no session folder found in {date_folder}'
            paths.SESSION_FOLDER = todays_sessions[0]
        else:
            # the session path is created from scratch
            paths.SESSION_FOLDER = date_folder / next_num_folder(date_folder)
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)

        if append and self.hardware_settings.get('MAIN_SYNC', False) and not paths.TASK_COLLECTION.endswith('00'):
            # Chained protocols make little sense when Bpod is the main sync as there is no
            # continuous acquisition between protocols.  Only one sync collection can be defined in
            # the experiment description file.
            # If you are running experiments with an ephys rig (nidq) or an external daq, you should
            # correct the MAIN_SYNC parameter in the hardware settings file in ./settings/hardware_settings.yaml
            raise RuntimeError('Chained protocols not supported for bpod-only sessions')

        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
        raw_data_folder = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
        paths.SESSION_RAW_DATA_FOLDER = raw_data_folder
        paths.DATA_FILE_PATH = raw_data_folder.joinpath('_iblrig_taskData.raw.jsonable')
        paths.SETTINGS_FILE_PATH = raw_data_folder.joinpath('_iblrig_taskSettings.raw.json')
        return paths
303

304
    @property
2✔
305
    def exp_ref(self):
2✔
306
        """Construct an experiment reference string from the session info attribute."""
307
        subject, date, number = (self.session_info[k] for k in ('SUBJECT_NAME', 'SESSION_START_TIME', 'SESSION_NUMBER'))
2✔
308
        if not all([subject, date, number]):
2✔
309
            return None
2✔
310
        return self.one.dict2ref(dict(subject=subject, date=date[:10], sequence=str(number)))
2✔
311

312
    def _setup_loggers(self, level='INFO', level_bpod='WARNING', file=None):
        """
        Configure the 'iblrig' and 'pybpodapi' loggers.

        Parameters
        ----------
        level : str or int
            Log level for the 'iblrig' logger.
        level_bpod : str or int
            Log level for the 'pybpodapi' logger.
        file : optional
            Passed to `setup_logger` as its `file` argument for both loggers.
        """
        # the iblrig logger is kept on the instance: create_session reads its level later on
        iblrig_logger = setup_logger('iblrig', level=level, file=file)
        self._logger = iblrig_logger
        setup_logger('pybpodapi', level=level_bpod, file=file)
315

316
    def _remove_file_loggers(self):
2✔
317
        for logger_name in ['iblrig', 'pybpodapi']:
2✔
318
            logger = logging.getLogger(logger_name)
2✔
319
            file_handlers = [fh for fh in logger.handlers if isinstance(fh, logging.FileHandler)]
2✔
320
            for fh in file_handlers:
2✔
321
                fh.close()
2✔
322
                logger.removeHandler(fh)
2✔
323

324
    @staticmethod
2✔
325
    def make_experiment_description_dict(
2✔
326
        task_protocol: str,
327
        task_collection: str,
328
        procedures: list = None,
329
        projects: list = None,
330
        hardware_settings: dict | HardwareSettings = None,
331
        stub: Path = None,
332
        extractors: list = None,
333
        camera_config: str = None,
334
    ):
335
        """
336
        Construct an experiment description dictionary.
337

338
        Parameters
339
        ----------
340
        task_protocol : str
341
            The task protocol name, e.g. _ibl_trainingChoiceWorld2.0.0.
342
        task_collection : str
343
            The task collection name, e.g. raw_task_data_00.
344
        procedures : list
345
            An optional list of Alyx procedures.
346
        projects : list
347
            An optional list of Alyx protocols.
348
        hardware_settings : dict
349
            An optional dict of hardware devices, loaded from the hardware_settings.yaml file.
350
        stub : dict
351
            An optional experiment description stub to update.
352
        extractors: list
353
            An optional list of extractor names for the task.
354
        camera_config : str
355
            The camera configuration name in the hardware settings. Defaults to the first key in
356
            'device_cameras'.
357

358
        Returns
359
        -------
360
        dict
361
            The experiment description.
362
        """
363
        description = ses_params.read_params(stub) if stub else {}
2✔
364

365
        # Add hardware devices
366
        if hardware_settings is not None:
2✔
367
            if isinstance(hardware_settings, HardwareSettings):
2✔
368
                hardware_settings = hardware_settings.model_dump()
2✔
369
            devices = {}
2✔
370
            cams = hardware_settings.get('device_cameras', None)
2✔
371
            if cams:
2✔
372
                devices['cameras'] = {}
2✔
373
                camera_config = camera_config or next((k for k in cams), {})
2✔
374
                devices.update(VideoCopier.config2stub(cams[camera_config])['devices'])
2✔
375
            if hardware_settings.get('device_microphone', None):
2✔
376
                devices['microphone'] = {'microphone': {'collection': task_collection, 'sync_label': 'audio'}}
2✔
377
            ses_params.merge_params(description, {'devices': devices})
2✔
378

379
        # Add projects and procedures
380
        description['procedures'] = list(set(description.get('procedures', []) + (procedures or [])))
2✔
381
        description['projects'] = list(set(description.get('projects', []) + (projects or [])))
2✔
382
        is_main_sync = (hardware_settings or {}).get('MAIN_SYNC', False)
2✔
383
        # Add sync key if required
384
        if is_main_sync and 'sync' not in description:
2✔
385
            description['sync'] = {
2✔
386
                'bpod': {'collection': task_collection, 'acquisition_software': 'pybpod', 'extension': '.jsonable'}
387
            }
388
        # Add task
389
        task = {task_protocol: {'collection': task_collection}}
2✔
390
        if not is_main_sync:
2✔
391
            task[task_protocol]['sync_label'] = 'bpod'
2✔
392
        if extractors:
2✔
393
            assert isinstance(extractors, list), 'extractors parameter must be a list of strings'
2✔
394
            task[task_protocol].update({'extractors': extractors})
2✔
395
        if 'tasks' not in description:
2✔
396
            description['tasks'] = [task]
2✔
397
        else:
398
            description['tasks'].append(task)
2✔
399
        return description
2✔
400

401
    def _make_task_parameters_dict(self):
        """
        Assemble the dictionary that gets saved to the settings json file for extraction.

        Keys are merged in this order, later sources overwriting earlier ones: task parameters,
        hardware settings, session info, then a few version / Alyx related entries.

        Returns
        -------
        dict
            A dictionary that will be saved to the settings json file for extraction.
        """
        output_dict = {
            **self.task_params,  # parameters of the task
            **self.hardware_settings.model_dump(),  # hardware settings of the session
            **self.session_info,  # session info (subject, procedures, projects...)
        }
        # Various values added to ease transition from iblrig v7 to v8, different home may be desired
        patch_dict = {
            'IBLRIG_VERSION': iblrig.__version__,
            'PYBPOD_PROTOCOL': self.protocol_name,
            'ALYX_USER': self.iblrig_settings.ALYX_USER,
            'ALYX_LAB': self.iblrig_settings.ALYX_LAB,
        }
        # the extraction version is recorded only when the project_extraction package is installed
        try:
            patch_dict['PROJECT_EXTRACTION_VERSION'] = importlib.metadata.version('project_extraction')
        except importlib.metadata.PackageNotFoundError:
            pass
        output_dict.update(patch_dict)
        return output_dict
423

424
    def save_task_parameters_to_json_file(self, destination_folder: Path | None = None) -> Path:
        """
        Collect the settings and parameters of the session and write them to a JSON file.

        Parameters
        ----------
        destination_folder : Path, optional
            Folder in which `_iblrig_taskSettings.raw.json` is written. When omitted, the file is
            written to `self.paths['SETTINGS_FILE_PATH']`.

        Returns
        -------
        Path
            Path to the resultant JSON file
        """
        output_dict = self._make_task_parameters_dict()
        json_file = (
            destination_folder.joinpath('_iblrig_taskSettings.raw.json')
            if destination_folder
            else self.paths['SETTINGS_FILE_PATH']
        )
        json_file.parent.mkdir(parents=True, exist_ok=True)
        with open(json_file, 'w') as outfile:
            # default=str converts objects that json cannot serialize (e.g. datetime) to strings
            json.dump(output_dict, outfile, indent=4, sort_keys=True, default=str)
        return json_file
442

443
    @property
2✔
444
    def one(self):
2✔
445
        """ONE getter."""
446
        if self._one is None:
2✔
447
            if self.iblrig_settings['ALYX_URL'] is None:
2✔
448
                return
2✔
UNCOV
449
            info_str = (
×
450
                f"alyx client with user name {self.iblrig_settings['ALYX_USER']} "
451
                + f"and url: {self.iblrig_settings['ALYX_URL']}"
452
            )
NEW
UNCOV
453
            try:
×
NEW
UNCOV
454
                self._one = ONE(
×
455
                    base_url=str(self.iblrig_settings['ALYX_URL']),
456
                    username=self.iblrig_settings['ALYX_USER'],
457
                    mode='remote',
458
                    cache_rest=None,
459
                )
NEW
UNCOV
460
                log.info('instantiated ' + info_str)
×
NEW
UNCOV
461
            except Exception:
×
NEW
UNCOV
462
                log.error(traceback.format_exc())
×
NEW
UNCOV
463
                log.error('could not connect to ' + info_str)
×
464
        return self._one
2✔
465

466
    def register_to_alyx(self):
        """
        Register the session to Alyx.

        The session is registered through the IBLRegistrationClient class (with
        `register_reward=False`). When the session info holds a non-zero total of delivered
        water, a water administration is registered in a second, separate step.

        Nothing is registered (and None is returned) for the test subjects
        'iblrig_test_subject', 'test' and 'test_subject', or when ONE is unavailable or offline.

        Note that a water administration is added on each call for which water was delivered:
        the method should therefore be called only once after each protocol is run.

        Returns
        -------
        dict
            The registered session record, or None if the session or the water administration
            could not be registered (the error is logged in both cases).

        See Also
        --------
        ibllib.oneibl.IBLRegistrationClient.register_session - The registration method.
        """
        info = self.session_info
        if info['SUBJECT_NAME'] in ('iblrig_test_subject', 'test', 'test_subject'):
            log.warning('Not registering test subject to Alyx')
            return
        one = self.one
        if not one or one.offline:
            return

        # step 1: the session itself
        try:
            client = IBLRegistrationClient(one)
            ses, _ = client.register_session(self.paths.SESSION_FOLDER, register_reward=False)
        except Exception:
            log.error(traceback.format_exc())
            log.error('Could not register session to Alyx')
            return

        # step 2: the water administration, only if water was administered
        try:
            total_water = info['TOTAL_WATER_DELIVERED']
            if total_water:
                wa = client.register_water_administration(
                    info.SUBJECT_NAME,
                    total_water / 1000,
                    session=ses['url'][-36:],  # the last 36 characters of the session url
                    water_type=self.task_params.get('REWARD_TYPE', None),
                )
                log.info(f"Water administered registered in Alyx database: {ses['subject']}, " f"{wa['water_administered']}mL")
        except Exception:
            log.error(traceback.format_exc())
            log.error('Could not register water administration to Alyx')
            return
        return ses
523

524
    def _execute_mixins_shared_function(self, pattern):
2✔
525
        """
526
        Loop over all methods of the class that start with pattern and execute them.
527

528
        Parameters
529
        ----------
530
        pattern : str
531
            'init_mixin', 'start_mixin', 'stop_mixin', or 'cleanup_mixin'
532
        """
533
        method_names = [method for method in dir(self) if method.startswith(pattern)]
2✔
534
        methods = [getattr(self, method) for method in method_names if inspect.ismethod(getattr(self, method))]
2✔
535
        for meth in methods:
2✔
536
            meth()
2✔
537

538
    @property
2✔
539
    def time_elapsed(self):
2✔
540
        return datetime.datetime.now() - self.init_datetime
2✔
541

542
    def mock(self):
2✔
543
        self.is_mock = True
2✔
544

545
    def create_session(self):
        """
        Create the session on disk and register it.

        In this order: saves the task parameters to the json settings file (which creates the
        task collection folder), enables file logging in the raw data folder, initializes the
        experiment description through a BehaviorCopier and registers the session to Alyx.
        """
        self.paths['TASK_PARAMETERS_FILE'] = self.save_task_parameters_to_json_file()
        # enable file logging, keeping the log level of the current logger
        raw_data_folder = self.paths.SESSION_RAW_DATA_FOLDER
        self._setup_loggers(
            level=self._logger.level,
            file=raw_data_folder.joinpath('_ibl_log.info-acquisition.log'),
        )
        # copy the acquisition stub to the remote session folder
        copier = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
        copier.initialize_experiment(self.experiment_description, overwrite=False)
        self.register_to_alyx()
559

560
    def run(self):
        """
        Run the session: common pre-run and post-run instructions wrapped around `_run`.

        Starts the hardware, creates the session, optionally asks for the subject weight, installs
        a SIGINT handler for a graceful exit, runs the task, then saves the parameters, registers
        the session to Alyx and executes the `stop_mixin*` and `cleanup_mixin*` methods.
        """
        # the hardware is connected before the session is written to disk: this prevents
        # incrementing the session number endlessly if the hardware fails to connect
        self.start_hardware()
        self.create_session()

        # the weighing dialog is only shown for the first protocol of a chain (collection index 0)
        collection_index = int(self.paths.SESSION_RAW_DATA_FOLDER.name.split('_')[-1])
        first_protocol = collection_index == 0
        if self.session_info.SUBJECT_WEIGHT is None and self.interactive and first_protocol:
            self.session_info.SUBJECT_WEIGHT = graph.numinput(
                'Subject weighing (gr)', f'{self.session_info.SUBJECT_NAME} weight (gr):', nullable=False
            )

        def sigint_handler(*args, **kwargs):
            # graceful exit: a stop flag is created in the session folder
            self.paths.SESSION_FOLDER.joinpath('.stop').touch()
            log.critical('SIGINT signal detected, will exit at the end of the trial')

        # a stop flag already present upon starting is removed, so the session is not killed in the egg
        stale_stop_flag = self.paths.SESSION_FOLDER.joinpath('.stop')
        if stale_stop_flag.exists():
            stale_stop_flag.unlink()

        signal.signal(signal.SIGINT, sigint_handler)
        self._run()  # runs the specific task logic i.e. trial loop etc...

        # post task instructions
        log.critical('Graceful exit')
        log.info(f'Session {self.paths.SESSION_RAW_DATA_FOLDER}')
        self.session_info.SESSION_END_TIME = datetime.datetime.now().isoformat()
        if self.interactive and not self.wizard:
            self.session_info.POOP_COUNT = graph.numinput(
                'Poop count', f'{self.session_info.SUBJECT_NAME} droppings count:', nullable=True, askint=True
            )
        self.save_task_parameters_to_json_file()
        self.register_to_alyx()
        for mixin_stage in ('stop_mixin', 'cleanup_mixin'):
            self._execute_mixins_shared_function(mixin_stage)
600

601
    @abc.abstractmethod
2✔
602
    def start_hardware(self):
2✔
603
        """
604
        Start the hardware.
605

606
        This method doesn't explicitly start the mixins as the order has to be defined in the child classes.
607
        This needs to be implemented in the child classes, and should start and connect to all hardware pieces.
608
        """
UNCOV
609
        pass
×
610

611
    @abc.abstractmethod
2✔
612
    def _run(self):
2✔
613
        pass
×
614

615
    @staticmethod
2✔
616
    def extra_parser():
2✔
617
        """
618
        Specify extra kwargs arguments to expose to the user prior running the task.
619

620
        Make sure you instantiate the parser.
621

622
        Returns
623
        -------
624
        argparse.ArgumentParser
625
            The extra parser instance.
626
        """
627
        parser = argparse.ArgumentParser(add_help=False)
2✔
628
        return parser
2✔
629

630

631
# this class gets called to get the path constructor utility to predict the session path
class EmptySession(BaseSession):
    """Minimal concrete session: no task logic and no hardware."""

    protocol_name = 'empty'

    def _run(self):
        """Do nothing: there is no task logic to run."""

    def start_hardware(self):
        """Do nothing: there is no hardware to start."""
640

641

642
class OSCClient(udp_client.SimpleUDPClient):
    """
    Handles communication to Bonsai using a UDP Client.

    OSC channels:
        USED:
        /t  -> (int)    trial number current
        /p  -> (int)    position of stimulus init for current trial
        /h  -> (float)  phase of gabor for current trial
        /c  -> (float)  contrast of stimulus for current trial
        /f  -> (float)  frequency of gabor patch for current trial
        /a  -> (float)  angle of gabor patch for current trial
        /g  -> (float)  gain of RE to visual stim displacement
        /s  -> (float)  sigma of the 2D gaussian of gabor
        /e  -> (int)    events transitions  USED BY SOFTCODE HANDLER FUNC
        /r  -> (int)    whether to reverse the side contingencies (0, 1)
        /x  -> (int)    sent by :meth:`exit`

    Only the channels listed in ``OSC_PROTOCOL`` can be sent through :meth:`send2bonsai`.
    """

    # maps the keyword accepted by `send2bonsai` to its OSC address and to the Python type
    # the value is cast to before being sent
    OSC_PROTOCOL = {
        'trial_num': dict(mess='/t', type=int),
        'position': dict(mess='/p', type=int),
        'stim_phase': dict(mess='/h', type=float),
        'contrast': dict(mess='/c', type=float),
        'stim_freq': dict(mess='/f', type=float),
        'stim_angle': dict(mess='/a', type=float),
        'stim_gain': dict(mess='/g', type=float),
        'stim_sigma': dict(mess='/s', type=float),
        # 'stim_reverse': dict(mess='/r', type=int),  # this is not handled by Bonsai
    }

    def __init__(self, port, ip='127.0.0.1'):
        """
        Create the UDP client.

        Parameters
        ----------
        port : int
            The port to send the OSC messages to.
        ip : str
            The address to send the OSC messages to, defaults to the local host.
        """
        super().__init__(ip, port)

    def __del__(self):
        """Close the underlying socket when the client is garbage collected."""
        # __del__ is also invoked when __init__ raised before the socket got created:
        # guard the access so that the original error is not masked by an AttributeError
        sock = getattr(self, '_sock', None)
        if sock is not None:
            sock.close()

    def send2bonsai(self, **kwargs):
        """
        Send one OSC message per recognized keyword argument.

        Keywords that are not part of OSC_PROTOCOL are silently ignored.

        :param kwargs: see list of keys in OSC_PROTOCOL
        :example: client.send2bonsai(trial_num=6, stim_freq=50)
        :return: None
        """
        for key, raw_value in kwargs.items():
            channel = self.OSC_PROTOCOL.get(key)
            if channel is None:
                continue
            # numpy scalars have to be converted to built-in python types before being
            # cast and handed over to the OSC module
            value = raw_value.item() if isinstance(raw_value, np.generic) else raw_value
            self.send_message(channel['mess'], channel['type'](value))

    def exit(self):
        """Send the value 1 on the '/x' channel."""
        self.send_message('/x', 1)
2✔
692

693

694
class BonsaiRecordingMixin(BaseSession):
    """Mixin starting and stopping the Bonsai camera and microphone recording workflows."""

    def init_mixin_bonsai_recordings(self, *args, **kwargs):
        """Create the OSC clients for the camera (port 7111) and microphone (port 7112) workflows."""
        self.bonsai_camera = Bunch({'udp_client': OSCClient(port=7111)})
        self.bonsai_microphone = Bunch({'udp_client': OSCClient(port=7112)})
        self.config = None  # the name of the configuration to run

    def stop_mixin_bonsai_recordings(self):
        """Send the exit message to both the camera and the microphone OSC clients."""
        log.info('Stopping Bonsai recordings')
        self.bonsai_camera.udp_client.exit()
        self.bonsai_microphone.udp_client.exit()

    def _default_bonsai_camera_config(self):
        """
        Return the name of the camera configuration to run, selecting a default if none is set.

        When `self.config` is not set, it is assigned the first key of the `device_cameras`
        map of the hardware settings, or None if that map is empty or undefined.

        Returns
        -------
        str | None
            The camera configuration name, None if no camera configuration is available.
        """
        if not self.config:
            # NOTE(review): device_cameras is assumed to possibly be None in the settings
            # (no cameras defined) - treated the same as an empty map
            cameras = self.hardware_settings.device_cameras or {}
            self.config = next(iter(cameras), None)
        return self.config

    def start_mixin_bonsai_microphone(self):
        """
        Start the stand-alone Bonsai microphone workflow, if required.

        Nothing is started if a camera configuration is available, nor if the task parameter
        RECORD_SOUND is falsy.
        """
        # The camera workflow on the behaviour computer already contains the microphone recording
        # so the device camera workflow and the microphone one are exclusive
        if self._default_bonsai_camera_config():
            return  # Camera workflow defined; so no need to separately start microphone.
        if not self.task_params.RECORD_SOUND:
            return  # Sound should not be recorded
        workflow_file = self.paths.IBLRIG_FOLDER.joinpath(*self.hardware_settings.device_microphone['BONSAI_WORKFLOW'].parts)
        parameters = {
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
            'RecordSound': self.task_params.RECORD_SOUND,
        }
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
        log.info('Bonsai microphone recording module loaded: OK')

    @staticmethod
    def _camera_mixin_bonsai_get_workflow_file(cameras: dict | None, name: str) -> Path | None:
        """
        Returns the bonsai workflow file for the cameras from the hardware_settings.yaml file.

        Parameters
        ----------
        cameras : dict
            The hardware settings configuration.
        name : {'setup', 'recording'} str
            The workflow type.

        Returns
        -------
        Path
            The workflow path, None if no configuration was provided.
        """
        return None if cameras is None else cameras['BONSAI_WORKFLOW'][name]

    def start_mixin_bonsai_cameras(self):
        """
        Prepare the cameras.

        Starts the pipeline that aligns the camera focus with the desired borders of rig features.
        The actual triggering of the cameras is done in the trigger_bonsai_cameras method.
        Returns early if no camera configuration or no 'setup' workflow is available.
        """
        if not self._default_bonsai_camera_config():
            return
        configuration = self.hardware_settings.device_cameras[self.config]
        if (workflow_file := self._camera_mixin_bonsai_get_workflow_file(configuration, 'setup')) is None:
            return

        # enable trigger of cameras (so Bonsai can disable it again ... sigh)
        if PYSPIN_AVAILABLE:
            from iblrig.video_pyspin import enable_camera_trigger

            enable_camera_trigger(True)

        call_bonsai(workflow_file, wait=True)  # TODO Parameterize using configuration cameras
        log.info('Bonsai cameras setup module loaded: OK')

    def trigger_bonsai_cameras(self):
        """
        Start the Bonsai camera recording workflow.

        Returns early if no camera configuration or no 'recording' workflow is available.

        Raises
        ------
        NotImplementedError
            If the selected configuration contains anything else than the keys
            'BONSAI_WORKFLOW' and 'left'.
        """
        if not self._default_bonsai_camera_config():
            return
        configuration = self.hardware_settings.device_cameras[self.config]
        if set(configuration.keys()) != {'BONSAI_WORKFLOW', 'left'}:
            raise NotImplementedError(
                f'Camera configuration "{self.config}" is not supported: '
                'only configurations with a single "left" camera are implemented'
            )
        workflow_file = self._camera_mixin_bonsai_get_workflow_file(configuration, 'recording')
        if workflow_file is None:
            return
        iblrig.path_helper.create_bonsai_layout_from_template(workflow_file)
        # FIXME Use parameters in configuration map
        video_folder = self.paths.SESSION_FOLDER.joinpath('raw_video_data')
        parameters = {
            'FileNameLeft': video_folder.joinpath('_iblrig_leftCamera.raw.avi'),
            'FileNameLeftData': video_folder.joinpath('_iblrig_leftCamera.frameData.bin'),
            'FileNameMic': self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_micData.raw.wav'),
            'RecordSound': self.task_params.RECORD_SOUND,
        }
        call_bonsai(workflow_file, parameters, wait=False, editor=False)
        log.info('Bonsai camera recording process started')
2✔
793

794

795
class BonsaiVisualStimulusMixin(BaseSession):
    """Mixin starting, feeding and stopping the Bonsai visual stimulus workflows."""

    def init_mixin_bonsai_visual_stimulus(self, *args, **kwargs):
        """Create the OSC client used to send the trial information to the visual stimulus workflow."""
        # camera 7111, microphone 7112
        self.bonsai_visual_udp_client = OSCClient(port=7110)

    def start_mixin_bonsai_visual_stimulus(self):
        """Start the choice world visual stimulus workflow."""
        self.choice_world_visual_stimulus()

    def stop_mixin_bonsai_visual_stimulus(self):
        """Send the exit message to the visual stimulus OSC client."""
        log.info('Stopping Bonsai visual stimulus')
        self.bonsai_visual_udp_client.exit()

    def send_trial_info_to_bonsai(self):
        """
        Send the trial information to Bonsai via UDP.

        Only the columns of the trials table that are part of the OSC protocol are sent.
        The OSC protocol is documented in iblrig.base_tasks.OSCClient
        """
        bonsai_dict = {
            k: self.trials_table[k][self.trial_num]
            for k in self.bonsai_visual_udp_client.OSC_PROTOCOL
            if k in self.trials_table.columns
        }

        # reverse wheel contingency: if stim_reverse is True we invert stim_gain
        if self.trials_table.get('stim_reverse', {}).get(self.trial_num, False):
            bonsai_dict['stim_gain'] = -bonsai_dict['stim_gain']

        self.bonsai_visual_udp_client.send2bonsai(**bonsai_dict)
        log.debug(bonsai_dict)

    def run_passive_visual_stim(self, map_time='00:05:00', rate=0.1, sa_time='00:05:00'):
        """
        Run the passive choice world Bonsai workflow and wait for it to complete.

        Parameters
        ----------
        map_time : str
            Duration of the receptive field mapping stimulus, as a string understood by `pandas.to_timedelta`.
        rate : float
            Value handed to the workflow as the receptive field mapping `Rate` property.
        sa_time : str
            Duration of the spontaneous activity period, as a string understood by `pandas.to_timedelta`.

        Returns
        -------
        The value returned by `call_bonsai`.
        """
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath('passiveChoiceWorld', 'passiveChoiceWorld_passive.bonsai')
        file_output_rfm = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_RFMapStim.raw.bin')
        parameters = {
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
            'Stim.SpontaneousActivity0.DueTime': sa_time,
            'Stim.ReceptiveFieldMappingStim.FileNameRFMapStim': file_output_rfm,
            'Stim.ReceptiveFieldMappingStim.MappingTime': map_time,
            'Stim.ReceptiveFieldMappingStim.Rate': rate,
        }
        # Timedelta.seconds only holds the seconds component (days are dropped): use the
        # total duration so that the log stays correct whatever the duration
        map_seconds = int(pd.to_timedelta(map_time).total_seconds())
        sa_seconds = int(pd.to_timedelta(sa_time).total_seconds())
        log.info(f'Starting spontaneous activity ({sa_seconds} s) and RF mapping stims ({map_seconds} s)')
        s = call_bonsai(workflow_file, parameters, editor=False)
        log.info('Spontaneous activity and RF mapping stims finished')
        return s

    def choice_world_visual_stimulus(self):
        """
        Start the Bonsai workflow defined by the task parameter VISUAL_STIMULUS, without waiting for it.

        Nothing is started if VISUAL_STIMULUS is None.
        """
        if self.task_params.VISUAL_STIMULUS is None:
            return
        workflow_file = self.paths.VISUAL_STIM_FOLDER.joinpath(self.task_params.VISUAL_STIMULUS)
        raw_data_folder = self.paths.SESSION_RAW_DATA_FOLDER
        parameters = {
            'Stim.DisplayIndex': self.hardware_settings.device_screen['DISPLAY_IDX'],
            'Stim.FileNameStimPositionScreen': raw_data_folder.joinpath('_iblrig_stimPositionScreen.raw.csv'),
            'Stim.FileNameSyncSquareUpdate': raw_data_folder.joinpath('_iblrig_syncSquareUpdate.raw.csv'),
            'Stim.FileNamePositions': raw_data_folder.joinpath('_iblrig_encoderPositions.raw.ssv'),
            'Stim.FileNameEvents': raw_data_folder.joinpath('_iblrig_encoderEvents.raw.ssv'),
            'Stim.FileNameTrialInfo': raw_data_folder.joinpath('_iblrig_encoderTrialInfo.raw.ssv'),
            'Stim.REPortName': self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
            'Stim.sync_x': self.task_params.SYNC_SQUARE_X,
            'Stim.sync_y': self.task_params.SYNC_SQUARE_Y,
            'Stim.TranslationZ': -self.task_params.STIM_TRANSLATION_Z,  # MINUS!!
        }
        call_bonsai(workflow_file, parameters, wait=False, editor=self.task_params.BONSAI_EDITOR, bootstrap=False)
        log.info('Bonsai visual stimulus module loaded: OK')
2✔
861

862

863
class BpodMixin(BaseSession):
    """Bpod interface: connection handling, softcode handlers and spacer signals."""

    def _raise_on_undefined_softcode_handler(self, byte: int):
        """Raise a ValueError stating that no handler exists for softcode `byte`."""
        raise ValueError(f'No handler defined for softcode #{byte}')

    def softcode_dictionary(self) -> OrderedDict[int, Callable]:
        """
        Return the softcode handlers: a mapping of softcode to the function to call.

        The mapping is built by a method because
            1) child classes must be able to inherit it and dynamically add softcodes to the dictionary
            2) the handlers need the task object (self) at run time while being callable without arguments

        The camera trigger is bound to `trigger_bonsai_cameras` when the task provides it,
        otherwise to a handler raising a ValueError.

        Returns
        -------
        OrderedDict[int, Callable]
            Softcode dictionary
        """

        def _play_go_tone():
            return self.sound['sd'].play(self.sound['GO_TONE'], self.sound['samplerate'])

        def _play_white_noise():
            return self.sound['sd'].play(self.sound['WHITE_NOISE'], self.sound['samplerate'])

        def _undefined_camera_trigger():
            return self._raise_on_undefined_softcode_handler(SOFTCODE.TRIGGER_CAMERA)

        handlers = OrderedDict()
        handlers[SOFTCODE.STOP_SOUND] = self.sound['sd'].stop
        handlers[SOFTCODE.PLAY_TONE] = _play_go_tone
        handlers[SOFTCODE.PLAY_NOISE] = _play_white_noise
        handlers[SOFTCODE.TRIGGER_CAMERA] = getattr(self, 'trigger_bonsai_cameras', _undefined_camera_trigger)
        return handlers

    def init_mixin_bpod(self, *args, **kwargs):
        """Instantiate a Bpod object without any argument; the port is only provided when starting the mixin."""
        self.bpod = Bpod()

    def stop_mixin_bpod(self):
        """Close the Bpod."""
        self.bpod.close()

    def start_mixin_bpod(self):
        """
        Connect to the Bpod on the port given in the hardware settings, then send the spacer signal.

        Raises
        ------
        ValueError
            If device_bpod:COM_BPOD is null in the hardware settings.
        """
        bpod_settings = self.hardware_settings['device_bpod']
        com_port = bpod_settings['COM_BPOD']
        if com_port is None:
            raise ValueError(
                'The value for device_bpod:COM_BPOD in '
                'settings/hardware_settings.yaml is null. Please '
                'provide a valid port name.'
            )
        # the ports are numbered from 1 in the settings and from 0 when handed over to the Bpod
        disabled_ports = [port - 1 for port in bpod_settings['DISABLE_BEHAVIOR_INPUT_PORTS']]
        self.bpod = Bpod(com_port, disable_behavior_ports=disabled_ports)
        self.bpod.define_rotary_encoder_actions()
        self.bpod.set_status_led(False)
        assert self.bpod.is_connected
        log.info('Bpod hardware module loaded: OK')
        # make the bpod send spacer signals to the main sync clock for protocol discovery
        self.send_spacers()

    def send_spacers(self):
        """Run a state machine made of the spacer states only and return the exported trial data."""
        log.info('Starting task by sending a spacer signal on BNC1')
        state_machine = StateMachine(self.bpod)
        spacer = Spacer()
        spacer.add_spacer_states(state_machine, next_state='exit')
        self.bpod.send_state_machine(state_machine)
        self.bpod.run_state_machine(state_machine)  # Locks until state machine 'exit' is reached
        return self.bpod.session.current_trial.export()
×
924

925

926
class Frame2TTLMixin(BaseSession):
    """Frame 2 TTL interface for state machine."""

    def init_mixin_frame2ttl(self, *args, **kwargs):
        """Do nothing: the Frame2TTL is only accessed when the mixin is started."""

    def start_mixin_frame2ttl(self):
        """
        Open the Frame2TTL with the thresholds from the hardware settings, then close it right away.

        Raises
        ------
        ValueError
            If device_frame2ttl:COM_F2TTL is null in the hardware settings.
        """
        # todo assert calibration
        f2ttl_settings = self.hardware_settings['device_frame2ttl']
        if f2ttl_settings['COM_F2TTL'] is None:
            raise ValueError(
                'The value for device_frame2ttl:COM_F2TTL in '
                'settings/hardware_settings.yaml is null. Please '
                'provide a valid port name.'
            )
        device = Frame2TTL(
            port=f2ttl_settings['COM_F2TTL'],
            threshold_dark=f2ttl_settings['F2TTL_DARK_THRESH'],
            threshold_light=f2ttl_settings['F2TTL_LIGHT_THRESH'],
        )
        device.close()
        log.info('Frame2TTL: Thresholds set.')
×
946

947

948
class RotaryEncoderMixin(BaseSession):
    """Rotary encoder interface for state machine."""

    def init_mixin_rotary_encoder(self, *args, **kwargs):
        """Instantiate the rotary encoder object from the task parameters, without connecting to it."""
        self.device_rotary_encoder = MyRotaryEncoder(
            all_thresholds=self.task_params.STIM_POSITIONS + self.task_params.QUIESCENCE_THRESHOLDS,
            gain=self.task_params.STIM_GAIN,
            com=self.hardware_settings.device_rotary_encoder['COM_ROTARY_ENCODER'],
            connect=False,
        )

    def start_mixin_rotary_encoder(self):
        """
        Connect to the rotary encoder.

        Raises
        ------
        ValueError
            If device_rotary_encoder:COM_ROTARY_ENCODER is null in the hardware settings.
        serial.serialutil.SerialException
            If the connection fails with a serial error; the message hints at a busy COM port.
        Exception
            If the connection fails for any other reason.
        """
        if self.hardware_settings['device_rotary_encoder']['COM_ROTARY_ENCODER'] is None:
            raise ValueError(
                'The value for device_rotary_encoder:COM_ROTARY_ENCODER in '
                'settings/hardware_settings.yaml is null. Please '
                'provide a valid port name.'
            )
        try:
            self.device_rotary_encoder.connect()
        except serial.serialutil.SerialException as e:
            raise serial.serialutil.SerialException(
                f'The rotary encoder COM port {self.device_rotary_encoder.RE_PORT} is already in use. This is usually'
                ' due to a Bonsai process currently running on the computer. Make sure all Bonsai windows are'
                ' closed prior to running the task'
            ) from e
        except Exception as e:
            # the generic exception type is kept for the sake of callers relying on it
            raise Exception(
                "The rotary encoder couldn't connect. If the bpod is glowing in green, "
                'disconnect and reconnect bpod from the computer'
            ) from e
        log.info('Rotary encoder module loaded: OK')
×
980

981

982
class ValveMixin(BaseSession):
    """Reward valve interface: calibration checks, reward time computation and valve opening."""

    def init_mixin_valve(self: object):
        """Set up `self.valve` with the calibration status and the interpolator built from the calibration points."""
        valve_settings = self.hardware_settings['device_valve']
        self.valve = Bunch({})
        # the template settings files have a date in 2099, so assume that the rig is not calibrated if that is the case
        # the assertion on calibration is thrown when starting the device
        calibration_date = valve_settings['WATER_CALIBRATION_DATE']
        self.valve['is_calibrated'] = datetime.date.today() >= calibration_date
        # interpolator evaluated with a volume in compute_reward_time: weight per drop -> open time
        self.valve['fcn_vol2time'] = scipy.interpolate.pchip(
            valve_settings['WATER_CALIBRATION_WEIGHT_PERDROP'],
            valve_settings['WATER_CALIBRATION_OPEN_TIMES'],
        )

    def start_mixin_valve(self):
        """
        Check that the valve calibration is usable.

        Raises
        ------
        AssertionError
            If the automatic calibration is requested on an uncalibrated rig, or if
            the reward time computed for 1.5 uL is not lower than 1.
        """
        no_calibration_message = """
            ##########################################
            NO CALIBRATION INFORMATION FOUND IN HARDWARE SETTINGS:
            Calibrate the rig or use a manual calibration
            PLEASE GO TO the task settings yaml file and set:
                'AUTOMATIC_CALIBRATION': false
                'CALIBRATION_VALUE' = <MANUAL_CALIBRATION>
            ##########################################"""
        valve_time_too_high_message = """
            ##########################################
                REWARD VALVE TIME IS TOO HIGH!
            Probably because of a BAD calibration file
            Calibrate the rig or use a manual calibration
            PLEASE GO TO the task settings yaml file and set:
                AUTOMATIC_CALIBRATION = False
                CALIBRATION_VALUE = <MANUAL_CALIBRATION>
            ##########################################"""
        # if the rig is not on manual settings, then the reward valve has to be calibrated to run the experiment
        assert self.task_params.AUTOMATIC_CALIBRATION is False or self.valve['is_calibrated'], no_calibration_message
        # regardless of the calibration method, the reward valve time has to be lower than 1 second
        assert self.compute_reward_time(amount_ul=1.5) < 1, valve_time_too_high_message
        log.info('Water valve module loaded: OK')

    def compute_reward_time(self, amount_ul=None):
        """
        Compute the valve opening time for a given reward volume.

        Parameters
        ----------
        amount_ul : float, optional
            The reward volume, defaults to the task parameter REWARD_AMOUNT_UL.

        Returns
        -------
        The valve opening time: the interpolated calibration value divided by 1e3 with the
        automatic calibration, otherwise `CALIBRATION_VALUE / 3 * amount_ul`.
        """
        if amount_ul is None:
            amount_ul = self.task_params.REWARD_AMOUNT_UL
        if self.task_params.AUTOMATIC_CALIBRATION:
            # NOTE(review): presumably the calibration open times are in ms, hence the division - confirm
            return self.valve['fcn_vol2time'](amount_ul) / 1e3
        # this is the manual calibration value
        return self.task_params.CALIBRATION_VALUE / 3 * amount_ul

    def valve_open(self, reward_valve_time):
        """
        Open the reward valve for a given amount of time and return bpod data.

        Parameters
        ----------
        reward_valve_time : float
            Amount of time in seconds to open the reward valve.
        """
        state_machine = StateMachine(self.bpod)
        state_machine.add_state(
            state_name='valve_open',
            state_timer=reward_valve_time,
            output_actions=[('Valve1', 255), ('BNC1', 255)],  # To FPGA
            state_change_conditions={'Tup': 'exit'},
        )
        self.bpod.send_state_machine(state_machine)
        self.bpod.run_state_machine(state_machine)  # Locks until state machine 'exit' is reached
        return self.bpod.session.current_trial.export()
×
1041

1042

1043
class SoundMixin(BaseSession):
2✔
1044
    """Sound interface methods for state machine."""
1045

1046
    def init_mixin_sound(self):
2✔
1047
        self.sound = Bunch({'GO_TONE': None, 'WHITE_NOISE': None})
2✔
1048
        sound_output = self.hardware_settings.device_sound['OUTPUT']
2✔
1049

1050
        # additional gain factor for bringing the different combinations of sound-cards and amps to the same output level
1051
        # TODO: this needs proper calibration and refactoring
1052
        if self.hardware_settings.device_sound.OUTPUT == 'hifi' and self.hardware_settings.device_sound.AMP_TYPE == 'AMP2X15':
2✔
UNCOV
1053
            amp_gain_factor = 0.25
×
1054
        else:
1055
            amp_gain_factor = 1.0
2✔
1056
        self.task_params.GO_TONE_AMPLITUDE *= amp_gain_factor
2✔
1057
        self.task_params.WHITE_NOISE_AMPLITUDE *= amp_gain_factor
2✔
1058

1059
        # sound device sd is actually the module soundevice imported above.
1060
        # not sure how this plays out when referenced outside of this python file
1061
        self.sound['sd'], self.sound['samplerate'], self.sound['channels'] = sound_device_factory(output=sound_output)
2✔
1062
        # Create sounds and output actions of state machine
1063
        self.sound['GO_TONE'] = iblrig.sound.make_sound(
2✔
1064
            rate=self.sound['samplerate'],
1065
            frequency=self.task_params.GO_TONE_FREQUENCY,
1066
            duration=self.task_params.GO_TONE_DURATION,
1067
            amplitude=self.task_params.GO_TONE_AMPLITUDE * amp_gain_factor,
1068
            fade=0.01,
1069
            chans=self.sound['channels'],
1070
        )
1071
        self.sound['WHITE_NOISE'] = iblrig.sound.make_sound(
2✔
1072
            rate=self.sound['samplerate'],
1073
            frequency=-1,
1074
            duration=self.task_params.WHITE_NOISE_DURATION,
1075
            amplitude=self.task_params.WHITE_NOISE_AMPLITUDE * amp_gain_factor,
1076
            fade=0.01,
1077
            chans=self.sound['channels'],
1078
        )
1079

1080
    def start_mixin_sound(self):
2✔
1081
        """
1082
        Depends on bpod mixin start for hard sound card
1083
        :return:
1084
        """
UNCOV
1085
        assert self.bpod.is_connected, 'The sound mixin depends on the bpod mixin being connected'
×
1086
        # SoundCard config params
NEW
UNCOV
1087
        match self.hardware_settings.device_sound['OUTPUT']:
×
UNCOV
1088
            case 'harp':
×
UNCOV
1089
                assert self.bpod.sound_card is not None, 'No harp sound-card connected to Bpod'
×
UNCOV
1090
                sound.configure_sound_card(
×
1091
                    sounds=[self.sound.GO_TONE, self.sound.WHITE_NOISE],
1092
                    indexes=[self.task_params.GO_TONE_IDX, self.task_params.WHITE_NOISE_IDX],
1093
                    sample_rate=self.sound['samplerate'],
1094
                )
UNCOV
1095
                self.bpod.define_harp_sounds_actions(
×
1096
                    module=self.bpod.sound_card,
1097
                    go_tone_index=self.task_params.GO_TONE_IDX,
1098
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1099
                )
UNCOV
1100
            case 'hifi':
×
UNCOV
1101
                module = self.bpod.get_module('^HiFi')
×
UNCOV
1102
                assert module is not None, 'No HiFi module connected to Bpod'
×
UNCOV
1103
                assert self.hardware_settings.device_sound.COM_SOUND is not None
×
UNCOV
1104
                hifi = HiFi(port=self.hardware_settings.device_sound.COM_SOUND, sampling_rate_hz=self.sound['samplerate'])
×
UNCOV
1105
                hifi.load(index=self.task_params.GO_TONE_IDX, data=self.sound.GO_TONE)
×
UNCOV
1106
                hifi.load(index=self.task_params.WHITE_NOISE_IDX, data=self.sound.WHITE_NOISE)
×
UNCOV
1107
                hifi.push()
×
UNCOV
1108
                hifi.close()
×
UNCOV
1109
                self.bpod.define_harp_sounds_actions(
×
1110
                    module=module,
1111
                    go_tone_index=self.task_params.GO_TONE_IDX,
1112
                    noise_index=self.task_params.WHITE_NOISE_IDX,
1113
                )
UNCOV
1114
            case _:
×
UNCOV
1115
                self.bpod.define_xonar_sounds_actions()
×
UNCOV
1116
        log.info(f"Sound module loaded: OK: {self.hardware_settings.device_sound['OUTPUT']}")
×
1117

1118
    def sound_play_noise(self, state_timer=0.510, state_name='play_noise'):
2✔
1119
        """
1120
        Play the noise sound for the error feedback using bpod state machine.
1121
        :return: bpod current trial export
1122
        """
UNCOV
1123
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1124

1125
    def sound_play_tone(self, state_timer=0.102, state_name='play_tone'):
2✔
1126
        """
1127
        Play the ready tone beep using bpod state machine.
1128
        :return: bpod current trial export
1129
        """
UNCOV
1130
        return self._sound_play(state_name=state_name, output_actions=[self.bpod.actions.play_tone], state_timer=state_timer)
×
1131

1132
    def _sound_play(self, state_timer=None, output_actions=None, state_name='play_sound'):
2✔
1133
        """Plays a sound using bpod state machine.
1134

1135
        The sound must be defined in the init_mixin_sound method.
1136
        """
UNCOV
1137
        assert state_timer is not None, 'state_timer must be defined'
×
UNCOV
1138
        assert output_actions is not None, 'output_actions must be defined'
×
UNCOV
1139
        sma = StateMachine(self.bpod)
×
UNCOV
1140
        sma.add_state(
×
1141
            state_name=state_name,
1142
            state_timer=state_timer,
1143
            output_actions=[self.bpod.actions.play_tone],
1144
            state_change_conditions={'BNC2Low': 'exit', 'Tup': 'exit'},
1145
        )
UNCOV
1146
        self.bpod.send_state_machine(sma)
×
UNCOV
1147
        self.bpod.run_state_machine(sma)  # Locks until state machine 'exit' is reached
×
UNCOV
1148
        return self.bpod.session.current_trial.export()
×
1149

1150

1151
class NetworkSession(BaseSession):
    """A mixin for communicating to auxiliary acquisition PC over a network."""

    remote_rigs = None
    """net.Auxiliaries: An auxiliary services object for communicating with remote devices."""
    exp_ref = None
    """dict: The experiment reference (i.e. subject, date, sequence) as returned by main remote service."""

    def __init__(self, *_, remote_rigs=None, **kwargs):
        """
        A mixin for communicating to auxiliary acquisition PC over a network.

        This should retrieve the services list, i.e. the list of available auxiliary rigs,
        and determine which is the main sync. The main sync is the rig that determines the
        experiment.

        The services list is in a yaml file somewhere, called 'remote_rigs.yaml' and should
        be a map of device name to URI. These are then selectable in the GUI and the URI of
        those selected are added to the experiment description.

        Subclasses should add their callbacks within init by calling :meth:`self.remote_rigs.services.assign_callback`.

        Parameters
        ----------
        remote_rigs : list, dict
            Either a list of remote device names (in which case URI is looked up from remote devices
            file), or a map of device name to URI.
        kwargs
            Keyword arguments forwarded to the superclass initializer. When `remote_rigs` is a list,
            the 'iblrig_settings' entry (None if absent) is also passed to
            :func:`net.get_remote_devices` to look up the device URIs.

        Raises
        ------
        ValueError
            If a name in the `remote_rigs` list is not among the known remote devices.
        """
        if isinstance(remote_rigs, list):
            # For now we flatten to list of remote rig names but could permit list of (name, URI) tuples
            selected = set(filter(None, flatten(remote_rigs)))
            all_remote_rigs = net.get_remote_devices(iblrig_settings=kwargs.get('iblrig_settings', None))
            if not selected.issubset(all_remote_rigs.keys()):
                raise ValueError('Selected remote rigs not in remote rigs list')
            remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in selected}
        # Load and connect to remote services
        self.connect(remote_rigs)
        self.exp_ref = {}
        try:
            super().__init__(**kwargs)
        except Exception:
            # The connections were opened above; close them before propagating the failure
            self.cleanup_mixin_network()
            raise

    @property
    def one(self):
        """Return ONE instance.

        Unlike super class getter, this method will always instantiate ONE, allowing subclasses to update with an Alyx
        token from a remotely connected rig.  This instance is used for formatting the experiment reference string.

        Returns
        -------
        one.api.One
            An instance of ONE.
        """
        if super().one is None:
            # Fall back to a silent, local-mode instance when the super class getter returns None
            self._one = OneAlyx(silent=True, mode='local')
        return self._one

    def connect(self, remote_rigs):
        """
        Connect to remote services.

        Instantiates the Communicator objects that establish connections with each remote device.
        This also creates the thread that uses asynchronous callbacks.

        Parameters
        ----------
        remote_rigs : dict
            A map of name to URI. A falsy value (None or empty) is replaced with an empty map.
        """
        self.remote_rigs = net.Auxiliaries(remote_rigs or {})
        # When rigs were requested, the connection must have been established
        assert not remote_rigs or self.remote_rigs.is_connected
        # Handle termination event by graciously completing thread
        signal.signal(signal.SIGTERM, lambda sig, frame: self.cleanup_mixin_network())

    def _init_paths(self, append: bool = False):
        """
        Determine session paths.

        Unlike :meth:`BaseSession._init_paths`, this method determines the session number from the remote main sync if
        connected.

        Parameters
        ----------
        append : bool
            Iterate task collection within today's most recent session folder for the selected subject, instead of
            iterating session number.

        Returns
        -------
        iblutil.util.Bunch
            A bunch of paths.

        Raises
        ------
        ValueError
            If `append` is set but the next task collection ends with '00', or if `append` is not
            set but the next task collection does not end with '00'.
        """
        # Same lookup as used in get_exp_info and init_mixin_network
        if self.hardware_settings.get('MAIN_SYNC', False):
            return BaseSession._init_paths(self, append)
        # Check if we have rigs connected
        if not self.remote_rigs.is_connected:
            log.warning('No remote rigs; experiment reference may not match the main sync.')
            return BaseSession._init_paths(self, append)
        # Set paths in a similar way to the super class
        rig_computer_paths = iblrig.path_helper.get_local_and_remote_paths(
            local_path=self.iblrig_settings['iblrig_local_data_path'],
            remote_path=self.iblrig_settings['iblrig_remote_data_path'],
            lab=self.iblrig_settings['ALYX_LAB'],
            iblrig_settings=self.iblrig_settings,
        )
        paths = Bunch({'IBLRIG_FOLDER': BASE_PATH})
        paths.BONSAI = BONSAI_EXE
        paths.VISUAL_STIM_FOLDER = paths.IBLRIG_FOLDER.joinpath('visual_stim')
        paths.LOCAL_SUBJECT_FOLDER = rig_computer_paths['local_subjects_folder']
        paths.REMOTE_SUBJECT_FOLDER = rig_computer_paths['remote_subjects_folder']
        date_folder = paths.LOCAL_SUBJECT_FOLDER.joinpath(
            self.session_info.SUBJECT_NAME, self.session_info.SESSION_START_TIME[:10]
        )
        # The session number comes from the experiment reference stored on this instance
        assert self.exp_ref
        paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
        paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
        # Appending requires the next collection not to end in '00'; not appending requires that it does
        if append == paths.TASK_COLLECTION.endswith('00'):
            raise ValueError(
                f'Append value incorrect. Either remove previous task collections from '
                f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
            )

        paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
        paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
        paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
        self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
        return paths

    def run(self):
        """Run session and report exceptions to remote services.

        If the super class `run` raises while remote rigs are connected, an EXPINTERRUPT message
        describing the exception is pushed to the remote services and the network mixin is cleaned
        up. The exception is re-raised in all cases.
        """
        self.start_mixin_network()
        try:
            return super().run()
        except Exception as e:
            # Communicate error to services
            if self.remote_rigs.is_connected:
                tb = e.__traceback__  # TODO maybe go one level down with tb_next?
                details = {
                    'error': e.__class__.__name__,  # exception name str
                    'message': str(e),  # error str
                    'traceback': traceback.format_exc(),  # stack str
                    'file': tb.tb_frame.f_code.co_filename,  # filename str
                    'line_no': (tb.tb_lineno, tb.tb_lasti),  # (int, int)
                }
                self.remote_rigs.push(ExpMessage.EXPINTERRUPT, details, wait=True)
                self.cleanup_mixin_network()
            raise

    def communicate(self, message, *args, raise_on_exception=True):
        """
        Communicate message to remote services.

        This method is blocking and by default will raise if not all responses received in time.

        Parameters
        ----------
        message : iblutil.io.net.base.ExpMessage, str, int
            An experiment message to send to remote services.
        args
            One or more optional variables to send.
        raise_on_exception : bool
            If true, exceptions arising from message timeouts will be re-raised in main thread and
            services will be cleaned up. Only applies when wait is true.

        Returns
        -------
        Exception | dict
            If raise_on_exception is False, returns an exception if failed to receive all responses
            in time, otherwise a map of service name to response is returned.
        """
        r = self.remote_rigs.push(message, *args, wait=True)
        if isinstance(r, Exception):
            log.error('Error on %s network mixin: %s', message, r)
            if raise_on_exception:
                self.cleanup_mixin_network()
                raise r
        return r

    def get_exp_info(self):
        """Return the experiment information of this session as a dictionary.

        The experiment reference is converted with `self.one.dict2ref` when it is a non-empty
        dict and a ONE instance is available; an empty reference is passed on as None.

        Returns
        -------
        dict
            The result of `net.ExpInfo(...).to_dict()` built from the experiment reference, the
            'MAIN_SYNC' hardware setting and the experiment description.
        """
        ref = self.exp_ref or None
        if isinstance(ref, dict) and self.one:
            ref = self.one.dict2ref(ref)
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        info = net.ExpInfo(ref, is_main_sync, self.experiment_description, master=is_main_sync)
        return info.to_dict()

    def init_mixin_network(self):
        """Initialize remote services.

        This method sends an EXPINFO message to all services, expecting exactly one of the responses
        to contain main_sync: True, along with the experiment reference to use. It then sends an
        EXPINIT message to all services.

        Raises
        ------
        NotImplementedError
            If this rig is itself configured as the main sync.
        ValueError
            If the subject of the returned experiment reference differs from this session's subject.
        RuntimeError
            If the date of the returned experiment reference differs from this session's start date.
        """
        if not self.remote_rigs.is_connected:
            return
        # Determine experiment reference from main sync
        is_main_sync = self.hardware_settings.get('MAIN_SYNC', False)
        if is_main_sync:
            raise NotImplementedError
        assert self.one

        expinfo = self.get_exp_info() | {'subject': self.session_info['SUBJECT_NAME']}
        r = self.communicate('EXPINFO', 'CONNECTED', expinfo)
        # The last element of each response carries the 'main_sync' flag
        assert sum(x[-1]['main_sync'] for x in r.values()) == 1, 'one main sync expected'
        main_rig_name, (status, info) = next((k, v) for k, v in r.items() if v[-1]['main_sync'])
        self.exp_ref = self.one.ref2dict(info['exp_ref']) if isinstance(info['exp_ref'], str) else info['exp_ref']
        if self.exp_ref['subject'] != self.session_info['SUBJECT_NAME']:
            log.error(
                'Running task for "%s" but main sync returned exp ref for "%s".',
                self.session_info['SUBJECT_NAME'],
                self.exp_ref['subject'],
            )
            raise ValueError("Subject name doesn't match remote session on " + main_rig_name)
        if str(self.exp_ref['date']) != self.session_info['SESSION_START_TIME'][:10]:
            raise RuntimeError(
                f'Session dates do not match between this rig and {main_rig_name}. \n'
                f'Running past or future sessions not currently supported. \n'
                f'Please check the system date time settings on each rig.'
            )

        # exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
        exp_ref = self.one.dict2ref(self.exp_ref)
        self.communicate('EXPINIT', {'exp_ref': exp_ref})

    def start_mixin_network(self):
        """Start remote services.

        This method sends an EXPSTART message to all services, along with the experiment reference
        held in `self.exp_ref`. Responses are required but ignored. Does nothing when no remote
        rigs are connected.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPSTART', self.exp_ref)

    def stop_mixin_network(self):
        """Stop remote services.

        This method sends an EXPEND message to all services. Responses are required but ignored.
        Does nothing when no remote rigs are connected.
        """
        if not self.remote_rigs.is_connected:
            return
        self.communicate('EXPEND')

    def cleanup_mixin_network(self):
        """Clean up services.

        Closes the remote rigs object and logs a warning if it still reports being connected.
        """
        self.remote_rigs.close()
        if self.remote_rigs.is_connected:
            log.warning('Failed to properly clean up network mixin')
1405

1406

1407
class SpontaneousSession(BaseSession):
    """
    A Spontaneous task doesn't have trials, it just runs until the user stops it.

    It is used to get extraction structure for data streams
    """

    def __init__(self, duration_secs=None, **kwargs):
        """
        Initialize the spontaneous session.

        Parameters
        ----------
        duration_secs : float, optional
            Elapsed time in seconds after which :meth:`_run` returns. If None, the session runs
            until a '.stop' file appears in the session folder.
        kwargs
            Keyword arguments forwarded to the superclass initializer.
        """
        super().__init__(**kwargs)
        self.duration_secs = duration_secs

    def start_hardware(self):
        """Do nothing: this session uses no hardware mixin."""
        pass  # no mixin here, life is but a dream

    def _run(self):
        """Block until the requested duration has elapsed or a stop flag file is found.

        Every 1.5 seconds the loop checks whether `duration_secs` (if set) has been exceeded, then
        whether a '.stop' file exists in the session folder; the flag file is removed before returning.
        """
        log.info('Starting spontaneous acquisition')
        while True:
            time.sleep(1.5)
            # Use the full elapsed duration: `timedelta.seconds` is only the seconds component and
            # wraps around every 24 hours, so long durations would never be reached.
            # NOTE(review): assumes `self.time_elapsed` is a datetime.timedelta - confirm in BaseSession.
            if self.duration_secs is not None and self.time_elapsed.total_seconds() > self.duration_secs:
                break
            stop_flag = self.paths.SESSION_FOLDER.joinpath('.stop')
            if stop_flag.exists():
                stop_flag.unlink()
                break
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc