
int-brain-lab / iblrig / build 10568073180

26 Aug 2024 10:13PM UTC coverage: 47.538% (+0.7%) from 46.79%

Pull Request #711: 8.23.2
Commit eeff82 (github / web-flow): Merge 599c9edfb into ad41db25f

121 of 135 new or added lines in 8 files covered. (89.63%)

1025 existing lines in 22 files now uncovered.

4084 of 8591 relevant lines covered (47.54%)

0.95 hits per line
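For reference, the percentages above follow directly from the raw line counts. A minimal sketch of the arithmetic, using only the counts shown in this report:

# Recompute the headline figures of this report from the raw line counts.
covered_new, total_new = 121, 135        # new or added lines in this PR
covered_all, relevant_all = 4084, 8591   # all relevant lines in the project

print(f'{covered_new / total_new:.2%}')     # 89.63% of new lines covered
print(f'{covered_all / relevant_all:.3%}')  # 47.538% overall coverage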

Source File

/iblrig_tasks/_iblrig_tasks_neuroModulatorChoiceWorld/task.py (71.28% covered)
import logging

import numpy as np

import iblrig.misc
from iblrig.base_choice_world import BiasedChoiceWorldSession
from iblrig.hardware import SOFTCODE
from pybpodapi.protocol import StateMachine

REWARD_AMOUNTS_UL = (1, 3)
log = logging.getLogger(__name__)


class Session(BiasedChoiceWorldSession):
    protocol_name = '_iblrig_tasks_neuromodulatorChoiceWorld'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trials_table['omit_feedback'] = np.zeros(self.trials_table.shape[0], dtype=bool)
        self.trials_table['choice_delay'] = np.zeros(self.trials_table.shape[0], dtype=np.float32)

    def next_trial(self):
        super().next_trial()
        # there is a probability of omitting feedback regardless of the choice
        self.trials_table.at[self.trial_num, 'omit_feedback'] = np.random.random() < self.task_params.OMIT_FEEDBACK_PROBABILITY

        # then draw the delay for the choice
        choice_delay_strategy = 'binned'
        if choice_delay_strategy == 'binary':  # 1.5 s with probability 2/3, 3.0 s with probability 1/3
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.choice([1.5, 3.0], p=[2 / 3, 1 / 3])
        elif choice_delay_strategy == 'uniform':  # uniform draw between 1.5 s and 3 s
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.random() * 1.5 + 1.5
        elif choice_delay_strategy == 'binned':  # evenly spaced values (0.0, 1.25, 2.5 s), the "Charline Way"
            self.trials_table.at[self.trial_num, 'choice_delay'] = np.random.choice(np.linspace(0, 2.5, 3))

        if self.task_params.VARIABLE_REWARDS:
            # reward amount: 1.5 µL on the first block, otherwise a weighted draw of 1 or 3 µL (p = 0.8 / 0.2)
            reward_amount = 1.5 if self.block_num == 0 else np.random.choice(REWARD_AMOUNTS_UL, p=[0.8, 0.2])
            self.trials_table.at[self.trial_num, 'reward_amount'] = reward_amount

    @property
    def omit_feedback(self):
        return self.trials_table.at[self.trial_num, 'omit_feedback']

    @property
    def choice_to_feedback_delay(self):
        return self.trials_table.at[self.trial_num, 'choice_delay']

    def get_state_machine_trial(self, i):
        sma = StateMachine(self.bpod)

        if i == 0:  # first trial exception: start the camera
            session_delay_start = self.task_params.get('SESSION_DELAY_START', 0)
            log.info('First trial initializing, will move to next trial only if:')
            log.info('1. camera is detected')
            log.info(f'2. {session_delay_start} sec have elapsed')
            sma.add_state(
                state_name='trial_start',
                state_timer=0,
                state_change_conditions={'Port1In': 'delay_initiation'},
                output_actions=[('SoftCode', SOFTCODE.TRIGGER_CAMERA), ('BNC1', 255)],
            )  # start camera
            sma.add_state(
                state_name='delay_initiation',
                state_timer=session_delay_start,
                output_actions=[],
                state_change_conditions={'Tup': 'reset_rotary_encoder'},
            )
        else:
            sma.add_state(
                state_name='trial_start',
                state_timer=0,  # ~100µs hardware irreducible delay
                state_change_conditions={'Tup': 'reset_rotary_encoder'},
                output_actions=[self.bpod.actions.stop_sound, ('BNC1', 255)],
            )  # stop all sounds

        sma.add_state(
            state_name='reset_rotary_encoder',
            state_timer=0,
            output_actions=[self.bpod.actions.rotary_encoder_reset],
            state_change_conditions={'Tup': 'quiescent_period'},
        )

        sma.add_state(  # '>back' | '>reset_timer'
            state_name='quiescent_period',
            state_timer=self.quiescent_period,
            output_actions=[],
            state_change_conditions={
                'Tup': 'stim_on',
                self.movement_left: 'reset_rotary_encoder',
                self.movement_right: 'reset_rotary_encoder',
            },
        )

        sma.add_state(
            state_name='stim_on',
            state_timer=0.1,
            output_actions=[self.bpod.actions.bonsai_show_stim],
            state_change_conditions={'Tup': 'interactive_delay', 'BNC1High': 'interactive_delay', 'BNC1Low': 'interactive_delay'},
        )

        sma.add_state(
            state_name='interactive_delay',
            state_timer=self.task_params.INTERACTIVE_DELAY,
            output_actions=[],
            state_change_conditions={'Tup': 'play_tone'},
        )

        sma.add_state(
            state_name='play_tone',
            state_timer=0.1,
            output_actions=[self.bpod.actions.play_tone, ('BNC1', 255)],
            state_change_conditions={'Tup': 'reset2_rotary_encoder', 'BNC2High': 'reset2_rotary_encoder'},
        )

        sma.add_state(
            state_name='reset2_rotary_encoder',
            state_timer=0.05,
            output_actions=[self.bpod.actions.rotary_encoder_reset],
            state_change_conditions={'Tup': 'closed_loop'},
        )

        if self.omit_feedback:
            sma.add_state(
                state_name='closed_loop',
                state_timer=self.task_params.RESPONSE_WINDOW,
                output_actions=[self.bpod.actions.bonsai_closed_loop],
                state_change_conditions={'Tup': 'omit_no_go', self.event_error: 'omit_error', self.event_reward: 'omit_correct'},
            )
        else:
            sma.add_state(
                state_name='closed_loop',
                state_timer=self.task_params.RESPONSE_WINDOW,
                output_actions=[self.bpod.actions.bonsai_closed_loop],
                state_change_conditions={
                    'Tup': 'delay_no_go',
                    self.event_error: 'delay_error',
                    self.event_reward: 'delay_reward',
                },
            )

        # here we create three separate states to disambiguate the mouse's choice
        # in the output data; apart from the name they are exactly the same state
        for state_name in ['omit_error', 'omit_correct', 'omit_no_go']:
            sma.add_state(
                state_name=state_name,
                state_timer=(
                    self.task_params.FEEDBACK_NOGO_DELAY_SECS
                    + self.task_params.FEEDBACK_ERROR_DELAY_SECS
                    + self.task_params.FEEDBACK_CORRECT_DELAY_SECS
                )
                / 3,
                output_actions=[],
                state_change_conditions={'Tup': 'hide_stim'},
            )

        sma.add_state(
            state_name='delay_no_go',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'no_go'},
            output_actions=[],
        )

        sma.add_state(
            state_name='no_go',
            state_timer=self.task_params.FEEDBACK_NOGO_DELAY_SECS,
            output_actions=[self.bpod.actions.bonsai_hide_stim, self.bpod.actions.play_noise],
            state_change_conditions={'Tup': 'exit_state'},
        )

        sma.add_state(
            state_name='delay_error',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'freeze_error'},
            output_actions=[],
        )

        sma.add_state(
            state_name='freeze_error',
            state_timer=0,
            output_actions=[self.bpod.actions.bonsai_freeze_stim],
            state_change_conditions={'Tup': 'error'},
        )

        sma.add_state(
            state_name='error',
            state_timer=self.task_params.FEEDBACK_ERROR_DELAY_SECS,
            output_actions=[self.bpod.actions.play_noise],
            state_change_conditions={'Tup': 'hide_stim'},
        )

        sma.add_state(
            state_name='delay_reward',
            state_timer=self.choice_to_feedback_delay,
            state_change_conditions={'Tup': 'freeze_reward'},
            output_actions=[],
        )

        sma.add_state(
            state_name='freeze_reward',
            state_timer=0,
            output_actions=[self.bpod.actions.bonsai_freeze_stim],
            state_change_conditions={'Tup': 'reward'},
        )

        sma.add_state(
            state_name='reward',
            state_timer=self.reward_time,
            output_actions=[('Valve1', 255)],
            state_change_conditions={'Tup': 'correct'},
        )

        sma.add_state(
            state_name='correct',
            state_timer=self.task_params.FEEDBACK_CORRECT_DELAY_SECS,
            output_actions=[],
            state_change_conditions={'Tup': 'hide_stim'},
        )

        sma.add_state(
            state_name='hide_stim',
            state_timer=0.1,
            output_actions=[self.bpod.actions.bonsai_hide_stim],
            state_change_conditions={'Tup': 'exit_state', 'BNC1High': 'exit_state', 'BNC1Low': 'exit_state'},
        )

        sma.add_state(
            state_name='exit_state', state_timer=0.5, output_actions=[('BNC1', 255)], state_change_conditions={'Tup': 'exit'}
        )
        return sma


class SessionRelatedBlocks(Session):
    """
    In this scenario, the blocks define a poor and a rich side.
    The probability-block and reward-block structures are staggered so that all
    configurations are explored every 4 blocks (see the staggering walk-through
    in the comments below):
    P0 P1 P2 P1 P2 P1 P2 P1 P2
    R0 R1 R1 R2 R2 R1 R1 R2 R2
    """

    # from iblrig_tasks._iblrig_tasks_neuroModulatorChoiceWorld.task import SessionRelatedBlocks
    # sess = SessionRelatedBlocks()
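    # Staggering walk-through (illustrative, derived from new_block below):
    # for block_num >= 1 the branch test is int((block_num + stagger) / 2 % 2).
    # With BLOCK_REWARD_STAGGER == 0, blocks 1..8 give 0, 1, 1, 0, 0, 1, 1, 0,
    # i.e. probability_left_rich = 0.2, 0.8, 0.8, 0.2, 0.2, 0.8, 0.8, 0.2.
    # With BLOCK_REWARD_STAGGER == 1 they give 1, 1, 0, 0, 1, 1, 0, 0, so the
    # reward blocks are shifted by one block relative to the probability blocks.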
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trials_table['omit_feedback'] = np.zeros(self.trials_table.shape[0], dtype=bool)
        self.trials_table['choice_delay'] = np.zeros(self.trials_table.shape[0], dtype=np.float32)
        self.trials_table['probability_left_rich'] = np.zeros(self.trials_table.shape[0], dtype=np.float32)
        self.blocks_table['probability_left_rich'] = np.zeros(self.blocks_table.shape[0], dtype=np.float32)
        self.BLOCK_REWARD_STAGGER = np.random.randint(0, 2)

    def new_block(self):
        super(Session, self).new_block()
        if self.block_num == 0:
            probability_left_rich = 0.5
        elif int((self.block_num + self.BLOCK_REWARD_STAGGER) / 2 % 2):
            probability_left_rich = 0.8
        else:
            probability_left_rich = 0.2
        self.blocks_table.at[self.block_num, 'probability_left_rich'] = probability_left_rich

    def next_trial(self):
        super().next_trial()
        self.trials_table.at[self.trial_num, 'reward_amount'] = self.draw_reward_amount()
        prich = self.blocks_table.loc[self.block_num, 'probability_left_rich']
        self.trials_table.at[self.trial_num, 'probability_left_rich'] = prich

    def draw_reward_amount(self):
        # FIXME check: this has 0.5 probability of being correct !!!
        reward_amounts = (1, 3)  # poor and rich
        plr = self.blocks_table.at[self.block_num, 'probability_left_rich']
        if np.sign(self.position):  # noqa: SIM108
            probas = [plr, (1 - plr)]  # right
        else:
            probas = [(1 - plr), plr]  # left
        return np.random.choice(reward_amounts, p=probas)
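        # Note: np.sign() returns -1, 0 or +1, and both -1 and +1 are truthy,
        # so any non-zero position takes the 'right' branch above and only
        # position == 0 reaches the 'left' branch; this may be what the FIXME
        # refers to (a test such as self.position > 0 may have been intended).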

if __name__ == '__main__':  # pragma: no cover
    kwargs = iblrig.misc.get_task_arguments(parents=[Session.extra_parser()])
    sess = Session(**kwargs)
    sess.run()
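Referring back to Session.next_trial above, a standalone sketch of the three choice-delay strategies (numpy only; values and probabilities taken from the code, outside any session object):

import numpy as np

# 'binary': 1.5 s with probability 2/3, 3.0 s with probability 1/3
binary_delay = np.random.choice([1.5, 3.0], p=[2 / 3, 1 / 3])

# 'uniform': a uniform draw on [1.5, 3.0) seconds
uniform_delay = np.random.random() * 1.5 + 1.5

# 'binned': one of the three evenly spaced values 0.0, 1.25 or 2.5 seconds
binned_delay = np.random.choice(np.linspace(0, 2.5, 3))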