• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Oucru-Innovations / vital-DSP / 11545366242

28 Oct 2024 12:54AM UTC coverage: 90.288% (-0.8%) from 91.103%
11545366242

push

github

Koaha
fix broken workflow

1263 of 1606 branches covered (78.64%)

Branch coverage included in aggregate %.

5 of 12 new or added lines in 2 files covered. (41.67%)

143 existing lines in 3 files now uncovered.

5607 of 6003 relevant lines covered (93.4%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.51
/src/vitalDSP/physiological_features/waveform.py
1
import numpy as np
1✔
2
from vitalDSP.utils.peak_detection import PeakDetection
1✔
3
from scipy.stats import skew
1✔
4

5

6
class WaveformMorphology:
1✔
7
    """
8
    A class for computing morphological features from physiological waveforms (ECG, PPG, EEG).
9

10
    Attributes:
11
        waveform (np.array): The waveform signal (ECG, PPG, EEG).
12
        fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz.
13
        signal_type (str): The type of signal ('ECG', 'PPG', 'EEG').
14
    """
15

16
    def __init__(self, waveform, fs=256, signal_type="ECG"):
1✔
17
        """
18
        Initializes the WaveformMorphology object.
19

20
        Args:
21
            waveform (np.array): The waveform signal.
22
            fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz.
23
            signal_type (str): The type of signal (ECG, PPG, EEG).
24
        """
25
        self.waveform = np.array(waveform)
1✔
26
        self.fs = fs  # Sampling frequency
1✔
27
        self.signal_type = signal_type  # 'ECG', 'PPG', 'EEG'
1✔
28

29
    def detect_troughs(self, systolic_peaks=None):
1✔
30
        """
31
        Detects the troughs (valleys) in the PPG waveform between systolic peaks.
32

33
        Parameters
34
        ----------
35
        systolic_peaks : np.array
36
            Indices of detected systolic peaks in the PPG waveform.
37

38
        Returns
39
        -------
40
        troughs : np.array
41
            Indices of the detected troughs between the systolic peaks.
42

43
        Example
44
        -------
45
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated PPG signal
46
        >>> wm = WaveformMorphology(waveform, signal_type="PPG")
47
        >>> peaks = PeakDetection(waveform).detect_peaks()
48
        >>> troughs = wm.detect_troughs(peaks)
49
        >>> print(f"Troughs: {troughs}")
50
        """
51
        if systolic_peaks is None:
1!
UNCOV
52
            detector = PeakDetection(
×
53
                self.waveform,
54
                "ppg_systolic_peaks",
55
                **{
56
                    "distance": 50,
57
                    "window_size": 7,
58
                    "threshold_factor": 1.6,
59
                    "search_window": 6,
60
                    "fs": self.fs,
61
                }
62
            )
UNCOV
63
            systolic_peaks = detector.detect_peaks()
×
64
        diastolic_troughs = []
1✔
65
        signal_derivative = np.diff(self.waveform)
1✔
66

67
        for i in range(len(systolic_peaks) - 1):
1✔
68
            # Define the search range between two adjacent systolic peaks
69
            peak_start = systolic_peaks[i]
1✔
70
            peak_end = systolic_peaks[i + 1]
1✔
71
            search_start = peak_start + (peak_end - peak_start) // 2
1✔
72
            search_end = peak_end
1✔
73

74
            if search_start >= search_end or search_start < 0:
1!
UNCOV
75
                continue
×
76

77
            # Calculate the adaptive flatness threshold based on the MAD of the derivative within the search range
78
            local_derivative = signal_derivative[search_start:search_end]
1✔
79
            mad_derivative = np.median(
1✔
80
                np.abs(local_derivative - np.median(local_derivative))
81
            )
82
            adaptive_threshold = (
1✔
83
                0.5 * mad_derivative
84
            )  # Flatness threshold set as 50% of local MAD
85

86
            # Identify flat segments based on the adaptive threshold
87
            flat_segment = []
1✔
88
            for j in range(search_start, search_end - 1):
1✔
89
                if abs(signal_derivative[j]) < adaptive_threshold:
1✔
90
                    flat_segment.append(j)
1✔
91

92
            # Find the midpoint of the longest flat segment (if multiple segments are detected)
93
            if flat_segment:
1✔
94
                flat_segment_groups = np.split(
1✔
95
                    flat_segment, np.where(np.diff(flat_segment) != 1)[0] + 1
96
                )
97
                longest_flat_segment = max(flat_segment_groups, key=len)
1✔
98
                trough_index = longest_flat_segment[len(longest_flat_segment) // 2]
1✔
99
                diastolic_troughs.append(trough_index)
1✔
100

101
        return np.array(diastolic_troughs)
1✔
102
        # # Calculate the derivative of the signal for slope information
103
        # signal_derivative = np.diff(self.waveform)
104

105
        # for i in range(len(systolic_peaks) - 1):
106
        #     # Define the search range as the midpoint between two adjacent systolic peaks
107
        #     peak_start = systolic_peaks[i]
108
        #     peak_end = systolic_peaks[i + 1]
109
        #     search_start = peak_start + (peak_end - peak_start) // 2
110
        #     search_end = peak_end
111

112
        #     # Ensure search range is valid
113
        #     if search_start >= search_end or search_start < 0:
114
        #         continue
115

116
        #     # Narrow down to the section within the search range
117
        #     search_section = self.waveform[search_start:search_end]
118
        #     search_derivative = signal_derivative[search_start:search_end-1]
119

120
        #     # Find the index where the slope is closest to zero with a preference for a negative slope
121
        #     candidate_troughs = [
122
        #         idx for idx, slope in enumerate(search_derivative) if slope <= 0
123
        #     ]
124

125
        #     if candidate_troughs:
126
        #         trough_index = candidate_troughs[np.argmin(np.abs(search_derivative[candidate_troughs]))]
127
        #         diastolic_troughs.append(search_start + trough_index)
128
        # return np.array(diastolic_troughs)
129

130
    def detect_notches(self, systolic_peaks=None, diastolic_troughs=None):
1✔
131
        """
132
        Detects the dicrotic notches in a PPG waveform using second derivative.
133

134
        Parameters
135
        ----------
136
        systolic_peaks : np.array
137
            Indices of detected systolic peaks in the PPG waveform.
138
        diastolic_troughs : np.array
139
            Indices of the detected troughs between the systolic peaks.
140

141
        Returns
142
        -------
143
        notches : np.array
144
            Indices of detected dicrotic notches in the PPG waveform.
145

146
        Example
147
        -------
148
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated PPG signal
149
        >>> wm = WaveformMorphology(waveform, signal_type="PPG")
150
        >>> notches = wm.detect_notches()
151
        >>> print(f"Dicrotic Notches: {notches}")
152
        """
153
        if self.signal_type != "PPG":
1✔
154
            raise ValueError("Notches can only be detected for PPG signals.")
1✔
155
        if systolic_peaks is None:
1!
156
            detector = PeakDetection(
1✔
157
                self.waveform,
158
                "ppg_systolic_peaks",
159
                **{
160
                    "distance": 50,
161
                    "window_size": 7,
162
                    "threshold_factor": 1.6,
163
                    "search_window": 6,
164
                    "fs": self.fs,
165
                }
166
            )
167
            systolic_peaks = detector.detect_peaks()
1✔
168
        if diastolic_troughs is None:
1!
169
            diastolic_troughs = self.detect_troughs(systolic_peaks=systolic_peaks)
1✔
170
        notches = []
1✔
171

172
        # Calculate the first and second derivatives of the signal
173
        signal_derivative = np.diff(self.waveform)
1✔
174
        signal_second_derivative = np.diff(signal_derivative)
1✔
175

176
        for i in range(min(len(systolic_peaks), len(diastolic_troughs))):
1!
UNCOV
177
            peak = systolic_peaks[i]
×
UNCOV
178
            trough = diastolic_troughs[i]
×
179

180
            # Define the search range: keep it close to the peak, 10-30% of the distance to the trough
NEW
181
            search_start = peak + int(
×
182
                (trough - peak) * 0.1
183
            )  # Start 10% toward the trough
NEW
184
            search_end = peak + int((trough - peak) * 0.3)  # End 30% toward the trough
×
185

186
            # Ensure the search range is within bounds
UNCOV
187
            search_start = min(max(search_start, peak), trough)
×
UNCOV
188
            search_end = min(max(search_end, peak), trough)
×
189

190
            # Extract search section in the signal and derivatives
191
            # search_section = self.waveform[search_start:search_end]
NEW
192
            search_derivative = signal_derivative[search_start : search_end - 1]
×
NEW
193
            search_second_derivative = signal_second_derivative[
×
194
                search_start : search_end - 2
195
            ]
196

197
            # Identify candidate notches with slope close to zero
UNCOV
198
            candidate_notches = [
×
199
                idx
200
                for idx in range(len(search_derivative))
201
                if abs(search_derivative[idx])
202
                < 0.05  # Adjust threshold for "close to zero" slope
203
            ]
204

205
            # Filter candidates where the second derivative indicates leveling (positive values)
206
            candidate_indices = [
×
207
                idx
208
                for idx in candidate_notches
209
                if idx < len(search_second_derivative)
210
                and search_second_derivative[idx] > 0
211
            ]
212

213
            # Select the best candidate notch based on minimum absolute slope
UNCOV
214
            if candidate_indices:
×
NEW
215
                best_notch_idx = candidate_indices[
×
216
                    np.argmin(np.abs(search_derivative[candidate_indices]))
217
                ]
218
                notches.append(search_start + best_notch_idx)
×
219

220
        return np.array(notches)
1✔
221

222
    def detect_diastolic_peak(self, notches=None, diastolic_troughs=None):
1✔
223
        """
224
        Detect diastolic peaks in PPG signals based on notches and diastolic troughs.
225

226
        Parameters
227
        ----------
228
        notches : list of int
229
            Indices of detected notches in the PPG signal.
230
        diastolic_troughs : list of int
231
            Indices of diastolic troughs in the PPG signal.
232

233
        Returns
234
        -------
235
        diastolic_peaks : list of int
236
            Indices of diastolic peaks detected in the PPG signal.
237
        """
UNCOV
238
        if diastolic_troughs is None:
×
UNCOV
239
            diastolic_troughs = self.detect_troughs()
×
UNCOV
240
        if notches is None:
×
UNCOV
241
            notches = self.detect_notches(diastolic_troughs=diastolic_troughs)
×
UNCOV
242
        diastolic_peaks = []
×
243

UNCOV
244
        for i in range(len(notches)):
×
UNCOV
245
            notch = notches[i]
×
NEW
246
            trough = (
×
247
                diastolic_troughs[i]
248
                if i < len(diastolic_troughs)
249
                else len(self.waveform) - 1
250
            )
251

252
            # Define the initial search range: notch to halfway to the trough
UNCOV
253
            search_start = notch
×
UNCOV
254
            search_end = notch + (trough - notch) // 2
×
255

UNCOV
256
            if search_end <= search_start or search_end > len(self.waveform):
×
257
                diastolic_peaks.append(notch)
×
UNCOV
258
                continue
×
259

UNCOV
260
            search_segment = self.waveform[search_start:search_end]
×
261
            segment_derivative = np.diff(search_segment)
×
262

UNCOV
263
            candidate_peaks = [
×
264
                idx
265
                for idx in range(1, len(segment_derivative))
266
                if segment_derivative[idx - 1] < 0 and segment_derivative[idx] >= 0
267
            ]
268

269
            # Convert candidates to absolute indices and select the most prominent peak if available
UNCOV
270
            if candidate_peaks:
×
UNCOV
271
                candidate_indices = [search_start + idx for idx in candidate_peaks]
×
NEW
272
                diastolic_peak_idx = max(
×
273
                    candidate_indices, key=lambda x: self.waveform[x]
274
                )
UNCOV
275
                diastolic_peaks.append(diastolic_peak_idx)
×
276
            else:
277
                # Fallback assignment: if no diastolic peak is detected, use the notch as the diastolic peak
UNCOV
278
                diastolic_peaks.append(notch)
×
279

UNCOV
280
        return diastolic_peaks
×
281

282
    def detect_q_valley(self, r_peaks=None):
1✔
283
        """
284
        Detects the Q valley (local minimum) in the ECG waveform just before each R peak.
285

286
        Parameters
287
        ----------
288
        r_peaks : np.array
289
            Indices of detected R peaks in the ECG waveform.
290

291
        Returns
292
        -------
293
        q_valleys : list of int
294
            Indices of the Q valley (local minimum) for each R peak.
295
        """
UNCOV
296
        if self.signal_type != "ECG":
×
UNCOV
297
            raise ValueError("Q valleys can only be detected for ECG signals.")
×
UNCOV
298
        if r_peaks is None:
×
UNCOV
299
            detector = PeakDetection(
×
300
                self.waveform,
301
                "ecg_r_peak",
302
                **{
303
                    "distance": 50,
304
                    "window_size": 7,
305
                    "threshold_factor": 1.6,
306
                    "search_window": 6,
307
                }
308
            )
UNCOV
309
            r_peaks = detector.detect_peaks()
×
310

UNCOV
311
        q_valleys = []
×
UNCOV
312
        for i, r_peak in enumerate(r_peaks):
×
313
            # Set the end of the search range to be the R peak
UNCOV
314
            search_end = r_peak
×
315

316
            # Determine the start of the search range
UNCOV
317
            if i == 0:
×
318
                # For the first R peak, start from the beginning of the signal
UNCOV
319
                search_start = max(
×
320
                    0, search_end - int(self.fs * 0.2)
321
                )  # Approx 200ms window
322
            else:
323
                # For subsequent R peaks, set the start as the midpoint to the previous R peak
UNCOV
324
                search_start = (r_peaks[i - 1] + r_peak) // 2
×
325

326
            # Ensure the search range is valid
UNCOV
327
            if search_start < search_end:
×
328
                # Extract the signal segment within the search range
UNCOV
329
                signal_segment = self.waveform[search_start:search_end]
×
330

331
                # Detect the Q valley as the minimum point in the segment
UNCOV
332
                q_valley_idx = np.argmin(signal_segment) + search_start
×
UNCOV
333
                q_valleys.append(q_valley_idx)
×
334

UNCOV
335
        return q_valleys
×
336

337
    def detect_q_session(self, r_peaks):
1✔
338
        """
339
        Detects the Q sessions (start and end) in the ECG waveform based on R peaks.
340

341
        Parameters
342
        ----------
343
        r_peaks : np.array
344
            Indices of detected R peaks in the ECG waveform.
345

346
        Returns
347
        -------
348
        q_sessions : list of tuples
349
            Each tuple contains the start and end index of a Q session.
350

351
        Example
352
        -------
353
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
354
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
355
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
356
        >>> q_sessions = wm.detect_q_session(r_peaks)
357
        >>> print(f"Q Sessions: {q_sessions}")
358
        """
359
        if self.signal_type != "ECG":
1✔
360
            raise ValueError("Q sessions can only be detected for ECG signals.")
1✔
361

362
        q_sessions = []
1✔
363
        signal_derivative = np.diff(self.waveform)  # First derivative of the signal
1✔
364

365
        for r_peak in r_peaks:
1✔
366
            # Search backward to find the downhill slope, which indicates Q-wave start
367
            search_start = r_peak
1✔
368
            for i in range(
1✔
369
                r_peak - 1, max(0, r_peak - int(self.fs * 0.2)), -1
370
            ):  # Limit to a max of 200ms
371
                if (
1✔
372
                    signal_derivative[i - 1] < 0 and signal_derivative[i] >= 0
373
                ):  # Change from downhill to minimum
374
                    search_start = i
1✔
375
                    break
1✔
376

377
            # Define search end as the R-peak for locating Q-wave
378
            search_end = r_peak
1✔
379

380
            # Find the Q-wave within the determined search window
381
            if search_end > search_start:
1✔
382
                q_wave_idx = search_start + np.argmin(
1✔
383
                    self.waveform[search_start:search_end]
384
                )
385

386
                # Locate Q session boundaries more precisely
387
                # Start of Q session: keep moving left until we find a rising slope
388
                for i in range(q_wave_idx, search_start, -1):
1!
UNCOV
389
                    if signal_derivative[i - 1] >= 0 and signal_derivative[i] < 0:
×
UNCOV
390
                        q_start = i
×
UNCOV
391
                        break
×
392
                else:
393
                    q_start = search_start  # Fallback if boundary not found
1✔
394

395
                # End of Q session: continue to the right until we find the slope beginning to rise again
396
                for i in range(q_wave_idx, search_end - 1):
1✔
397
                    if signal_derivative[i] < 0 and signal_derivative[i + 1] >= 0:
1!
UNCOV
398
                        q_end = i
×
UNCOV
399
                        break
×
400
                else:
401
                    q_end = search_end  # Fallback if boundary not found
1✔
402

403
                q_sessions.append((q_start, q_end))
1✔
404
            else:
405
                continue
1✔
406
                # print(f"Skipping R-peak at {r_peak}: invalid search window [{search_start}, {search_end})")
407

408
        return q_sessions
1✔
409

410
    def detect_r_session(self, r_peaks):
1✔
411
        """
412
        Detects the R sessions (start and end) in the ECG waveform.
413

414
        Parameters
415
        ----------
416
        r_peaks : np.array
417
            Indices of detected R peaks in the ECG waveform.
418

419
        Returns
420
        -------
421
        r_sessions : list of tuples
422
            Each tuple contains the start and end index of an R session.
423

424
        Example
425
        -------
426
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
427
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
428
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
429
        >>> r_sessions = wm.detect_r_session(r_peaks)
430
        >>> print(f"R Sessions: {r_sessions}")
431
        """
UNCOV
432
        r_sessions = [
×
433
            (r_peak - int(self.fs * 0.02), r_peak + int(self.fs * 0.02))
434
            for r_peak in r_peaks
435
        ]
UNCOV
436
        return r_sessions
×
437

438
    def detect_s_session(self, r_peaks):
1✔
439
        """
440
        Detects the S sessions (start and end) in the ECG waveform based on R peaks.
441

442
        Parameters
443
        ----------
444
        r_peaks : np.array
445
            Indices of detected R peaks in the ECG waveform.
446

447
        Returns
448
        -------
449
        s_sessions : list of tuples
450
            Each tuple contains the start and end index of an S session.
451

452
        Example
453
        -------
454
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
455
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
456
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
457
        >>> s_sessions = wm.detect_s_session(r_peaks)
458
        >>> print(f"S Sessions: {s_sessions}")
459
        """
460
        if self.signal_type != "ECG":
1✔
461
            raise ValueError("S sessions can only be detected for ECG signals.")
1✔
462

463
        s_sessions = []
1✔
464
        for r_peak in r_peaks:
1✔
465
            s_start = r_peak
1✔
466
            s_end = min(
1✔
467
                len(self.waveform), r_peak + int(self.fs * 0.04)
468
            )  # Example: 40 ms after R peak
469
            s_sessions.append((s_start, s_end))
1✔
470
        return s_sessions
1✔
471

472
    def detect_qrs_session(self, r_peaks):
1✔
473
        """
474
        Detects the QRS complex sessions (start and end) in the ECG waveform.
475

476
        Parameters
477
        ----------
478
        r_peaks : np.array
479
            Indices of detected R peaks in the ECG waveform.
480

481
        Returns
482
        -------
483
        qrs_sessions : list of tuples
484
            Each tuple contains the start and end index of a QRS session.
485

486
        Example
487
        -------
488
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
489
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
490
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
491
        >>> qrs_sessions = wm.detect_qrs_session(r_peaks)
492
        >>> print(f"QRS Sessions: {qrs_sessions}")
493
        """
UNCOV
494
        q_sessions = self.detect_q_session(r_peaks)
×
UNCOV
495
        s_sessions = self.detect_s_session(r_peaks)
×
496

497
        qrs_sessions = [(q[0], s[1]) for q, s in zip(q_sessions, s_sessions)]
×
498
        return qrs_sessions
×
499

500
    def compute_amplitude(self):
1✔
501
        """
502
        Computes the amplitude (max-min) of the waveform, representing the peak-to-peak range.
503

504
        Returns:
505
            float: The amplitude of the waveform.
506

507
        Example:
508
            >>> waveform = [0.5, 0.8, 1.2, 0.9, 0.7]
509
            >>> wm = WaveformMorphology(waveform, signal_type="ECG")
510
            >>> amplitude = wm.compute_amplitude()
511
            >>> print(f"Amplitude: {amplitude}")
512
        """
513
        return np.max(self.waveform) - np.min(self.waveform)
1✔
514

515
    def compute_volume(self, peaks1, peaks2, mode="peak"):
1✔
516
        """
517
        Compute the area under the curve between two sets of peaks.
518

519
        Parameters
520
        ----------
521
        peaks1 : numpy.ndarray
522
            The first set of peaks (e.g., systolic peaks).
523
        peaks2 : numpy.ndarray
524
            The second set of peaks (e.g., diastolic peaks).
525
        mode : str, optional
526
            The type of area computation method ("peak" or "trough"). Default is "peak".
527

528
        Returns
529
        -------
530
        volume : float
531
            The mean area between the two sets of peaks.
532

533
        Examples
534
        --------
535
        >>> signal = np.random.randn(1000)
536
        >>> peaks1 = np.array([100, 200, 300])
537
        >>> peaks2 = np.array([150, 250, 350])
538
        >>> extractor = PhysiologicalFeatureExtractor(signal)
539
        >>> volume = extractor.compute_volume(peaks1, peaks2)
540
        >>> print(volume)
541
        """
542
        if mode == "peak":
1✔
543
            areas = [
1✔
544
                np.trapz(self.waveform[p1:p2])
545
                for p1, p2 in zip(peaks1, peaks2)
546
                if p1 < p2
547
            ]
548
        elif mode == "trough":
1✔
549
            areas = [
1✔
550
                np.trapz(self.waveform[min(p, t) : max(p, t)])
551
                for p, t in zip(peaks1, peaks2)
552
            ]
553
        else:
554
            raise ValueError("Volume mode must be 'peak' or 'trough'.")
1✔
555
        return np.mean(areas) if areas else 0.0
1✔
556

557
    def compute_volume_sequence(self, peaks):
1✔
558
        """
559
        Computes the area under the waveform for a given peak, typically used to analyze
560
        the volume of QRS complexes (ECG) or systolic/diastolic volumes (PPG).
561

562
        Args:
563
            peaks (np.array): Indices of detected peaks.
564

565
        Returns:
566
            float: The computed area (AUC) for the given peaks.
567

568
        Example:
569
            >>> peaks = [1, 5, 9]
570
            >>> volume = wm.compute_volume(peaks)
571
            >>> print(f"Volume: {volume}")
572
        """
573
        auc = 0.0
1✔
574
        for peak in peaks:
1✔
575
            if peak > 0 and peak < len(self.waveform) - 1:
1!
576
                auc += np.trapz(
1✔
577
                    self.waveform[peak - 1 : peak + 2]
578
                )  # AUC for one peak region
579
        return auc
1✔
580

581
    def compute_skewness(self):
1✔
582
        """
583
        Compute the skewness of the signal.
584

585
        Returns
586
        -------
587
        skewness : float
588
            The skewness of the signal.
589

590
        Examples
591
        --------
592
        >>> signal = np.random.randn(1000)
593
        >>> extractor = PhysiologicalFeatureExtractor(signal)
594
        >>> skewness = extractor.compute_skewness()
595
        >>> print(skewness)
596
        """
597
        return skew(self.waveform)
1✔
598

599
    def detect_s_valley(self, r_peaks=None):
1✔
600
        """
601
        Detects the S valleys (local minima after each R peak).
602

603
        Parameters
604
        ----------
605
        r_peaks : list of int
606
            Indices of detected R peaks in the ECG waveform.
607

608
        Returns
609
        -------
610
        s_valleys : list of int
611
            Indices of the S valleys for each R peak.
612
        """
UNCOV
613
        if self.signal_type != "ECG":
×
UNCOV
614
            raise ValueError("Q valleys can only be detected for ECG signals.")
×
UNCOV
615
        if r_peaks is None:
×
UNCOV
616
            detector = PeakDetection(
×
617
                self.waveform,
618
                "ecg_r_peak",
619
                **{
620
                    "distance": 50,
621
                    "window_size": 7,
622
                    "threshold_factor": 1.6,
623
                    "search_window": 6,
624
                }
625
            )
UNCOV
626
            r_peaks = detector.detect_peaks()
×
627

UNCOV
628
        s_valleys = []
×
629

UNCOV
630
        for i, r_peak in enumerate(r_peaks):
×
631
            # Set the start of the search range to be the R peak
UNCOV
632
            search_start = r_peak
×
633

634
            # Determine the end of the search range
UNCOV
635
            if i == len(r_peaks) - 1:
×
636
                # For the last R peak, set the end to the end of the signal or a 200ms window after the R peak
UNCOV
637
                search_end = min(
×
638
                    len(self.waveform) - 1, search_start + int(self.fs * 0.2)
639
                )  # Approx 200ms window
640
            else:
641
                # For other R peaks, set the end as the midpoint to the next R peak
UNCOV
642
                search_end = (r_peak + r_peaks[i + 1]) // 2
×
643

644
            # Ensure the search range is valid
UNCOV
645
            if search_start < search_end:
×
646
                # Extract the signal segment within the search range
UNCOV
647
                signal_segment = self.waveform[search_start:search_end]
×
648

649
                # Detect the S valley as the minimum point in the segment
UNCOV
650
                s_valley_idx = np.argmin(signal_segment) + search_start
×
UNCOV
651
                s_valleys.append(s_valley_idx)
×
652

UNCOV
653
        return s_valleys
×
654

655
    def detect_p_peak(self, r_peaks=None, q_valleys=None):
1✔
656
        """
657
        Detects the P peak (local maximum) in the ECG waveform just before each Q valley.
658

659
        Parameters
660
        ----------
661
        q_valleys : list of int
662
            Indices of detected Q valleys in the ECG waveform.
663

664
        Returns
665
        -------
666
        p_peaks : list of int
667
            Indices of the P peaks (local maximum) for each Q valley.
668
        """
UNCOV
669
        if self.signal_type != "ECG":
×
UNCOV
670
            raise ValueError("P peaks can only be detected for ECG signals.")
×
UNCOV
671
        if r_peaks is None:
×
UNCOV
672
            detector = PeakDetection(
×
673
                self.waveform,
674
                "ecg_r_peak",
675
                **{
676
                    "distance": 50,
677
                    "window_size": 7,
678
                    "threshold_factor": 1.6,
679
                    "search_window": 6,
680
                }
681
            )
UNCOV
682
            r_peaks = detector.detect_peaks()
×
UNCOV
683
        if q_valleys is None:
×
UNCOV
684
            q_valleys = self.detect_q_valley(r_peaks=r_peaks)
×
UNCOV
685
        p_peaks = []
×
686

UNCOV
687
        for i in range(1, len(r_peaks)):
×
688
            # Define search range with midpoint as start and Q valley as end
UNCOV
689
            midpoint = (r_peaks[i - 1] + r_peaks[i]) // 2
×
UNCOV
690
            q_valley = q_valleys[i - 1]  # Corresponding Q valley for the R peak pair
×
691

692
            # Ensure search range has positive length
UNCOV
693
            if midpoint < q_valley:
×
UNCOV
694
                search_start = midpoint
×
UNCOV
695
                search_end = q_valley
×
696
            else:
697
                # Skip if search range is invalid
UNCOV
698
                continue
×
699

700
            # Extract signal segment within search range
UNCOV
701
            signal_segment = self.waveform[search_start:search_end]
×
702

703
            # Detect the maximum in this segment
UNCOV
704
            p_peak_idx = np.argmax(signal_segment) + search_start
×
UNCOV
705
            p_peaks.append(p_peak_idx)
×
706

UNCOV
707
        return p_peaks
×
708

709
    def detect_t_peak(self, r_peaks=None, s_valleys=None):
1✔
710
        """
711
        Detect T peaks (local maxima) between the S valley and the midpoint to the next R peak.
712

713
        Parameters
714
        ----------
715
        r_peaks : list of int
716
            Indices of detected R peaks in the ECG waveform.
717
        s_valleys : list of int
718
            Indices of detected S valleys in the ECG waveform.
719

720
        Returns
721
        -------
722
        t_peaks : list of int
723
            Indices of the T peaks for each S valley.
724
        """
UNCOV
725
        if self.signal_type != "ECG":
×
UNCOV
726
            raise ValueError("T peaks can only be detected for ECG signals.")
×
UNCOV
727
        if r_peaks is None:
×
UNCOV
728
            detector = PeakDetection(
×
729
                self.waveform,
730
                "ecg_r_peak",
731
                **{
732
                    "distance": 50,
733
                    "window_size": 7,
734
                    "threshold_factor": 1.6,
735
                    "search_window": 6,
736
                }
737
            )
UNCOV
738
            r_peaks = detector.detect_peaks()
×
UNCOV
739
        if s_valleys is None:
×
UNCOV
740
            s_valleys = self.detect_s_valley(r_peaks=r_peaks)
×
741

UNCOV
742
        t_peaks = []
×
743

UNCOV
744
        for i in range(len(s_valleys)):
×
745
            # Define S valley as the start of the search range
UNCOV
746
            s_valley = s_valleys[i]
×
747

748
            # Determine the end of the search range
749
            # For the last S valley, restrict the end point within signal bounds
UNCOV
750
            if i < len(r_peaks) - 1:
×
UNCOV
751
                midpoint = (r_peaks[i] + r_peaks[i + 1]) // 2
×
752
            else:
753
                # For the last R peak, limit the midpoint to signal length
754
                midpoint = len(self.waveform) - 1
×
755

756
            # Check if search range is valid
UNCOV
757
            if s_valley < midpoint:
×
UNCOV
758
                search_start = s_valley
×
UNCOV
759
                search_end = midpoint
×
760

761
                # Extract the signal segment within the search range
UNCOV
762
                signal_segment = self.waveform[search_start:search_end]
×
763

764
                # Detect the T peak as the maximum point in the segment
UNCOV
765
                t_peak_idx = np.argmax(signal_segment) + search_start
×
UNCOV
766
                t_peaks.append(t_peak_idx)
×
767

UNCOV
768
        return t_peaks
×
769

770
    def compute_qrs_duration(self):
1✔
771
        """
772
        Computes the duration of the QRS complex in an ECG waveform.
773

774
        Returns:
775
            float: The QRS duration in milliseconds.
776

777
        Example:
778
            >>> ecg_signal = [...]  # Sample ECG signal
779
            >>> wm = WaveformMorphology(ecg_signal, signal_type="ECG")
780
            >>> qrs_duration = wm.compute_qrs_duration()
781
            >>> print(f"QRS Duration: {qrs_duration} ms")
782
        """
783
        if self.signal_type != "ECG":
1✔
784
            raise ValueError("QRS duration can only be computed for ECG signals.")
1✔
785

786
        # Detect R-peaks in the ECG signal
787
        peak_detector = PeakDetection(self.waveform, method="ecg_r_peak")
1✔
788
        r_peaks = peak_detector.detect_peaks()
1✔
789
        if len(r_peaks) == 0:
1✔
790
            return np.nan  # Return NaN if no peaks are detected
1✔
791
        # Detect Q and S points around R peaks
792
        q_points = []
1✔
793
        s_points = []
1✔
794
        for r_peak in r_peaks:
1✔
795
            # Q point detection
796
            q_start = max(0, r_peak - int(self.fs * 0.04))
1✔
797
            q_end = r_peak
1✔
798
            q_segment = self.waveform[q_start:q_end]
1✔
799
            if len(q_segment) > 0:
1!
800
                q_point = np.argmin(q_segment) + q_start
1✔
801
                q_points.append(q_point)
1✔
802
            else:
UNCOV
803
                q_points.append(q_start)
×
804

805
            # S point detection
806
            s_start = r_peak
1✔
807
            s_end = min(len(self.waveform), r_peak + int(self.fs * 0.04))
1✔
808
            s_segment = self.waveform[s_start:s_end]
1✔
809
            if len(s_segment) > 0:
1!
810
                s_point = np.argmin(s_segment) + s_start
1✔
811
                s_points.append(s_point)
1✔
812
            else:
UNCOV
813
                s_points.append(s_end)
×
814

815
        # Compute QRS durations
816
        qrs_durations = [
1✔
817
            (s_points[i] - q_points[i]) / self.fs
818
            for i in range(len(r_peaks))
819
            if s_points[i] > q_points[i]
820
        ]
821
        qrs_duration = np.mean(qrs_durations) if qrs_durations else 0.0
1✔
822

823
        return qrs_duration
1✔
824

825
    def compute_duration(self, peaks1, peaks2, mode):
1✔
826
        """
827
        Compute the mean duration between two sets of peaks.
828

829
        Parameters
830
        ----------
831
        peaks1 : numpy.ndarray
832
            The first set of peaks (e.g., systolic peaks).
833
        peaks2 : numpy.ndarray
834
            The second set of peaks (e.g., diastolic peaks).
835
        mode : str, optional
836
            The type of duration computation method ("peak" or "trough"). Default is "peak".
837

838
        Returns
839
        -------
840
        duration : float
841
            The mean duration between the two sets of peaks in seconds.
842

843
        Examples
844
        --------
845
        >>> peaks1 = np.array([100, 200, 300])
846
        >>> peaks2 = np.array([150, 250, 350])
847
        >>> extractor = PhysiologicalFeatureExtractor(np.random.randn(1000))
848
        >>> duration = extractor.compute_duration(peaks1, peaks2)
849
        >>> print(duration)
850
        """
851
        if mode == "peak":
1✔
852
            durations = [
1✔
853
                (p2 - p1) / self.fs for p1, p2 in zip(peaks1, peaks2) if p1 < p2
854
            ]
855
        elif mode == "trough":
1✔
856
            durations = [(t - p) / self.fs for p, t in zip(peaks1, peaks2)]
1✔
857
        else:
858
            raise ValueError("Duration mode must be 'peak' or 'trough'.")
1✔
859
        return np.mean(durations) if durations else 0.0
1✔
860

861
    def compute_ppg_dicrotic_notch(self):
1✔
862
        """
863
        Detects the dicrotic notch in a PPG waveform and computes its timing.
864

865
        Returns:
866
            float: The timing of the dicrotic notch in milliseconds.
867

868
        Example:
869
            >>> ppg_signal = [...]  # Sample PPG signal
870
            >>> wm = WaveformMorphology(ppg_signal, signal_type="PPG")
871
            >>> notch_timing = wm.compute_ppg_dicrotic_notch()
872
            >>> print(f"Dicrotic Notch Timing: {notch_timing} ms")
873
        """
874
        if self.signal_type != "PPG":
1!
UNCOV
875
            raise ValueError("Dicrotic notch can only be computed for PPG signals.")
×
876

877
        # Detect peaks and dicrotic notches in the PPG signal
878
        peak_detector = PeakDetection(self.waveform, method="ppg_first_derivative")
1✔
879
        systolic_peaks = peak_detector.detect_peaks()
1✔
880

881
        # dicrotic_notch_detector = PeakDetection(
882
        #     self.waveform, method="ppg_second_derivative"
883
        # )
884
        # dicrotic_notches = dicrotic_notch_detector.detect_peaks()
885
        dicrotic_notches = self.detect_notches(systolic_peaks=systolic_peaks)
1✔
886

887
        # Compute the time difference between systolic peak and dicrotic notch
888
        notch_timing = 0.0
1✔
889
        # for peak, notch in zip(systolic_peaks, dicrotic_notches):
890
        #     if notch > peak:
891
        #         notch_timing += (
892
        #             (notch - peak) * 1000 / self.fs
893
        #         )  # Convert to milliseconds
894

895
        # return notch_timing / len(systolic_peaks) if len(systolic_peaks) > 0 else 0.0
896

897
        valid_pairs = 0
1✔
898

899
        for peak, notch in zip(systolic_peaks, dicrotic_notches):
1✔
900
            if notch > peak:
1!
UNCOV
901
                notch_timing += (
×
902
                    (notch - peak) * 1000 / self.fs
903
                )  # Convert to milliseconds
UNCOV
904
                valid_pairs += 1
×
905

906
        return notch_timing / valid_pairs if valid_pairs > 0 else 0.0
1✔
907

908
    def compute_wave_ratio(self, peaks, notch_points):
1✔
909
        """
910
        Computes the ratio of systolic to diastolic volumes in a PPG waveform.
911

912
        Args:
913
            peaks (np.array): Indices of systolic peaks.
914
            notch_points (np.array): Indices of dicrotic notches.
915

916
        Returns:
917
            float: The ratio of systolic to diastolic areas.
918

919
        Example:
920
            >>> systolic_peaks = [5, 15, 25]
921
            >>> dicrotic_notches = [10, 20, 30]
922
            >>> wave_ratio = wm.compute_wave_ratio(systolic_peaks, dicrotic_notches)
923
            >>> print(f"Systolic/Diastolic Ratio: {wave_ratio}")
924
        """
925
        if self.signal_type != "PPG":
1!
UNCOV
926
            raise ValueError("Wave ratio can only be computed for PPG signals.")
×
927

928
        systolic_volume = self.compute_volume(peaks)
1✔
929
        diastolic_volume = self.compute_volume(notch_points)
1✔
930

931
        if diastolic_volume == 0:
1!
UNCOV
932
            return np.inf  # Avoid division by zero
×
933

934
        return systolic_volume / diastolic_volume
1✔
935

936
    def compute_eeg_wavelet_features(self):
1✔
937
        """
938
        Computes EEG wavelet features by applying a wavelet transform and extracting relevant
939
        frequency bands.
940

941
        Returns:
942
            dict: A dictionary of extracted EEG wavelet features (e.g., delta, theta, alpha).
943

944
        Example:
945
            >>> eeg_signal = [...]  # Sample EEG signal
946
            >>> wm = WaveformMorphology(eeg_signal, signal_type="EEG")
947
            >>> wavelet_features = wm.compute_eeg_wavelet_features()
948
            >>> print(wavelet_features)
949
        """
950
        if self.signal_type != "EEG":
1!
UNCOV
951
            raise ValueError("Wavelet features can only be computed for EEG signals.")
×
952

953
        wavelet_coeffs = np.abs(np.convolve(self.waveform, np.ones(10), "same"))
1✔
954

955
        # Extract frequency bands based on wavelet decomposition
956
        delta = wavelet_coeffs[: len(wavelet_coeffs) // 5]
1✔
957
        theta = wavelet_coeffs[len(wavelet_coeffs) // 5 : len(wavelet_coeffs) // 4]
1✔
958
        alpha = wavelet_coeffs[len(wavelet_coeffs) // 4 : len(wavelet_coeffs) // 3]
1✔
959

960
        return {
1✔
961
            "delta_power": np.sum(delta),
962
            "theta_power": np.sum(theta),
963
            "alpha_power": np.sum(alpha),
964
        }
965

966
    def compute_slope(self, peaks1, peaks2, mode="peak"):
1✔
967
        """
968
        Compute the mean slope between two sets of peaks.
969

970
        Parameters
971
        ----------
972
        peaks1 : numpy.ndarray
973
            The first set of peaks.
974
        peaks2 : numpy.ndarray
975
            The second set of peaks.
976
        mode : str, optional
977
        The type of duration computation method ("peak" or "trough"). Default is "peak".
978

979
        Returns
980
        -------
981
        slope : float
982
            The mean slope between the two sets of peaks.
983

984
        Examples
985
        --------
986
        >>> peaks1 = np.array([100, 200, 300])
987
        >>> peaks2 = np.array([150, 250, 350])
988
        >>> extractor = PhysiologicalFeatureExtractor(np.random.randn(1000))
989
        >>> slope = extractor.compute_slope(peaks1, peaks2)
990
        >>> print(slope)
991
        """
UNCOV
992
        if mode == "peak":
×
UNCOV
993
            slopes = [
×
994
                (self.waveform[p2] - self.waveform[p1]) / (p2 - p1)
995
                for p1, p2 in zip(peaks1, peaks2)
996
                if p1 < p2
997
            ]
UNCOV
998
        elif mode == "trough":
×
UNCOV
999
            slopes = [
×
1000
                (self.waveform[p] - self.waveform[t]) / (p - t)
1001
                for p, t in zip(peaks1, peaks2)
1002
            ]
1003
        else:
UNCOV
1004
            raise ValueError("Slope mode must be 'peak' or 'trough'.")
×
UNCOV
1005
        return np.mean(slopes) if slopes else 0.0
×
1006

1007
    def compute_slope_sequence(self):
1✔
1008
        """
1009
        Computes the slope of the waveform by calculating its first derivative.
1010

1011
        Returns:
1012
            float: The average slope of the waveform.
1013

1014
        Example:
1015
            >>> signal = [0.5, 0.7, 1.0, 0.8, 0.6]
1016
            >>> wm = WaveformMorphology(signal)
1017
            >>> slope = wm.compute_slope()
1018
            >>> print(f"Slope: {slope}")
1019
        """
1020
        slope = np.gradient(self.waveform)
1✔
1021
        return np.mean(slope)
1✔
1022

1023
    def compute_curvature(self):
1✔
1024
        """
1025
        Computes the curvature of the waveform by analyzing its second derivative.
1026

1027
        Returns:
1028
            float: The average curvature of the waveform.
1029

1030
        Example:
1031
            >>> signal = [0.5, 0.7, 1.0, 0.8, 0.6]
1032
            >>> wm = WaveformMorphology(signal)
1033
            >>> curvature = wm.compute_curvature()
1034
            >>> print(f"Curvature: {curvature}")
1035
        """
1036
        first_derivative = np.gradient(self.waveform)
1✔
1037
        second_derivative = np.gradient(first_derivative)
1✔
1038
        curvature = np.abs(second_derivative) / (1 + first_derivative**2) ** (3 / 2)
1✔
1039
        return np.mean(curvature)
1✔
1040

1041
# from vitalDSP.utils.synthesize_data import generate_ecg_signal, generate_synthetic_ppg
1042
# from plotly import graph_objects as go
1043

1044
# if __name__ == "__main__":
1045
#     # sfecg = 256
1046
#     # N = 15
1047
#     # Anoise = 0.05
1048
#     # hrmean = 70
1049
#     # ecg_signal = generate_ecg_signal(
1050
#     #     sfecg=sfecg, N=N, Anoise=Anoise, hrmean=hrmean
1051
#     # )
1052

1053
#     # detector = PeakDetection(
1054
#     #     ecg_signal,"ecg_r_peak", **{
1055
#     #         "distance": 50,
1056
#     #         "window_size": 7,
1057
#     #         "threshold_factor":1.6,
1058
#     #         "search_window":6}
1059
#     #     )
1060

1061
#     # rpeaks = detector.detect_peaks()
1062

1063
#     # waveform = WaveformMorphology(ecg_signal, fs=256, signal_type="ECG")
1064
#     # q_valleys = waveform.detect_q_valley()
1065
#     # p_peaks = waveform.detect_p_peak()
1066
#     # s_valleys = waveform.detect_s_valley()
1067
#     # t_peaks = waveform.detect_t_peak()
1068

1069
#     # fig = go.Figure()
1070
#     #     # Plot the ECG signal
1071
#     # fig.add_trace(go.Scatter(x=np.arange(len(ecg_signal)), y=ecg_signal, mode="lines", name="ECG Signal"))
1072

1073
#     # # Plot R-peaks
1074
#     # fig.add_trace(go.Scatter(x=rpeaks, y=ecg_signal[rpeaks], mode="markers", name="R Peaks", marker=dict(color="red", size=8)))
1075
#     # fig.add_trace(go.Scatter(x=q_valleys, y=ecg_signal[q_valleys], mode="markers", name="Q Valleys", marker=dict(color="green", size=8)))
1076
#     # fig.add_trace(go.Scatter(x=s_valleys, y=ecg_signal[s_valleys], mode="markers", name="S Valleys", marker=dict(size=8)))
1077
#     # fig.add_trace(go.Scatter(x=p_peaks, y=ecg_signal[p_peaks], mode="markers", name="P Peaks", marker=dict(size=8)))
1078
#     # fig.add_trace(go.Scatter(x=t_peaks, y=ecg_signal[t_peaks], mode="markers", name="T Peaks", marker=dict(size=8)))
1079
#     # fig.update_layout(
1080
#     #         title="ECG Signal with QRS-peaks/valleys and P, T peaks",
1081
#     #         xaxis_title="Samples",
1082
#     #         yaxis_title="Amplitude",
1083
#     #         showlegend=True
1084
#     # )
1085
#     # fig.show()
1086

1087
#     fs = 100
1088
#     time, ppg_signal = generate_synthetic_ppg(
1089
#         duration=10, sampling_rate=fs, noise_level=0.01, heart_rate=60, display=False
1090
#     )
1091
#     waveform = WaveformMorphology(ppg_signal, fs=fs, signal_type="PPG")
1092
#     detector = PeakDetection(
1093
#         ppg_signal,
1094
#         "ppg_systolic_peaks",
1095
#         **{
1096
#             "distance": 50,
1097
#             "window_size": 7,
1098
#             "threshold_factor": 1.6,
1099
#             "search_window": 6,
1100
#             "fs": fs,
1101
#         }
1102
#     )
1103
#     systolic_peaks = detector.detect_peaks()
1104
#     troughs = waveform.detect_troughs(systolic_peaks=systolic_peaks)
1105
#     notches = waveform.detect_notches(
1106
#         systolic_peaks=systolic_peaks, diastolic_troughs=troughs
1107
#     )
1108
#     diastolic_peaks = waveform.detect_diastolic_peak(
1109
#         notches=notches, diastolic_troughs=troughs
1110
#     )
1111
#     # detector = PeakDetection(
1112
#     #     ppg_signal,"abp_diastolic", **{
1113
#     #         "distance": 50,
1114
#     #         "window_size": 7,
1115
#     #         "threshold_factor":1.6,
1116
#     #         "search_window":6}
1117
#     # )
1118
#     # diastolic_peaks = detector.detect_peaks()
1119

1120
#     fig = go.Figure()
1121
#     fig.add_trace(go.Scatter(x=time, y=ppg_signal, mode="lines", name="PPG Signal"))
1122
#     fig.add_trace(
1123
#         go.Scatter(
1124
#             x=time[systolic_peaks],
1125
#             y=ppg_signal[systolic_peaks],
1126
#             mode="markers",
1127
#             name="Systolic Peaks",
1128
#         )
1129
#     )
1130
#     fig.add_trace(
1131
#         go.Scatter(
1132
#             x=time[troughs],
1133
#             y=ppg_signal[troughs],
1134
#             name="Diastolic Troughs",
1135
#             mode="markers",
1136
#         )
1137
#     )
1138
#     fig.add_trace(
1139
#         go.Scatter(
1140
#             x=time[notches], y=ppg_signal[notches], name="Notches", mode="markers"
1141
#         )
1142
#     )
1143
#     fig.add_trace(
1144
#         go.Scatter(
1145
#             x=time[diastolic_peaks],
1146
#             y=ppg_signal[diastolic_peaks],
1147
#             name="Diastolic Peaks",
1148
#             mode="markers",
1149
#         )
1150
#     )
1151
#     fig.show()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc