int-brain-lab / iblrig · build 12279337432

11 Dec 2024 03:15PM UTC · coverage: 47.031% (+0.2%) from 46.79%

Pull Request #751: Fiber trajectory GUI
Commit d4edef (web-flow, via GitHub): Merge eea51f2f7 into 2f9d65d86

0 of 114 new or added lines in 1 file covered (0.0%).
1076 existing lines in 22 files now uncovered.
4246 of 9028 relevant lines covered (47.03%).
0.94 hits per line.

Source File: /iblrig_tasks/_iblrig_tasks_neuroModulatorChoiceWorld/task.py (72.16% covered)
import logging

import numpy as np
from pydantic import NonNegativeFloat

import iblrig.misc
from iblrig.base_choice_world import BiasedChoiceWorldSession, BiasedChoiceWorldTrialData
from iblrig.hardware import SOFTCODE
from pybpodapi.protocol import StateMachine

REWARD_AMOUNTS_UL = (1, 3)
log = logging.getLogger(__name__)


class NeuroModulatorChoiceTrialData(BiasedChoiceWorldTrialData):
    omit_feedback: bool
    choice_delay: NonNegativeFloat


class Session(BiasedChoiceWorldSession):
    protocol_name = '_iblrig_tasks_neuromodulatorChoiceWorld'
    TrialDataModel = NeuroModulatorChoiceTrialData

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def next_trial(self):
        super().next_trial()
        # then there is a probability of omitting feedback regardless of the choice
        self.trials_table.at[self.trial_num, 'omit_feedback'] = np.random.random() < self.task_params.OMIT_FEEDBACK_PROBABILITY

        # then drawing the delay for the choice
        choice_delay_strategy = 'binned'
        if choice_delay_strategy == 'binary':  # binary choice: 1.5 s with p=2/3 or 3.0 s with p=1/3
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.choice([1.5, 3.0], p=[2 / 3, 1 / 3])
        elif choice_delay_strategy == 'uniform':  # uniform draw between 1.5 s and 3 s
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.random() * 1.5 + 1.5
        elif choice_delay_strategy == 'binned':  # binned values between 0 and 2.5 s, the "Charline Way"
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.choice(np.linspace(0, 2.5, 3))
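            # note: np.random.choice(np.linspace(0, 2.5, 3)) draws from the three bins 0.0, 1.25 and 2.5 s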

        if self.task_params.VARIABLE_REWARDS:
            # the reward is drawn from REWARD_AMOUNTS_UL with probabilities [0.8, 0.2]; block 0 uses a fixed 1.5 µL
            reward_amount = 1.5 if self.block_num == 0 else np.random.choice(REWARD_AMOUNTS_UL, p=[0.8, 0.2])
            self.trials_table.at[self.trial_num, 'reward_amount'] = reward_amount
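            # note: the expected value of the weighted draw is 0.8 * 1 + 0.2 * 3 = 1.4 µL,
            # close to the fixed 1.5 µL delivered during the unbiased first block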

    @property
    def omit_feedback(self):
        return self.trials_table.at[self.trial_num, 'omit_feedback']

    @property
    def choice_to_feedback_delay(self):
        return self.trials_table.at[self.trial_num, 'choice_delay']

    def get_state_machine_trial(self, i):
        sma = StateMachine(self.bpod)

        if i == 0:  # first trial exception: start the camera
            session_delay_start = self.task_params.get('SESSION_DELAY_START', 0)
            log.info('First trial initializing, will move to next trial only if:')
            log.info('1. camera is detected')
            log.info(f'2. {session_delay_start} sec have elapsed')
            sma.add_state(
                state_name='trial_start',
                state_timer=0,
                state_change_conditions={'Port1In': 'delay_initiation'},
                output_actions=[('SoftCode', SOFTCODE.TRIGGER_CAMERA), ('BNC1', 255)],
            )  # start camera
            sma.add_state(
                state_name='delay_initiation',
                state_timer=session_delay_start,
                output_actions=[],
                state_change_conditions={'Tup': 'reset_rotary_encoder'},
            )
        else:
            sma.add_state(
                state_name='trial_start',
                state_timer=0,  # ~100µs hardware irreducible delay
                state_change_conditions={'Tup': 'reset_rotary_encoder'},
                output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
            )  # stop all sounds

        sma.add_state(
            state_name='reset_rotary_encoder',
            state_timer=0,
            output_actions=[self.bpod.actions.rotary_encoder_reset],
            state_change_conditions={'Tup': 'quiescent_period'},
        )

        sma.add_state(  # '>back' | '>reset_timer'
            state_name='quiescent_period',
            state_timer=self.quiescent_period,
            output_actions=[],
            state_change_conditions={
                'Tup': 'stim_on',
                self.movement_left: 'reset_rotary_encoder',
                self.movement_right: 'reset_rotary_encoder',
            },
        )

        sma.add_state(
            state_name='stim_on',
            state_timer=0.1,
            output_actions=[self.bpod.actions.bonsai_show_stim],
            state_change_conditions={'Tup': 'interactive_delay', 'BNC1High': 'interactive_delay', 'BNC1Low': 'interactive_delay'},
        )

        sma.add_state(
            state_name='interactive_delay',
            state_timer=self.task_params.INTERACTIVE_DELAY,
            output_actions=[],
            state_change_conditions={'Tup': 'play_tone'},
        )

        sma.add_state(
            state_name='play_tone',
            state_timer=0.1,
            output_actions=[self.bpod.actions.play_tone, ('BNC1', 255)],
            state_change_conditions={'Tup': 'reset2_rotary_encoder', 'BNC2High': 'reset2_rotary_encoder'},
        )

        sma.add_state(
            state_name='reset2_rotary_encoder',
            state_timer=0.05,
            output_actions=[self.bpod.actions.rotary_encoder_reset],
            state_change_conditions={'Tup': 'closed_loop'},
        )

        if self.omit_feedback:
            sma.add_state(
                state_name='closed_loop',
                state_timer=self.task_params.RESPONSE_WINDOW,
                output_actions=[self.bpod.actions.bonsai_closed_loop],
                state_change_conditions={'Tup': 'omit_no_go', self.event_error: 'omit_error', self.event_reward: 'omit_correct'},
            )
        else:
            sma.add_state(
                state_name='closed_loop',
                state_timer=self.task_params.RESPONSE_WINDOW,
                output_actions=[self.bpod.actions.bonsai_closed_loop],
                state_change_conditions={
                    'Tup': 'delay_no_go',
                    self.event_error: 'delay_error',
                    self.event_reward: 'delay_reward',
                },
            )

        # here we create 3 separate states to disambiguate the choice of the mouse
        # in the output data; apart from the name they are exactly the same state
        for state_name in ['omit_error', 'omit_correct', 'omit_no_go']:
            sma.add_state(
                state_name=state_name,
                state_timer=(
                    self.task_params.FEEDBACK_NOGO_DELAY_SECS
                    + self.task_params.FEEDBACK_ERROR_DELAY_SECS
                    + self.task_params.FEEDBACK_CORRECT_DELAY_SECS
                )
                / 3,
                output_actions=[],
                state_change_conditions={'Tup': 'hide_stim'},
            )
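        # note: the timer is the mean of the three feedback delays, presumably so that
        # omitted-feedback trials last roughly as long as trials with feedback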

        sma.add_state(
            state_name='delay_no_go',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'no_go'},
            output_actions=[],
        )

        sma.add_state(
            state_name='no_go',
            state_timer=self.task_params.FEEDBACK_NOGO_DELAY_SECS,
            output_actions=[self.bpod.actions.bonsai_hide_stim, self.bpod.actions.play_noise],
            state_change_conditions={'Tup': 'exit_state'},
        )

        sma.add_state(
            state_name='delay_error',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'freeze_error'},
            output_actions=[],
        )

        sma.add_state(
            state_name='freeze_error',
            state_timer=0,
            output_actions=[self.bpod.actions.bonsai_freeze_stim],
            state_change_conditions={'Tup': 'error'},
        )

        sma.add_state(
            state_name='error',
            state_timer=self.task_params.FEEDBACK_ERROR_DELAY_SECS,
            output_actions=[self.bpod.actions.play_noise],
            state_change_conditions={'Tup': 'hide_stim'},
        )

        sma.add_state(
            state_name='delay_reward',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'freeze_reward'},
            output_actions=[],
        )

        sma.add_state(
            state_name='freeze_reward',
            state_timer=0,
            output_actions=[self.bpod.actions.bonsai_freeze_stim],
            state_change_conditions={'Tup': 'reward'},
        )

        sma.add_state(
            state_name='reward',
            state_timer=self.reward_time,
            output_actions=[('Valve1', 255)],
            state_change_conditions={'Tup': 'correct'},
        )

        sma.add_state(
            state_name='correct',
            state_timer=self.task_params.FEEDBACK_CORRECT_DELAY_SECS,
            output_actions=[],
            state_change_conditions={'Tup': 'hide_stim'},
        )

        sma.add_state(
            state_name='hide_stim',
            state_timer=0.1,
            output_actions=[self.bpod.actions.bonsai_hide_stim],
            state_change_conditions={'Tup': 'exit_state', 'BNC1High': 'exit_state', 'BNC1Low': 'exit_state'},
        )

        sma.add_state(
            state_name='exit_state', state_timer=0.5, output_actions=[('BNC1', 255)], state_change_conditions={'Tup': 'exit'}
        )
        return sma


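# Trial state flow assembled above (nominal, non-omitted path):
#   trial_start -> reset_rotary_encoder -> quiescent_period -> stim_on -> interactive_delay
#   -> play_tone -> reset2_rotary_encoder -> closed_loop, then depending on the outcome:
#   closed_loop -> delay_reward -> freeze_reward -> reward -> correct -> hide_stim -> exit_state
#   closed_loop -> delay_error  -> freeze_error  -> error  -> hide_stim -> exit_state
#   closed_loop -> delay_no_go  -> no_go -> exit_state
# (quiescent_period loops back to reset_rotary_encoder on wheel movement; when omit_feedback
# is set, closed_loop routes to omit_{correct,error,no_go} -> hide_stim instead.)
#
# Minimal sketch of how the returned state machine is consumed, assuming the run loop of the
# BiasedChoiceWorldSession base class and the pybpodapi Bpod methods it relies on:
#   sma = self.get_state_machine_trial(i)
#   self.bpod.send_state_machine(sma)
#   self.bpod.run_state_machine(sma)  # blocks until the state machine reaches 'exit'

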
class SessionRelatedBlocks(Session):
    """
    In this scenario, the blocks define a poor and a rich side.
    The probability-block and reward-block structure is staggered so that all configurations are explored every 4 blocks:
    P0 P1 P2 P1 P2 P1 P2 P1 P2
    R0 R1 R1 R2 R2 R1 R1 R2 R2
    """

    # from iblrig_tasks._iblrig_tasks_neuroModulatorChoiceWorld.task import SessionRelatedBlocks
    # sess = SessionRelatedBlocks()
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trials_table['omit_feedback'] = np.zeros(self.trials_table.shape[0], dtype=bool)
        self.trials_table['choice_delay'] = np.zeros(self.trials_table.shape[0], dtype=np.float32)
        self.trials_table['probability_left_rich'] = np.zeros(self.trials_table.shape[0], dtype=np.float32)
        self.blocks_table['probability_left_rich'] = np.zeros(self.blocks_table.shape[0], dtype=np.float32)
        self.BLOCK_REWARD_STAGGER = np.random.randint(0, 2)

    def new_block(self):
        super(Session, self).new_block()
        if self.block_num == 0:
            probability_left_rich = 0.5
        elif int((self.block_num + self.BLOCK_REWARD_STAGGER) / 2 % 2):
            probability_left_rich = 0.8
        else:
            probability_left_rich = 0.2
        self.blocks_table.at[self.block_num, 'probability_left_rich'] = probability_left_rich

    def next_trial(self):
        super().next_trial()
        self.trials_table.at[self.trial_num, 'reward_amount'] = self.draw_reward_amount()
        prich = self.blocks_table.loc[self.block_num, 'probability_left_rich']
        self.trials_table.at[self.trial_num, 'probability_left_rich'] = prich

    def draw_reward_amount(self):
        # pick the poor (1 µL) or rich (3 µL) amount according to which side is rich in this block
        reward_amounts = (1, 3)  # poor and rich
        plr = self.blocks_table.at[self.block_num, 'probability_left_rich']
        if np.sign(self.position) > 0:  # noqa: SIM108
            probas = [plr, (1 - plr)]  # right
        else:
            probas = [(1 - plr), plr]  # left
        return np.random.choice(reward_amounts, p=probas)


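def _illustrate_rich_side_pattern(n_blocks: int = 9, stagger: int = 1) -> list[float]:
    """Minimal sketch, not used by the task: reproduce the rich-side sequence that
    SessionRelatedBlocks.new_block generates for a given stagger.

    With stagger=1 this returns [0.5, 0.8, 0.8, 0.2, 0.2, 0.8, 0.8, 0.2, 0.2], i.e. the
    R0 R1 R1 R2 R2 ... pattern from the class docstring, while the base task's probability
    blocks alternate every block (P0 P1 P2 P1 P2 ...), so all combinations occur every 4 blocks.
    """
    pattern = []
    for block_num in range(n_blocks):
        if block_num == 0:
            pattern.append(0.5)  # first block is unbiased
        elif int((block_num + stagger) / 2 % 2):
            pattern.append(0.8)  # rich reward more likely on the left
        else:
            pattern.append(0.2)  # rich reward more likely on the right
    return pattern

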
if __name__ == '__main__':  # pragma: no cover
    kwargs = iblrig.misc.get_task_arguments(parents=[Session.extra_parser()])
    sess = Session(**kwargs)
    sess.run()