• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / scaaml / 13904838322

17 Mar 2025 04:35PM UTC coverage: 85.696%. First build
13904838322

Pull #353

github

web-flow
Merge 72a220bcc into 58956d6f0
Pull Request #353: Allow configuring resolution for PicoScope

15 of 18 new or added lines in 2 files covered. (83.33%)

2672 of 3118 relevant lines covered (85.7%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.84
/scaaml/capture/scope/ps6424e.py
1
"""This code is modified from the ChipWhisperer project:
2
http://www.github.com/newaetech/chipwhisperer"""
3

4
from __future__ import absolute_import
1✔
5

6
import ctypes
1✔
7
from dataclasses import dataclass
1✔
8
from decimal import Decimal, ROUND_HALF_DOWN
1✔
9
import time
1✔
10
import traceback
1✔
11
from typing import Any, Dict, Optional, OrderedDict, Union
1✔
12

13
from chipwhisperer.common.utils import util
1✔
14
import numpy as np
1✔
15
from picosdk.constants import make_enum
1✔
16
from picosdk.ps6000a import ps6000a as ps
1✔
17
from picosdk.PicoDeviceEnums import picoEnum
1✔
18
from picosdk.PicoDeviceStructs import picoStruct
1✔
19
from picosdk.functions import adc2mV, assert_pico_ok
1✔
20
from picosdk.errors import PicoSDKCtypesError
1✔
21

22
from scaaml.capture.scope.scope_template import ScopeTemplate, ScopeTraceType, ScopeTriggerTraceType
1✔
23

24

25
@dataclass
1✔
26
class ChannelRange:
1✔
27
    """API values for channel range."""
28
    range_v: float
1✔
29
    api_value: int
1✔
30
    range_str: str
1✔
31

32

33
def assert_ok(status: int) -> None:  # pragma: no cover
34
    """Check assert_pico_ok and if it raises change PicoSDKCtypesError to
35
    IOError."""
36
    try:
37
        assert_pico_ok(status)
38
    except PicoSDKCtypesError as error:
39
        raise IOError from error
40

41

42
# Workaround until PR #43 in
43
# https://github.com/picotech/picosdk-python-wrappers/ is merged
44
PICO_PORT_DIGITAL_CHANNEL = make_enum([  # pylint: disable=invalid-name
1✔
45
    "PICO_PORT_DIGITAL_CHANNEL0",
46
    "PICO_PORT_DIGITAL_CHANNEL1",
47
    "PICO_PORT_DIGITAL_CHANNEL2",
48
    "PICO_PORT_DIGITAL_CHANNEL3",
49
    "PICO_PORT_DIGITAL_CHANNEL4",
50
    "PICO_PORT_DIGITAL_CHANNEL5",
51
    "PICO_PORT_DIGITAL_CHANNEL6",
52
    "PICO_PORT_DIGITAL_CHANNEL7",
53
])
54

55

56
class CaptureSettings:
1✔
57
    """Channel settings."""
58
    _name = "Capture Setting"
1✔
59

60
    CHANNEL_COUPLINGS: Dict[str, int] = {
1✔
61
        "DC50": picoEnum.PICO_COUPLING["PICO_DC_50OHM"],
62
        "DC": picoEnum.PICO_COUPLING["PICO_DC"],
63
        "AC": picoEnum.PICO_COUPLING["PICO_AC"],
64
    }
65
    CHANNELS: Dict[str, int] = {
1✔
66
        "A": picoEnum.PICO_CHANNEL["PICO_CHANNEL_A"],
67
        "B": picoEnum.PICO_CHANNEL["PICO_CHANNEL_B"],
68
        "C": picoEnum.PICO_CHANNEL["PICO_CHANNEL_C"],
69
        "D": picoEnum.PICO_CHANNEL["PICO_CHANNEL_D"],
70
        "PORT0": picoEnum.PICO_CHANNEL["PICO_PORT0"],
71
        "PORT1": picoEnum.PICO_CHANNEL["PICO_PORT1"],
72
        "External": 4,
73
        "TriggerAux": 5
74
    }
75
    CHANNEL_RANGE: list[ChannelRange] = [
1✔
76
        ChannelRange(
77
            range_v=20E-3,
78
            api_value=1,
79
            range_str="20 mV",
80
        ),
81
        ChannelRange(
82
            range_v=50E-3,
83
            api_value=2,
84
            range_str="50 mV",
85
        ),
86
        ChannelRange(
87
            range_v=100E-3,
88
            api_value=3,
89
            range_str="100 mV",
90
        ),
91
        ChannelRange(
92
            range_v=200E-3,
93
            api_value=4,
94
            range_str="200 mV",
95
        ),
96
        ChannelRange(
97
            range_v=500E-3,
98
            api_value=5,
99
            range_str="500 mV",
100
        ),
101
        ChannelRange(
102
            range_v=1.0,
103
            api_value=6,
104
            range_str="1 V",
105
        ),
106
        ChannelRange(
107
            range_v=2.0,
108
            api_value=7,
109
            range_str="2 V",
110
        ),
111
        ChannelRange(
112
            range_v=5.0,
113
            api_value=8,
114
            range_str="5 V",
115
        ),
116
        ChannelRange(
117
            range_v=10.0,
118
            api_value=9,
119
            range_str="10 V",
120
        ),
121
        ChannelRange(
122
            range_v=20.0,
123
            api_value=10,
124
            range_str="20 V",
125
        ),
126
    ]
127

128
    ATTENUATION = {
1✔
129
        "1:1": 1,
130
        "1:10": 10,
131
    }
132
    REV_ATTENUATION = {1: "1:1", 10: "1:10"}
1✔
133

134
    def __init__(self) -> None:
1✔
135
        self._couplings: Dict[str, int] = {}
1✔
136
        self._rev_couplings: Dict[int, str] = {}
1✔
137
        for name, val in self.CHANNEL_COUPLINGS.items():
1✔
138
            self._couplings[name] = val
1✔
139
            self._rev_couplings[val] = name
1✔
140
        # channels
141
        self._ch_list: Dict[str, int] = {}
1✔
142
        self._rev_ch_list: Dict[int, str] = {}
1✔
143
        for channel_name, channel_id in self.CHANNELS.items():
1✔
144
            self._ch_list[channel_name] = channel_id
1✔
145
            self._rev_ch_list[channel_id] = channel_name
1✔
146
        # ranges
147
        self._ch_range: Dict[float, str] = {}
1✔
148
        self._ch_range_list: list[float] = []
1✔
149
        self._ch_range_api_value: Dict[float, int] = {}
1✔
150
        for key in self.CHANNEL_RANGE:
1✔
151
            self._ch_range[key.range_v] = key.range_str
1✔
152
            self._ch_range_list.append(key.range_v)
1✔
153

154
            self._ch_range_api_value[key.range_v] = key.api_value
1✔
155

156
        self._ch_range_list.sort()
1✔
157

158
        self._channel = 0
1✔
159
        self._port_pin: Optional[int] = None
1✔
160
        self._probe_attenuation = 1
1✔
161
        self._coupling = self._couplings["AC"]
1✔
162
        self._range: float = 5.0
1✔
163
        self.bw_limit: str = "PICO_BW_FULL"
1✔
164

165
    @property
1✔
166
    def ps_api_channel(self) -> int:
1✔
167
        """Channel for PicoScope API."""
168
        return self._channel
1✔
169

170
    @property
1✔
171
    def channel(self) -> str:
1✔
172
        return self._rev_ch_list[self._channel]
×
173

174
    @channel.setter
1✔
175
    def channel(self, val: str) -> None:
1✔
176
        if val not in self._ch_list:
1✔
177
            raise ValueError(f"Unknown channel {val} not in {self._ch_list}")
×
178
        self._channel = self._ch_list[val]
1✔
179

180
        if self.is_digital:
1✔
181
            if self._port_pin is None:
1✔
182
                # Provide a sensible default if there is no value.
183
                self._port_pin = 0
×
184
        else:
185
            # Analog channels do not have a pin.
186
            self._port_pin = None
1✔
187

188
    @property
1✔
189
    def port_pin(self) -> Optional[int]:
1✔
190
        """Number of pin of the digital port."""
191
        return self._port_pin
×
192

193
    @port_pin.setter
1✔
194
    def port_pin(self, val: Optional[int]) -> None:
1✔
195
        # If the channel is digital port is not None.
196
        if self.is_digital and val is None:
1✔
197
            raise ValueError(f"Using digital port {self.channel} cannot set "
×
198
                             f"the pin to None.")
199

200
        # If the channel is analog port is None.
201
        if not self.is_digital and val is not None:
1✔
202
            raise ValueError(f"Using analog channel {self.channel} pin must "
×
203
                             f"be None, not {val}.")
204

205
        self._port_pin = val
1✔
206

207
    @property
1✔
208
    def probe_attenuation(self) -> str:
1✔
209
        return self.REV_ATTENUATION[self._probe_attenuation]
×
210

211
    @probe_attenuation.setter
1✔
212
    def probe_attenuation(self, val: str) -> None:
1✔
213
        if val not in self.ATTENUATION:
1✔
214
            raise ValueError(f"Unsupported value {val} not in "
×
215
                             f"{self.ATTENUATION}")
216
        self._probe_attenuation = self.ATTENUATION[val]
1✔
217

218
    @property
1✔
219
    def ps_api_coupling(self) -> int:
1✔
220
        return self._coupling
×
221

222
    @property
1✔
223
    def coupling(self) -> str:
1✔
224
        return self._rev_couplings[self._coupling]
×
225

226
    @coupling.setter
1✔
227
    def coupling(self, val: str) -> None:
1✔
228
        if val not in self._couplings:
1✔
229
            raise ValueError("Unsupported value")
×
230
        self._coupling = self._couplings[val]
1✔
231

232
    @property
1✔
233
    def ps_api_range(self) -> float:
1✔
234
        """Range value for PicoScope API."""
235
        return self._ch_range_api_value[self._range]
×
236

237
    @property
1✔
238
    def range(self) -> str:
1✔
239
        """Human readable range voltage string."""
240
        return self._ch_range[self._range]
×
241

242
    @range.setter
1✔
243
    def range(self, val: float) -> None:
1✔
244
        if not isinstance(val, float):
1✔
245
            raise ValueError("Unsupported value (should be float)")
×
246

247
        # Find the smallest supported range that is higher than val
248
        for r in self._ch_range_list:
1✔
249
            if val <= r:
1✔
250
                self._range = r
1✔
251
                return
1✔
252
        raise ValueError(f"Unsupported value (too large), got {val}, maximum "
×
253
                         f"is {self._ch_range_list[-1]}")
254

255
    def _dict_repr(self) -> Dict[str, Any]:
1✔
256
        """Human readable representation as a key value dictionary."""
257
        ret: OrderedDict[str, Any] = OrderedDict()
×
258
        ret["channel"] = self.channel
×
259
        ret["range"] = self.range
×
260
        ret["probe_attenuation"] = self.probe_attenuation
×
261
        ret["coupling"] = self.coupling
×
262
        ret["port_pin"] = self._port_pin
×
263
        ret["bandwidth_limit"] = self.bw_limit
×
264
        return ret
×
265

266
    def dict_repr(self) -> Dict[str, Any]:
1✔
267
        """Public dictionary representation."""
268
        return self._dict_repr()
×
269

270
    def __repr__(self) -> str:
271
        val = util.dict_to_str(
272
            self._dict_repr())  # type: ignore[no-untyped-call]
273
        return str(val)
274

275
    def __str__(self) -> str:
1✔
276
        return self.__repr__()
×
277

278
    @property
1✔
279
    def is_digital(self) -> bool:
1✔
280
        """An MSO Pod can be connected as trigger, but for trace it does not
281
        make sense.
282
        """
283
        return False
1✔
284

285

286
class TriggerSettings(CaptureSettings):
1✔
287
    """Trigger channel settings."""
288
    _name = "Trigger Setting"
1✔
289

290
    THRESHOLD_DIRECTION: Dict[str, int] = {
1✔
291
        "Above":
292
            picoEnum.PICO_THRESHOLD_DIRECTION["PICO_ABOVE"],
293
        "Below":
294
            picoEnum.PICO_THRESHOLD_DIRECTION["PICO_BELOW"],
295
        "Rising":
296
            picoEnum.PICO_THRESHOLD_DIRECTION["PICO_RISING"],
297
        "Falling":
298
            picoEnum.PICO_THRESHOLD_DIRECTION["PICO_FALLING"],
299
        "RiseOrFall":
300
            picoEnum.PICO_THRESHOLD_DIRECTION["PICO_RISING_OR_FALLING"],
301
    }
302

303
    def __init__(self) -> None:
1✔
304
        super().__init__()
1✔
305
        self._trig_dir: Dict[str, int] = {}
1✔
306
        self._rev_trig_dir: Dict[int, str] = {}
1✔
307
        for name, val in self.THRESHOLD_DIRECTION.items():
1✔
308
            self._trig_dir[name] = val
1✔
309
            self._rev_trig_dir[val] = name
1✔
310

311
        self._channel: int = picoEnum.PICO_CHANNEL["PICO_PORT0"]
1✔
312
        self._port_pin: Optional[int] = 0
1✔
313
        self._range: float = 5.0
1✔
314
        self._coupling: int = self._couplings["DC"]
1✔
315
        self._trigger_direction: int = self._trig_dir["Rising"]
1✔
316
        self._trigger_level: float = 2.0  # V
1✔
317
        # PICO_VERY_HIGH_400MV
318
        # PICO_HIGH_200MV
319
        # PICO_NORMAL_100MV
320
        # PICO_LOW_50MV
321
        self.hysteresis: Optional[str] = "PICO_NORMAL_100MV"
1✔
322

323
    @property
1✔
324
    def ps_api_trigger_direction(self) -> int:
1✔
325
        """Trigger direction compatible with PicoScope API."""
326
        return self._trigger_direction
×
327

328
    @property
1✔
329
    def ps_api_trigger_level(self) -> int:
1✔
330
        """Trigger level compatible with PicoScope simple trigger API. Returns
331
        trigger level in mV.
332
        """
333
        # From V to mV and convert to integer
334
        return int(1_000 * self._trigger_level)
×
335

336
    @property
1✔
337
    def trigger_level(self) -> float:
1✔
338
        """Return trigger level value in V."""
339
        return self._trigger_level
×
340

341
    @trigger_level.setter
1✔
342
    def trigger_level(self, val: float) -> None:
1✔
343
        """Set trigger level in V.
344

345
        Args:
346
          val (float): The level in V at which to trigger.
347
        """
348
        self._trigger_level = val
1✔
349

350
    @property
1✔
351
    def trigger_direction(self) -> str:
1✔
352
        return self._rev_trig_dir[self._trigger_direction]
×
353

354
    @trigger_direction.setter
1✔
355
    def trigger_direction(self, val: str) -> None:
1✔
356
        if val not in self._trig_dir:
×
357
            raise ValueError("Unsupported value")
×
358
        self._trigger_direction = self._trig_dir[val]
×
359

360
    def _dict_repr(self) -> Dict[str, Any]:
1✔
361
        """Human readable representation as a key value dictionary."""
362
        config = super()._dict_repr()
×
363
        config.update({
×
364
            "trigger_level": self.trigger_level,
365
            "trigger_direction": self.trigger_direction,
366
            "hysteresis": self.hysteresis,
367
        })
368

369
        # Remove specific for analog / digital trigger.
370
        if self.is_digital:
×
371
            # Remove safely when keys not present
372
            config.pop("trigger_range", None)
×
373
            config.pop("trigger_coupling", None)
×
374

375
        return config
×
376

377
    @property
1✔
378
    def is_digital(self) -> bool:
1✔
379
        """Return if this channel is a digital channel (PORT0 or PORT1)."""
380
        return self.ps_api_channel in [
1✔
381
            picoEnum.PICO_CHANNEL["PICO_PORT0"],
382
            picoEnum.PICO_CHANNEL["PICO_PORT1"],
383
        ]
384

385

386
class Pico6424E(ScopeTemplate):
1✔
387
    """Class that interacts with the Picoscope 6424E oscilloscope."""
388
    _name = "Picoscope 6424E series 6000a (picosdk)"
1✔
389
    _NUM_CHANNELS: int = 4  # Number of analog channels
1✔
390

391
    DOWNSAMPLING_RATIO: int = 1
1✔
392

393
    def __init__(self, *args: Any, **kwargs: Any) -> None:
1✔
394
        del args  # unused
1✔
395
        del kwargs  # unused
1✔
396
        self.ps_handle: ctypes.c_int16 = ctypes.c_int16()
1✔
397

398
        self._resolution: int = picoEnum.PICO_DEVICE_RESOLUTION["PICO_DR_10BIT"]
1✔
399

400
        self.trace = CaptureSettings()
1✔
401
        self.trigger = TriggerSettings()
1✔
402
        self._sample_length: int = 500
1✔
403
        self._sample_offset: int = 0
1✔
404

405
        # Sample rate settings.
406
        self._sample_rate: float = 1E6
1✔
407
        # Set timebase (seconds per sample)
408
        self._timebase = Pico6424E._get_timebase(self._sample_rate)
1✔
409

410
        # Trace and trigger buffer, _buffers[0] is the trace buffer,
411
        # _buffers[1] is the trigger buffer.
412
        self._buffer_trace: list[float] = []
1✔
413
        self._buffer_trigger: Union[list[float], list[int]] = []
1✔
414

415
        # Part of cw API
416
        self.connectStatus: bool = False  # Connected status for cw  # pylint: disable=C0103
1✔
417
        self._max_adc: ctypes.c_int16 = ctypes.c_int16()  # To get mV values
1✔
418

419
        # Ignore signal overflow during capture.
420
        self.ignore_overflow: bool = False
1✔
421

422
        # If we use hardware downsampling, use averaging.
423
        # "PICO_RATIO_MODE_DECIMATE" could also be an option.
424
        self._downsampling_mode: int = picoEnum.PICO_RATIO_MODE[
1✔
425
            "PICO_RATIO_MODE_RAW"]
426
        if self.DOWNSAMPLING_RATIO > 1:
1✔
427
            self._downsampling_mode = picoEnum.PICO_RATIO_MODE[
×
428
                "PICO_RATIO_MODE_AVERAGE"]
429

430
    @property
1✔
431
    def resolution(self) -> int:
1✔
NEW
432
        return self._resolution
×
433

434
    @resolution.setter
1✔
435
    def resolution(self, value: str) -> None:
1✔
436
        """Set resolution. If the scope is connected it will reconnect. Higher
437
        resolution is not available for very high sampling frequencies. In such
438
        case PICO_CHANNEL_COMBINATION_NOT_VALID_IN_THIS_RESOLUTION will be
439
        raised.
440

441
        Args:
442

443
          value (str): The resolution value. See
444
          `picosdk.PicoDeviceEnums.PICO_DEVICE_RESOLUTION`.
445
        """
446
        if value not in picoEnum.PICO_DEVICE_RESOLUTION:
1✔
NEW
447
            raise ValueError(f"{value} not in supported values: "
×
448
                             f"{picoEnum.PICO_DEVICE_RESOLUTION.keys()}")
449

450
        reconnect: bool = self.connectStatus
1✔
451
        if reconnect:
1✔
NEW
452
            self.dis()
×
453

454
        self._resolution = picoEnum.PICO_DEVICE_RESOLUTION[value]
1✔
455

456
        if reconnect:
1✔
457
            self.con()
×
458

459
    @staticmethod
1✔
460
    def _get_timebase(sample_rate: float) -> ctypes.c_uint32:
1✔
461
        """Return timebase for PicoScope API.
462

463
        Args:
464
          sample_rate (float): Samples per second (in Hz).
465

466
        Returns: Timebase (seconds per sample) represented as ctypes.c_uint32
467
          value for use in ps6000aRunBlock.
468
        """
469
        # Handle too large sample_rate
470
        if sample_rate > 5e9:
1✔
471
            raise ValueError("This scope supports at most 5GHz sample_rate.")
1✔
472

473
        # From PicoScope API manual:
474
        # https://www.picotech.com/download/manuals/picoscope-6000-series-a-api-programmers-guide.pdf
475
        # n<5       2**timebase / 5_000_000_000
476
        # n>4       (timebase - 4) / 156_250_000
477
        # timebase  time
478
        # 0         200ps
479
        # 1         400ps
480
        # 2         800ps
481
        # 3         1.6ns
482
        # 4         3.2ns
483
        # 5         6.4ns
484
        # ...
485
        # 2**32-1   6.87s
486
        s_per_sample = 1 / Decimal(sample_rate)  # avoid floating point errors
1✔
487

488
        if s_per_sample >= Decimal("6.4e-9"):
1✔
489
            # Compute for the large timebase
490
            timebase = (156_250_000 * s_per_sample) + 4
1✔
491
            # Round carefully
492
            timebase = timebase.to_integral_exact(rounding=ROUND_HALF_DOWN)
1✔
493
            return ctypes.c_uint32(int(timebase))
1✔
494

495
        # timebase should be <= 4
496
        smallest_timebase = Decimal("0.2e-9")  # 200ps
1✔
497
        for i in range(4, -1, -1):
1✔
498
            if s_per_sample >= (2**i) * smallest_timebase:
1✔
499
                return ctypes.c_uint32(i)
1✔
500
        raise ValueError("Couldn't find a supported timebase")
×
501

502
    def con(self, sn: Optional[str] = None) -> bool:  # pragma: no cover
503
        del sn  # unused
504
        try:
505
            # Open the scope and get the corresponding handle self.ps_handle.
506
            # resolution 8, 10, 12 bit
507
            assert_ok(
508
                ps.ps6000aOpenUnit(
509
                    ctypes.byref(self.ps_handle),  # handle
510
                    None,  # serial, open the first scope found
511
                    self._resolution,  # resolution
512
                ))
513
            # ps6000aOpenUnit could return an indication of a needed firmware
514
            # update, but picosdk.constants.PICO_STATUS raises KeyError on
515
            # PICO_FIRMWARE_UPDATE_REQUIRED_TO_USE_DEVICE_WITH_THIS_DRIVER.
516

517
            # Get analog to digital converter limits.
518
            assert_ok(
519
                ps.ps6000aGetAdcLimits(
520
                    self.ps_handle,  # handle
521
                    self._resolution,  # resolution
522
                    ctypes.byref(ctypes.c_int16()),  # minADC
523
                    ctypes.byref(self._max_adc),  # maxADC
524
                ))
525

526
            # Set channels and trigger.
527
            self._set_channels()
528
            self.connectStatus = True
529
            return True
530
        except Exception:  # pylint: disable=W0703
531
            # Whatever happened call disconnect.
532

533
            # Print stack traceback.
534
            traceback.print_exc()
535

536
            # Disconnect the scope if the exception was raised during setting
537
            # channels.
538
            self.dis()
539
            return False
540

541
    def dis(self) -> bool:  # pragma: no cover
542
        if self.ps_handle.value > 0:
543
            # Check that the scope is connected
544
            assert self.connectStatus
545

546
            # Interrupt data capture
547
            assert_ok(ps.ps6000aStop(self.ps_handle))
548

549
            # Close the connection to the PicoScope.
550
            assert_ok(ps.ps6000aCloseUnit(self.ps_handle))
551
            # set the handle value to zero
552
            self.ps_handle.value = 0
553

554
        self.connectStatus = False
555
        # ScopeTemplate expects True to be returned.
556
        return True
557

558
    def arm(self) -> None:  # pragma: no cover
559
        """Prepare the scope for capturing."""
560
        # Check if this scope is connected.
561
        if self.connectStatus is False:
562
            raise ConnectionError(
563
                f"Scope {self._name} is not connected. Connect it first.")
564

565
        # Run the capture block
566
        assert_ok(
567
            ps.ps6000aRunBlock(
568
                self.ps_handle,  # handle
569
                self.DOWNSAMPLING_RATIO *
570
                self._sample_offset,  # Pre-trigger samples
571
                self.DOWNSAMPLING_RATIO *
572
                self._sample_length,  # Post-trigger samples
573
                self._timebase,  # timebase
574
                ctypes.byref(ctypes.c_double(0)),  # timeIndisposedMs
575
                0,  # segmentIndex
576
                None,  # lpReady callback
577
                None,  # pParameter
578
            ))
579

580
    def capture(self, poll_done: bool = False) -> bool:  # pragma: no cover
581
        """Capture one trace and return True if timeout has happened
582
        (possible capture failure).
583

584
        Args:
585
          poll_done: Not supported in PicoScope, but a part of API.
586

587
        Raises: IOError if unknown failure.
588

589
        Returns: True if the trace needs to be recaptured due to timeout or
590
          trace overflow (if ignore_overflow is set to False). False otherwise.
591
        """
592
        del poll_done  # unused
593

594
        # Wait until the result is ready
595
        ready: ctypes.c_int16 = ctypes.c_int16(0)
596
        check: ctypes.c_int16 = ctypes.c_int16(0)
597
        start_waiting = time.time()  # s from start of the epoch
598
        while ready.value == check.value:
599
            ps.ps6000aIsReady(self.ps_handle, ctypes.byref(ready))
600
            # Check for timeout
601
            time_now = time.time()
602
            if time_now - start_waiting > 100:  # [s]
603
                # Stop current capture
604
                assert_ok(ps.ps6000aStop(self.ps_handle))
605
                # Indicate timeout
606
                return True
607

608
        # Retrieve the values
609
        overflow: ctypes.c_int16 = ctypes.c_int16()
610
        max_samples: ctypes.c_int32 = ctypes.c_int32(self._total_samples)
611
        assert_ok(
612
            ps.ps6000aGetValues(
613
                self.ps_handle,  # handle
614
                0,  # Start index
615
                ctypes.byref(max_samples),  # number of processed samples
616
                self.DOWNSAMPLING_RATIO,  # Downsample ratio
617
                self._downsampling_mode,  # Downsampling mode
618
                0,  # Segment index
619
                ctypes.byref(overflow),  # overflow
620
            ))
621

622
        # Convert memory buffers into mV values and save them into
623
        # self._buffers[0] is the trace and self._buffers[1] is the trigger
624
        self._buffer_trace = adc2mV(
625
            self._trace_buffer,
626
            self.trace.ps_api_range,  # range
627
            self._max_adc)[:max_samples.value]
628
        if self.trigger.is_digital:
629
            self._buffer_trigger = self._trigger_buffer[:max_samples.value]
630
        else:
631
            self._buffer_trigger = adc2mV(
632
                self._trigger_buffer,
633
                self.trigger.ps_api_range,  # range
634
                self._max_adc)[:max_samples.value]
635

636
        # print("Overflow: {}".format(overflow.value))
637
        # print("Samples: {}".format(len(self._trace_buffer)))
638
        # print("Max samples: {}".format(max_samples.value))
639
        if self.ignore_overflow:
640
            return False
641
        else:
642
            return (overflow.value >> self.trace.ps_api_channel) & 1 == 1
643

644
    def get_last_trace(self, as_int: bool = False) -> ScopeTraceType:
1✔
645
        """Return a copy of the last trace.
646

647
        Args:
648
          as_int (bool): Not supported, part of CW API. Return integer
649
            representation of the trace. Defaults to False meaning return
650
            np.array of dtype np.float32.
651

652
        Returns: np array representing the last captured trace in mV unless
653
        `as_int` is set to `True` in which case it returns the ADC values
654
        directly.
655
        """
656
        if as_int:
×
657
            return np.array(self._trace_buffer[:], dtype=np.int32)
×
658

659
        return np.array(self._buffer_trace[:], dtype=np.float32)
×
660

661
    def get_last_trigger_trace(self) -> ScopeTriggerTraceType:
1✔
662
        """Return a copy of the last trigger trace.
663

664
        Returns: np array representing the trigger trace. If the trigger is
665
          digital returns a boolean array, else returns a float32 array.
666
        """
667
        # Digital trigger.
668
        if self.trigger.is_digital:
×
669
            bits = np.array(self._buffer_trigger[:], dtype=np.int16)
×
670
            port_pin = self.trigger.port_pin
×
671
            assert port_pin is not None
×
672
            return np.array((bits >> port_pin) & 1, dtype=np.bool_)
×
673

674
        # Analog trigger.
675
        return np.array(self._buffer_trigger[:], dtype=np.float32)
×
676

677
    def _set_channel_on(
678
            self, channel_info: CaptureSettings) -> None:  # pragma: no cover
679
        """Turn on a single channel.
680

681
        Args:
682
          channel_info (CaptureSettings): Either the trace or the trigger
683
            channel.
684
        """
685
        if channel_info.is_digital:
686
            assert isinstance(channel_info, TriggerSettings)
687
            # Turn on a digital port.
688
            # Logic level needs to be set individually for each digital
689
            # channel/pin in the port.
690
            pins = 8
691
            # A list of threshold voltages (one for each port pin), used to
692
            # distinguish 0 and 1 states. Range -32_767 (-5V) to 32_767 (5V).
693
            logic_threshold_level = (ctypes.c_int16 * pins)(0)
694
            for i in range(pins):
695
                logic_threshold_level[i] = int(
696
                    (32_767 / 5) * channel_info.trigger_level)
697
            logic_threshold_level_length = len(logic_threshold_level)
698
            # Hysteresis of the digital channel.
699
            ps_api_hysteresis = picoEnum.PICO_DIGITAL_PORT_HYSTERESIS[
700
                channel_info.hysteresis]
701
            assert_ok(
702
                ps.ps6000aSetDigitalPortOn(
703
                    self.ps_handle,  # handle
704
                    channel_info.ps_api_channel,  # port
705
                    ctypes.byref(logic_threshold_level),
706
                    logic_threshold_level_length,
707
                    ps_api_hysteresis,
708
                ))
709
        else:
710
            # Turn on an analog channel.
711
            assert_ok(
712
                ps.ps6000aSetChannelOn(
713
                    self.ps_handle,  # handle
714
                    channel_info.ps_api_channel,  # channel
715
                    channel_info.ps_api_coupling,  # coupling
716
                    channel_info.ps_api_range,  # range
717
                    0,  # analogue offset
718
                    picoEnum.PICO_BANDWIDTH_LIMITER[
719
                        channel_info.bw_limit],  # bandwidth
720
                ))
721

722
    def _set_digital_trigger(self) -> None:  # pragma: no cover
723
        """Set a trigger on a digital channel.
724
        """
725
        # Make sure we are using a digital trigger
726
        assert self.trigger.is_digital
727

728
        # Set Pico channel conditions
729
        conditions = (picoStruct.PICO_CONDITION * 1)()
730
        conditions[0] = picoStruct.PICO_CONDITION(
731
            self.trigger.ps_api_channel,  # PICO_CHANNEL source
732
            picoEnum.PICO_TRIGGER_STATE[
733
                "PICO_CONDITION_TRUE"],  # PICO_TRIGGER_STATE condition
734
        )
735
        assert_ok(
736
            ps.ps6000aSetTriggerChannelConditions(
737
                self.ps_handle,  # handle
738
                ctypes.byref(conditions),  # * conditions
739
                len(conditions),  # n_conditions
740
                3,  # action PICO_CLEAR_ALL = 1  PICO_ADD = 2
741
            ))
742

743
        # Set trigger digital port properties
744
        digital_channels: int = 8
745
        # PICO_DIGITAL_DONT_CARE:
746
        #    channel has no effect on trigger
747
        # PICO_DIGITAL_DIRECTION_LOW:
748
        #    channel must be low to trigger
749
        # PICO_DIGITAL_DIRECTION_HIGH:
750
        #    channel must be high to trigger
751
        # PICO_DIGITAL_DIRECTION_RISING:
752
        #    channel must transition from low to high to trigger
753
        # PICO_DIGITAL_DIRECTION_FALLING:
754
        #    channel must transition from high to low to trigger
755
        # PICO_DIGITAL_DIRECTION_RISING_OR_FALLING:
756
        #    any transition on channel causes a trigger
757
        directions = (picoStruct.DIGITAL_CHANNEL_DIRECTIONS *
758
                      digital_channels)()
759
        for i in range(digital_channels):
760
            # Ignore all pins except for the trigger pin.
761
            trigger_direction: str = "PICO_DIGITAL_DONT_CARE"
762
            if i == self.trigger.port_pin:
763
                trigger_direction = "PICO_DIGITAL_DIRECTION_RISING"
764

765
            directions[i] = picoStruct.DIGITAL_CHANNEL_DIRECTIONS(
766
                PICO_PORT_DIGITAL_CHANNEL[
767
                    f"PICO_PORT_DIGITAL_CHANNEL{i}"],  # channel
768
                picoEnum.PICO_DIGITAL_DIRECTION[trigger_direction],
769
            )
770

771
        assert_ok(
772
            ps.ps6000aSetTriggerDigitalPortProperties(
773
                self.ps_handle,  # handle
774
                self.trigger.ps_api_channel,  # port
775
                ctypes.byref(directions),  # directions
776
                len(directions),  # n_directions
777
            ))
778

779
    def _set_channels(self) -> None:  # pragma: no cover
780
        """Setup channels, buffers, and trigger."""
781
        if self.ps_handle.value <= 0:
782
            # No opened PicoScope handle
783
            msg = f"Scope handle is {self.ps_handle.value}"
784
            assert not self.connectStatus, msg
785
            return
786

787
        # Turn off all analog channels
788
        for c in range(self._NUM_CHANNELS):
789
            assert_ok(ps.ps6000aSetChannelOff(
790
                self.ps_handle,
791
                c,  # channel
792
            ))
793
        # Set MSO pods off
794
        for port in [
795
                picoEnum.PICO_CHANNEL["PICO_PORT0"],
796
                picoEnum.PICO_CHANNEL["PICO_PORT1"],
797
        ]:
798
            assert_ok(
799
                ps.ps6000aSetDigitalPortOff(
800
                    self.ps_handle,  # handle
801
                    port,  # port
802
                ))
803

804
        # Turn on trace and trigger channels.
805
        self._set_channel_on(channel_info=self.trace)
806
        self._set_channel_on(channel_info=self.trigger)
807

808
        # Set a trigger.
809
        if self.trigger.is_digital:
810
            # Set a digital trigger.
811
            self._set_digital_trigger()
812
        else:
813
            # Set simple trigger
814
            assert_ok(
815
                ps.ps6000aSetSimpleTrigger(
816
                    self.ps_handle,  # handle
817
                    1,  # enable=1 (for disable set to 0)
818
                    self.trigger.ps_api_channel,  # channel
819
                    self.trigger.ps_api_trigger_level,  # threshold in mV
820
                    self.trigger.ps_api_trigger_direction,  # direction
821
                    0,  # delay = 0 s
822
                    100_000_000,  # autoTriggerMicroSeconds = 100s
823
                ))
824

825
        # Set data buffers
826
        self._total_samples = self.DOWNSAMPLING_RATIO * (self._sample_length +
827
                                                         self._sample_offset)
828
        self._trace_buffer = (ctypes.c_int16 * self._total_samples)()
829
        self._trigger_buffer = (ctypes.c_int16 * self._total_samples)()
830
        data_type = picoEnum.PICO_DATA_TYPE["PICO_INT16_T"]
831
        waveform = 0
832
        # Set trace buffer
833
        action = picoEnum.PICO_ACTION["PICO_CLEAR_ALL"] | picoEnum.PICO_ACTION[
834
            "PICO_ADD"]
835
        assert_ok(
836
            ps.ps6000aSetDataBuffer(
837
                self.ps_handle,  # handle
838
                self.trace.ps_api_channel,  # channel
839
                ctypes.byref(self._trace_buffer),  # buffer
840
                self._total_samples,  # samples
841
                data_type,  # data type
842
                waveform,  # waveform (the segment index)
843
                self._downsampling_mode,  # Downsampling mode
844
                action,  # buffer action
845
            ))
846
        # Set trigger buffer
847
        # Note that there is no PICO_CLEAR_ALL action, as that would remove
848
        # the trace buffer.
849
        assert_ok(
850
            ps.ps6000aSetDataBuffer(
851
                self.ps_handle,  # handle
852
                self.trigger.ps_api_channel,  # channel
853
                ctypes.byref(self._trigger_buffer),  # buffer
854
                self._total_samples,  # samples
855
                data_type,  # data type
856
                waveform,  # waveform (the segment index)
857
                self._downsampling_mode,  # Downsampling mode
858
                picoEnum.PICO_ACTION["PICO_ADD"],  # buffer action
859
            ))
860

861
    @property
1✔
862
    def sample_rate(self) -> float:
1✔
863
        return self._sample_rate
×
864

865
    @sample_rate.setter
1✔
866
    def sample_rate(self, val: float) -> None:
1✔
867
        self._sample_rate = val
1✔
868
        self._timebase = Pico6424E._get_timebase(self._sample_rate)
1✔
869

870
    @property
1✔
871
    def sample_length(self) -> int:
1✔
872
        return self._sample_length
×
873

874
    @sample_length.setter
1✔
875
    def sample_length(self, val: int) -> None:
1✔
876
        self._sample_length = val
1✔
877
        self._set_channels()
1✔
878

879
    @property
1✔
880
    def sample_offset(self) -> int:
1✔
881
        return self._sample_offset
×
882

883
    @sample_offset.setter
1✔
884
    def sample_offset(self, val: int) -> None:
1✔
885
        self._sample_offset = val
1✔
886
        self._set_channels()
1✔
887

888
    def _dict_repr(self) -> Dict[str, Any]:
1✔
889
        """Human readable representation as a key value dictionary."""
890
        ret: OrderedDict[str, Any] = OrderedDict()
×
891
        ret["trace"] = self.trace.dict_repr()
×
892
        ret["trigger"] = self.trigger.dict_repr()
×
893
        ret["sample_rate"] = self.sample_rate
×
894
        ret["sample_length"] = self.sample_length
×
895
        ret["sample_offset"] = self.sample_offset
×
896
        ret["ignore_overflow"] = self.ignore_overflow
×
897
        return ret
×
898

899
    def __repr__(self) -> str:
900
        """Return device name, connected status and dict representation as
901
        multi-line string."""
902
        connected = "Connected" if self.connectStatus else "Not connected"
903
        dict_repr = util.dict_to_str(
904
            self._dict_repr())  # type: ignore[no-untyped-call]
905
        return f"{self._name} device {connected}\n{dict_repr}"
906

907
    def __str__(self) -> str:
1✔
908
        return self.__repr__()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc