• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Oucru-Innovations / vital-DSP / 11571742451

29 Oct 2024 10:15AM UTC coverage: 88.524% (-1.8%) from 90.288%
11571742451

push

github

Koaha
enhance peak detection and session duration

1274 of 1684 branches covered (75.65%)

Branch coverage included in aggregate %.

57 of 179 new or added lines in 1 file covered. (31.84%)

5 existing lines in 2 files now uncovered.

5630 of 6115 relevant lines covered (92.07%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.87
/src/vitalDSP/physiological_features/waveform.py
1
import numpy as np
1✔
2
from vitalDSP.utils.peak_detection import PeakDetection
1✔
3
from scipy.stats import skew
1✔
4

5

6
class WaveformMorphology:
1✔
7
    """
8
    A class for computing morphological features from physiological waveforms (ECG, PPG, EEG).
9

10
    Attributes:
11
        waveform (np.array): The waveform signal (ECG, PPG, EEG).
12
        fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz.
13
        signal_type (str): The type of signal ('ECG', 'PPG', 'EEG').
14
    """
15

16
    def __init__(self, waveform, fs=256, qrs_ratio=0.05, signal_type="ECG"):
1✔
17
        """
18
        Initializes the WaveformMorphology object.
19

20
        Args:
21
            waveform (np.array): The waveform signal.
22
            fs (int): The sampling frequency of the signal in Hz. Default is 1000 Hz.
23
            signal_type (str): The type of signal (ECG, PPG, EEG).
24
        """
25
        self.waveform = np.array(waveform)
1✔
26
        self.fs = fs  # Sampling frequency
1✔
27
        self.signal_type = signal_type  # 'ECG', 'PPG', 'EEG'
1✔
28
        self.qrs_ratio = qrs_ratio
1✔
29

30
    def detect_troughs(self, systolic_peaks=None):
1✔
31
        """
32
        Detects the troughs (valleys) in the PPG waveform between systolic peaks.
33

34
        Parameters
35
        ----------
36
        systolic_peaks : np.array
37
            Indices of detected systolic peaks in the PPG waveform.
38

39
        Returns
40
        -------
41
        troughs : np.array
42
            Indices of the detected troughs between the systolic peaks.
43

44
        Example
45
        -------
46
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated PPG signal
47
        >>> wm = WaveformMorphology(waveform, signal_type="PPG")
48
        >>> peaks = PeakDetection(waveform).detect_peaks()
49
        >>> troughs = wm.detect_troughs(peaks)
50
        >>> print(f"Troughs: {troughs}")
51
        """
52
        if systolic_peaks is None:
1!
53
            detector = PeakDetection(
×
54
                self.waveform,
55
                "ppg_systolic_peaks",
56
                **{
57
                    "distance": 50,
58
                    "window_size": 7,
59
                    "threshold_factor": 1.6,
60
                    "search_window": 6,
61
                    "fs": self.fs,
62
                }
63
            )
64
            systolic_peaks = detector.detect_peaks()
×
65
        diastolic_troughs = []
1✔
66
        signal_derivative = np.diff(self.waveform)
1✔
67

68
        for i in range(len(systolic_peaks) - 1):
1✔
69
            # Define the search range between two adjacent systolic peaks
70
            peak_start = systolic_peaks[i]
1✔
71
            peak_end = systolic_peaks[i + 1]
1✔
72
            search_start = peak_start + (peak_end - peak_start) // 2
1✔
73
            search_end = peak_end
1✔
74

75
            if search_start >= search_end or search_start < 0:
1!
76
                continue
×
77

78
            # Calculate the adaptive flatness threshold based on the MAD of the derivative within the search range
79
            local_derivative = signal_derivative[search_start:search_end]
1✔
80
            mad_derivative = np.median(
1✔
81
                np.abs(local_derivative - np.median(local_derivative))
82
            )
83
            adaptive_threshold = (
1✔
84
                0.5 * mad_derivative
85
            )  # Flatness threshold set as 50% of local MAD
86

87
            # Identify flat segments based on the adaptive threshold
88
            flat_segment = []
1✔
89
            for j in range(search_start, search_end - 1):
1✔
90
                if abs(signal_derivative[j]) < adaptive_threshold:
1✔
91
                    flat_segment.append(j)
1✔
92

93
            # Find the midpoint of the longest flat segment (if multiple segments are detected)
94
            if flat_segment:
1✔
95
                flat_segment_groups = np.split(
1✔
96
                    flat_segment, np.where(np.diff(flat_segment) != 1)[0] + 1
97
                )
98
                longest_flat_segment = max(flat_segment_groups, key=len)
1✔
99
                trough_index = longest_flat_segment[len(longest_flat_segment) // 2]
1✔
100
                diastolic_troughs.append(trough_index)
1✔
101

102
        return np.array(diastolic_troughs)
1✔
103
        # # Calculate the derivative of the signal for slope information
104
        # signal_derivative = np.diff(self.waveform)
105

106
        # for i in range(len(systolic_peaks) - 1):
107
        #     # Define the search range as the midpoint between two adjacent systolic peaks
108
        #     peak_start = systolic_peaks[i]
109
        #     peak_end = systolic_peaks[i + 1]
110
        #     search_start = peak_start + (peak_end - peak_start) // 2
111
        #     search_end = peak_end
112

113
        #     # Ensure search range is valid
114
        #     if search_start >= search_end or search_start < 0:
115
        #         continue
116

117
        #     # Narrow down to the section within the search range
118
        #     search_section = self.waveform[search_start:search_end]
119
        #     search_derivative = signal_derivative[search_start:search_end-1]
120

121
        #     # Find the index where the slope is closest to zero with a preference for a negative slope
122
        #     candidate_troughs = [
123
        #         idx for idx, slope in enumerate(search_derivative) if slope <= 0
124
        #     ]
125

126
        #     if candidate_troughs:
127
        #         trough_index = candidate_troughs[np.argmin(np.abs(search_derivative[candidate_troughs]))]
128
        #         diastolic_troughs.append(search_start + trough_index)
129
        # return np.array(diastolic_troughs)
130

131
    def detect_notches(self, systolic_peaks=None, diastolic_troughs=None):
1✔
132
        """
133
        Detects the dicrotic notches in a PPG waveform using second derivative.
134

135
        Parameters
136
        ----------
137
        systolic_peaks : np.array
138
            Indices of detected systolic peaks in the PPG waveform.
139
        diastolic_troughs : np.array
140
            Indices of the detected troughs between the systolic peaks.
141

142
        Returns
143
        -------
144
        notches : np.array
145
            Indices of detected dicrotic notches in the PPG waveform.
146

147
        Example
148
        -------
149
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated PPG signal
150
        >>> wm = WaveformMorphology(waveform, signal_type="PPG")
151
        >>> notches = wm.detect_notches()
152
        >>> print(f"Dicrotic Notches: {notches}")
153
        """
154
        if self.signal_type != "PPG":
1✔
155
            raise ValueError("Notches can only be detected for PPG signals.")
1✔
156
        if systolic_peaks is None:
1!
157
            detector = PeakDetection(
1✔
158
                self.waveform,
159
                "ppg_systolic_peaks",
160
                **{
161
                    "distance": 50,
162
                    "window_size": 7,
163
                    "threshold_factor": 1.6,
164
                    "search_window": 6,
165
                    "fs": self.fs,
166
                }
167
            )
168
            systolic_peaks = detector.detect_peaks()
1✔
169
        if diastolic_troughs is None:
1!
170
            diastolic_troughs = self.detect_troughs(systolic_peaks=systolic_peaks)
1✔
171
        notches = []
1✔
172

173
        # Calculate the first and second derivatives of the signal
174
        signal_derivative = np.diff(self.waveform)
1✔
175
        signal_second_derivative = np.diff(signal_derivative)
1✔
176

177
        for i in range(min(len(systolic_peaks), len(diastolic_troughs))):
1!
178
            peak = systolic_peaks[i]
×
179
            trough = diastolic_troughs[i]
×
180

181
            # Define the search range: keep it close to the peak, 10-30% of the distance to the trough
182
            search_start = peak + int(
×
183
                (trough - peak) * 0.1
184
            )  # Start 10% toward the trough
185
            search_end = peak + int((trough - peak) * 0.3)  # End 30% toward the trough
×
186

187
            # Ensure the search range is within bounds
188
            search_start = min(max(search_start, peak), trough)
×
189
            search_end = min(max(search_end, peak), trough)
×
190

191
            # Extract search section in the signal and derivatives
192
            # search_section = self.waveform[search_start:search_end]
193
            search_derivative = signal_derivative[search_start : search_end - 1]
×
194
            search_second_derivative = signal_second_derivative[
×
195
                search_start : search_end - 2
196
            ]
197

198
            # Identify candidate notches with slope close to zero
199
            candidate_notches = [
×
200
                idx
201
                for idx in range(len(search_derivative))
202
                if abs(search_derivative[idx])
203
                < 0.05  # Adjust threshold for "close to zero" slope
204
            ]
205

206
            # Filter candidates where the second derivative indicates leveling (positive values)
207
            candidate_indices = [
×
208
                idx
209
                for idx in candidate_notches
210
                if idx < len(search_second_derivative)
211
                and search_second_derivative[idx] > 0
212
            ]
213

214
            # Select the best candidate notch based on minimum absolute slope
215
            if candidate_indices:
×
216
                best_notch_idx = candidate_indices[
×
217
                    np.argmin(np.abs(search_derivative[candidate_indices]))
218
                ]
219
                notches.append(search_start + best_notch_idx)
×
220

221
        return np.array(notches)
1✔
222

223
    def detect_diastolic_peak(self, notches=None, diastolic_troughs=None):
1✔
224
        """
225
        Detect diastolic peaks in PPG signals based on notches and diastolic troughs.
226

227
        Parameters
228
        ----------
229
        notches : list of int
230
            Indices of detected notches in the PPG signal.
231
        diastolic_troughs : list of int
232
            Indices of diastolic troughs in the PPG signal.
233

234
        Returns
235
        -------
236
        diastolic_peaks : list of int
237
            Indices of diastolic peaks detected in the PPG signal.
238
        """
239
        if diastolic_troughs is None:
×
240
            diastolic_troughs = self.detect_troughs()
×
241
        if notches is None:
×
242
            notches = self.detect_notches(diastolic_troughs=diastolic_troughs)
×
243
        diastolic_peaks = []
×
244

245
        for i in range(len(notches)):
×
246
            notch = notches[i]
×
247
            trough = (
×
248
                diastolic_troughs[i]
249
                if i < len(diastolic_troughs)
250
                else len(self.waveform) - 1
251
            )
252

253
            # Define the initial search range: notch to halfway to the trough
254
            search_start = notch
×
255
            search_end = notch + (trough - notch) // 2
×
256

257
            if search_end <= search_start or search_end > len(self.waveform):
×
258
                diastolic_peaks.append(notch)
×
259
                continue
×
260

261
            search_segment = self.waveform[search_start:search_end]
×
262
            segment_derivative = np.diff(search_segment)
×
263

264
            candidate_peaks = [
×
265
                idx
266
                for idx in range(1, len(segment_derivative))
267
                if segment_derivative[idx - 1] < 0 and segment_derivative[idx] >= 0
268
            ]
269

270
            # Convert candidates to absolute indices and select the most prominent peak if available
271
            if candidate_peaks:
×
272
                candidate_indices = [search_start + idx for idx in candidate_peaks]
×
273
                diastolic_peak_idx = max(
×
274
                    candidate_indices, key=lambda x: self.waveform[x]
275
                )
276
                diastolic_peaks.append(diastolic_peak_idx)
×
277
            else:
278
                # Fallback assignment: if no diastolic peak is detected, use the notch as the diastolic peak
279
                diastolic_peaks.append(notch)
×
280

281
        return diastolic_peaks
×
282

283
    def detect_q_valley(self, r_peaks=None):
1✔
284
        """
285
        Detects the Q valley (local minimum) in the ECG waveform just before each R peak.
286

287
        Parameters
288
        ----------
289
        r_peaks : np.array
290
            Indices of detected R peaks in the ECG waveform.
291

292
        Returns
293
        -------
294
        q_valleys : list of int
295
            Indices of the Q valley (local minimum) for each R peak.
296
        """
297
        if self.signal_type != "ECG":
1!
298
            raise ValueError("Q valleys can only be detected for ECG signals.")
×
299
        if r_peaks is None:
1!
300
            detector = PeakDetection(
×
301
                self.waveform,
302
                "ecg_r_peak",
303
                **{
304
                    "distance": 50,
305
                    "window_size": 7,
306
                    "threshold_factor": 1.6,
307
                    "search_window": 6,
308
                }
309
            )
310
            r_peaks = detector.detect_peaks()
×
311

312
        q_valleys = []
1✔
313
        for i, r_peak in enumerate(r_peaks):
1✔
314
            # Set the end of the search range to be the R peak
315
            search_end = r_peak
1✔
316

317
            # Determine the start of the search range
318
            if i == 0:
1✔
319
                # For the first R peak, start from the beginning of the signal
320
                search_start = max(
1✔
321
                    0, search_end - int(self.fs * self.qrs_ratio)
322
                )  # Approx 200ms window
323
            else:
324
                # For subsequent R peaks, set the start as the midpoint to the previous R peak
325
                # search_start = (r_peaks[i - 1] + r_peak) // 2
326
                search_start = r_peak - int(self.qrs_ratio * self.fs)
1✔
327

328
            # Ensure the search range is valid
329
            if search_start < search_end:
1!
330
                # Extract the signal segment within the search range
331
                signal_segment = self.waveform[search_start:search_end]
1✔
332

333
                # Detect the Q valley as the minimum point in the segment
334
                q_valley_idx = np.argmin(signal_segment) + search_start
1✔
335
                q_valleys.append(q_valley_idx)
1✔
336

337
        return q_valleys
1✔
338

339
    def detect_s_valley(self, r_peaks=None):
1✔
340
        """
341
        Detects the S valleys (local minima after each R peak).
342

343
        Parameters
344
        ----------
345
        r_peaks : list of int
346
            Indices of detected R peaks in the ECG waveform.
347

348
        Returns
349
        -------
350
        s_valleys : list of int
351
            Indices of the S valleys for each R peak.
352
        """
353
        if self.signal_type != "ECG":
1!
354
            raise ValueError("Q valleys can only be detected for ECG signals.")
×
355
        if r_peaks is None:
1!
356
            detector = PeakDetection(
×
357
                self.waveform,
358
                "ecg_r_peak",
359
                **{
360
                    "distance": 50,
361
                    "window_size": 7,
362
                    "threshold_factor": 1.6,
363
                    "search_window": 6,
364
                }
365
            )
366
            r_peaks = detector.detect_peaks()
×
367

368
        s_valleys = []
1✔
369

370
        for i, r_peak in enumerate(r_peaks):
1✔
371
            # Set the start of the search range to be the R peak
372
            search_start = r_peak
1✔
373

374
            # Determine the end of the search range
375
            if i == len(r_peaks) - 1:
1✔
376
                # For the last R peak, set the end to the end of the signal or a 200ms window after the R peak
377
                search_end = min(
1✔
378
                    len(self.waveform) - 1, search_start + int(self.fs * 0.1)
379
                )
380
            else:
381
                # For other R peaks, set the end as the midpoint to the next R peak
382
                # search_end = (r_peak + r_peaks[i + 1]) // 2
383
                search_end = r_peak + int(self.qrs_ratio * self.fs)
1✔
384

385
            # Ensure the search range is valid
386
            if search_start < search_end:
1✔
387
                # Extract the signal segment within the search range
388
                signal_segment = self.waveform[search_start:search_end]
1✔
389

390
                # Detect the S valley as the minimum point in the segment
391
                s_valley_idx = np.argmin(signal_segment) + search_start
1✔
392
                s_valleys.append(s_valley_idx)
1✔
393

394
        return s_valleys
1✔
395

396
    def detect_p_peak(self, r_peaks=None, q_valleys=None):
1✔
397
        """
398
        Detects the P peak (local maximum) in the ECG waveform just before each Q valley.
399

400
        Parameters
401
        ----------
402
        q_valleys : list of int
403
            Indices of detected Q valleys in the ECG waveform.
404

405
        Returns
406
        -------
407
        p_peaks : list of int
408
            Indices of the P peaks (local maximum) for each Q valley.
409
        """
410
        if self.signal_type != "ECG":
×
411
            raise ValueError("P peaks can only be detected for ECG signals.")
×
412
        if r_peaks is None:
×
413
            detector = PeakDetection(
×
414
                self.waveform,
415
                "ecg_r_peak",
416
                **{
417
                    "distance": 50,
418
                    "window_size": 7,
419
                    "threshold_factor": 1.6,
420
                    "search_window": 6,
421
                }
422
            )
423
            r_peaks = detector.detect_peaks()
×
424
        if q_valleys is None:
×
425
            q_valleys = self.detect_q_valley(r_peaks=r_peaks)
×
426
        p_peaks = []
×
427

428
        for i in range(1, len(r_peaks)):
×
429
            # Define search range with midpoint as start and Q valley as end
430
            midpoint = (r_peaks[i - 1] + r_peaks[i]) // 2
×
431

UNCOV
432
            q_valley = q_valleys[i - 1]  # Corresponding Q valley for the R peak pair
×
433
            # Trick to shift the starting point of the search range
NEW
434
            if q_valley < midpoint:
×
NEW
435
                q_valley = q_valleys[i]
×
436

437
            # Ensure search range has positive length
438
            if midpoint < q_valley:
×
439
                search_start = midpoint
×
440
                search_end = q_valley
×
441
            else:
442
                # Skip if search range is invalid
443
                continue
×
444

445
            # Extract signal segment within search range
446
            signal_segment = self.waveform[search_start:search_end]
×
447

448
            # Detect the maximum in this segment
449
            p_peak_idx = np.argmax(signal_segment) + search_start
×
450
            p_peaks.append(p_peak_idx)
×
451

452
        return p_peaks
×
453

454
    def detect_t_peak(self, r_peaks=None, s_valleys=None):
1✔
455
        """
456
        Detect T peaks (local maxima) between the S valley and the midpoint to the next R peak.
457

458
        Parameters
459
        ----------
460
        r_peaks : list of int
461
            Indices of detected R peaks in the ECG waveform.
462
        s_valleys : list of int
463
            Indices of detected S valleys in the ECG waveform.
464

465
        Returns
466
        -------
467
        t_peaks : list of int
468
            Indices of the T peaks for each S valley.
469
        """
470
        if self.signal_type != "ECG":
×
471
            raise ValueError("T peaks can only be detected for ECG signals.")
×
472
        if r_peaks is None:
×
473
            detector = PeakDetection(
×
474
                self.waveform,
475
                "ecg_r_peak",
476
                **{
477
                    "distance": 50,
478
                    "window_size": 7,
479
                    "threshold_factor": 1.6,
480
                    "search_window": 6,
481
                }
482
            )
483
            r_peaks = detector.detect_peaks()
×
484
        if s_valleys is None:
×
485
            s_valleys = self.detect_s_valley(r_peaks=r_peaks)
×
486

487
        t_peaks = []
×
488

489
        for i in range(len(s_valleys)):
×
490
            # Define S valley as the start of the search range
491
            s_valley = s_valleys[i]
×
492

493
            # Determine the end of the search range
494
            # For the last S valley, restrict the end point within signal bounds
495
            if i < len(r_peaks) - 1:
×
496
                midpoint = (r_peaks[i] + r_peaks[i + 1]) // 2
×
497
            else:
498
                # For the last R peak, limit the midpoint to signal length
499
                midpoint = len(self.waveform) - 1
×
500

501
            # Check if search range is valid
502
            if s_valley < midpoint:
×
503
                search_start = s_valley
×
504
                search_end = midpoint
×
505

506
                # Extract the signal segment within the search range
507
                signal_segment = self.waveform[search_start:search_end]
×
508

509
                # Detect the T peak as the maximum point in the segment
510
                t_peak_idx = np.argmax(signal_segment) + search_start
×
511
                t_peaks.append(t_peak_idx)
×
512

513
        return t_peaks
×
514

515
    def detect_q_session(self, p_peaks=None, q_valleys=None, r_peaks=None):
1✔
516
        """
517
        Detects the Q sessions (start and end) in the ECG waveform based on R peaks.
518

519
        Parameters
520
        ----------
521
        r_peaks : np.array
522
            Indices of detected R peaks in the ECG waveform.
523

524
        Returns
525
        -------
526
        q_sessions : list of tuples
527
            Each tuple contains the start and end index of a Q session.
528

529
        Example
530
        -------
531
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
532
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
533
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
534
        >>> q_sessions = wm.detect_q_session(r_peaks)
535
        >>> print(f"Q Sessions: {q_sessions}")
536
        """
537
        if self.signal_type != "ECG":
1✔
538
            raise ValueError("Q sessions can only be detected for ECG signals.")
1✔
539
        if r_peaks is None:
1!
540
            detector = PeakDetection(
1✔
541
                self.waveform,
542
                "ecg_r_peak",
543
                **{
544
                    "distance": 50,
545
                    "window_size": 7,
546
                    "threshold_factor": 1.6,
547
                    "search_window": 6,
548
                }
549
            )
550
            r_peaks = detector.detect_peaks()
1✔
551
        if q_valleys is None:
1!
552
            q_valleys = self.detect_q_valley(r_peaks=r_peaks)
1✔
553
        if p_peaks is None:
1!
NEW
554
            p_peaks = self.detect_p_peak(r_peaks=r_peaks, q_valleys=q_valleys)
×
555
        q_sessions = []
1✔
556

557
        # Trick to ensure that P peak comes before Q valley at the start
558
        if q_valleys[0] < p_peaks[0]:
1!
559
            q_valleys = q_valleys[
1✔
560
                1:
561
            ]  # Skip the first Q valley if it comes before the first P peak
562
            r_peaks = r_peaks[1:]  # Also skip the corresponding first R peak
1✔
563

564
        # Iterate over detected peaks and valleys
565
        for i in range(len(q_valleys)):
1✔
566
            # Ensure indices are within valid bounds
567
            if i < len(p_peaks) and i < len(r_peaks):
1!
568
                # Define start of Q session as the midpoint between P peak and Q valley
569
                start = (p_peaks[i] + q_valleys[i]) // 2
1✔
570

571
                # Set search range from Q valley to R peak
572
                search_start = q_valleys[i]
1✔
573
                search_end = r_peaks[i]
1✔
574

575
                # Check for valid search range
576
                if search_end > search_start:
1!
577
                    # Find the end of Q session: closest point to start signal value within search range
578
                    start_value = self.waveform[start]
1✔
579
                    end = search_start + np.argmin(
1✔
580
                        np.abs(self.waveform[search_start:search_end] - start_value)
581
                    )
582

583
                    # Append the session to the q_sessions list
584
                    q_sessions.append((start, end))
1✔
585

586
        return q_sessions
1✔
587

588
    def detect_s_session(self, t_peaks=None, s_valleys=None, r_peaks=None):
1✔
589
        """
590
        Detects the S sessions (start and end) in the ECG waveform based on R peaks.
591

592
        Parameters
593
        ----------
594
        r_peaks : np.array
595
            Indices of detected R peaks in the ECG waveform.
596

597
        Returns
598
        -------
599
        s_sessions : list of tuples
600
            Each tuple contains the start and end index of an S session.
601

602
        Example
603
        -------
604
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
605
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
606
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
607
        >>> s_sessions = wm.detect_s_session(r_peaks)
608
        >>> print(f"S Sessions: {s_sessions}")
609
        """
610
        if self.signal_type != "ECG":
1✔
611
            raise ValueError("S sessions can only be detected for ECG signals.")
1✔
612
        if r_peaks is None:
1!
613
            detector = PeakDetection(
1✔
614
                self.waveform,
615
                "ecg_r_peak",
616
                **{
617
                    "distance": 50,
618
                    "window_size": 7,
619
                    "threshold_factor": 1.6,
620
                    "search_window": 6,
621
                }
622
            )
623
            r_peaks = detector.detect_peaks()
1✔
624
        if s_valleys is None:
1!
625
            s_valleys = self.detect_s_valley(r_peaks=r_peaks)
1✔
626
        if t_peaks is None:
1!
NEW
627
            t_peaks = self.detect_t_peak(r_peaks=r_peaks, s_valleys=s_valleys)
×
628

629
        s_sessions = []
1✔
630
        # Ensure all input arrays are aligned in size
631
        for i in range(1, min(len(r_peaks), len(s_valleys), len(t_peaks))):
1✔
632
            r_peak = r_peaks[i]
1✔
633
            s_valley = s_valleys[i]
1✔
634
            t_peak = t_peaks[i]
1✔
635

636
            # Calculate the midpoint between S valley and T peak for the end point
637
            s_end = s_valley + (t_peak - s_valley) // 2
1✔
638

639
            # Determine the start point within the range from R peak to S valley,
640
            # choosing the point closest in value to s_end
641
            # Ensure the search range is valid and not empty
642
            if r_peak < s_valley:
1!
643
                search_range = self.waveform[r_peak:s_valley]
1✔
644
                if len(search_range) > 0:
1!
645
                    # Find the start point within the search range closest to s_end's value
646
                    start_index_within_range = np.argmin(
1✔
647
                        np.abs(search_range - self.waveform[s_end])
648
                    )
649
                    s_start = r_peak + start_index_within_range
1✔
650

651
                    # Only add the session if s_start is before s_end
652
                    if s_start < s_end:
1!
NEW
653
                        s_sessions.append((s_start, s_end))
×
654

655
        return s_sessions
1✔
656

657
    def detect_r_session(self, rpeaks=None, q_session=None, s_session=None):
1✔
658
        """
659
        Detects the R session (start and end) in the ECG waveform based on Q and S sessions.
660

661
        Parameters
662
        ----------
663
        q_session : tuple
664
            The start and end index of the Q session.
665
        s_session : tuple
666
            The start and end index of the S session.
667

668
        Returns
669
        -------
670
        r_sessions : list of tuples
671
            Each tuple contains the start and end index of an R session.
672
        """
NEW
673
        if q_session is None:
×
NEW
674
            q_session = self.detect_q_session(r_peaks=rpeaks)
×
NEW
675
        if s_session is None:
×
NEW
676
            s_session = self.detect_s_session(r_peaks=rpeaks)
×
677
        # Ensure all input arrays are aligned in size
NEW
678
        if len(q_session) == 0 or len(s_session) == 0:
×
NEW
679
            return []
×
NEW
680
        r_sessions = []
×
681

682
        # Ensure all input arrays are aligned in size and skip the first item to avoid boundary issues
NEW
683
        for i in range(min(len(q_session), len(s_session))):
×
NEW
684
            q_end = q_session[i][1]
×
NEW
685
            s_start = s_session[i][0]
×
686

687
            # Ensure there is a valid interval for the R session
NEW
688
            if q_end < s_start:
×
NEW
689
                r_sessions.append((q_end, s_start))
×
690

NEW
691
        return r_sessions
×
692

693
    def detect_qrs_session(self, rpeaks=None, q_session=None, s_session=None):
1✔
694
        """
695
        Detects the QRS complex sessions (start and end) in the ECG waveform.
696

697
        Parameters
698
        ----------
699
        r_peaks : np.array
700
            Indices of detected R peaks in the ECG waveform.
701

702
        Returns
703
        -------
704
        qrs_sessions : list of tuples
705
            Each tuple contains the start and end index of a QRS session.
706

707
        Example
708
        -------
709
        >>> waveform = np.sin(np.linspace(0, 2*np.pi, 100))  # Simulated ECG signal
710
        >>> wm = WaveformMorphology(waveform, signal_type="ECG")
711
        >>> r_peaks = PeakDetection(waveform).detect_peaks()
712
        >>> qrs_sessions = wm.detect_qrs_session(r_peaks)
713
        >>> print(f"QRS Sessions: {qrs_sessions}")
714
        """
NEW
715
        if q_session is None:
×
NEW
716
            q_session = self.detect_q_session(r_peaks=rpeaks)
×
NEW
717
        if s_session is None:
×
NEW
718
            s_session = self.detect_s_session(r_peaks=rpeaks)
×
719
        # Ensure all input arrays are aligned in size
NEW
720
        if len(q_session) == 0 or len(s_session) == 0:
×
NEW
721
            return []
×
722

NEW
723
        qrs_sessions = [(q[0], s[1]) for q, s in zip(q_session, s_session)]
×
NEW
724
        return qrs_sessions
×
725

726
    def compute_amplitude(self, interval_type="R-to-S", signal_type="ECG"):
1✔
727
        """
728
        Computes the amplitude (max-min) of the waveform for specified intervals.
729

730
        Parameters
731
        ----------
732
        interval_type : str, optional
733
            The interval type to calculate the amplitude for:
734
            - For ECG:
735
            - "R-to-S": Between R peak and S valley.
736
            - "R-to-Q": Between Q valley and R peak.
737
            - "P-to-Q": Between P peak and Q valley.
738
            - "T-to-S": Between S valley and T peak.
739
            - For PPG:
740
            - "Sys-to-Notch": Between systolic peak and dicrotic notch.
741
            - "Notch-to-Dia": Between dicrotic notch and diastolic peak.
742
            - "Sys-to-Dia": Between systolic and diastolic peaks.
743

744
        signal_type : str, optional
745
            The type of signal: "ECG" or "PPG". Default is "ECG".
746

747
        Returns
748
        -------
749
        List[float]: A list of amplitude values for each interval within a single complex.
750

751
        Examples
752
        --------
753
        >>> amplitude_values = wm.compute_amplitude(interval_type="R-to-S", signal_type="ECG")
754
        >>> print(f"Amplitude values for each complex in R-to-S interval: {amplitude_values}")
755
        """
756

NEW
757
        if signal_type == "ECG":
×
758
            # Set peaks and valleys for ECG intervals
NEW
759
            if interval_type == "R-to-S":
×
NEW
760
                peaks = self.r_peaks
×
NEW
761
                valleys = self.s_valleys
×
NEW
762
                require_peak_first = True
×
NEW
763
            elif interval_type == "R-to-Q":
×
NEW
764
                peaks = self.r_peaks
×
NEW
765
                valleys = self.q_valleys
×
NEW
766
                require_peak_first = False
×
NEW
767
            elif interval_type == "P-to-Q":
×
NEW
768
                peaks = self.p_peaks
×
NEW
769
                valleys = self.q_valleys
×
NEW
770
                require_peak_first = True
×
NEW
771
            elif interval_type == "T-to-S":
×
NEW
772
                peaks = self.t_peaks
×
NEW
773
                valleys = self.s_valleys
×
NEW
774
                require_peak_first = False
×
775
            else:
NEW
776
                raise ValueError("Invalid interval_type for ECG.")
×
777

NEW
778
        elif signal_type == "PPG":
×
779
            # Set points for PPG intervals
NEW
780
            if interval_type == "Sys-to-Notch":
×
NEW
781
                peaks = self.sys_peaks
×
NEW
782
                valleys = self.dicrotic_notches
×
NEW
783
                require_peak_first = True
×
NEW
784
            elif interval_type == "Notch-to-Dia":
×
NEW
785
                peaks = self.dicrotic_notches
×
NEW
786
                valleys = self.dia_peaks
×
NEW
787
                require_peak_first = False
×
NEW
788
            elif interval_type == "Sys-to-Dia":
×
NEW
789
                peaks = self.sys_peaks
×
NEW
790
                valleys = self.dia_peaks
×
NEW
791
                require_peak_first = True
×
792
            else:
NEW
793
                raise ValueError("Invalid interval_type for PPG.")
×
794
        else:
NEW
795
            raise ValueError("signal_type must be 'ECG' or 'PPG'.")
×
796

797
        # Compute amplitude for each interval
NEW
798
        amplitudes = []
×
NEW
799
        for peak, valley in zip(peaks, valleys):
×
NEW
800
            if (require_peak_first and peak < valley) or (
×
801
                not require_peak_first and valley < peak
802
            ):
NEW
803
                amplitude = abs(self.waveform[peak] - self.waveform[valley])
×
NEW
804
                amplitudes.append(amplitude)
×
805

NEW
806
        return amplitudes
×
807

808
    def compute_volume(self, interval_type="P-to-T", signal_type="ECG", mode="peak"):
1✔
809
        """
810
        Compute the area under the curve between two sets of peaks and valleys for specified intervals.
811

812
        Parameters
813
        ----------
814
        interval_type : str, optional
815
            The interval type to calculate the volume for:
816
            - For ECG:
817
            - "P-to-T": Entire complex from P peak to T peak.
818
            - "R-to-S": Between R peak and S valley.
819
            - "R-to-Q": Between Q valley and R peak.
820
            - "P-to-Q": Between P peak and Q valley.
821
            - "T-to-S": Between S valley and T peak.
822
            - For PPG:
823
            - "Sys-to-Notch": Between systolic peak and dicrotic notch.
824
            - "Notch-to-Dia": Between dicrotic notch and diastolic peak.
825
            - "Sys-to-Dia": Between systolic and diastolic peaks.
826

827
            Default is "P-to-T" for ECG and "Sys-to-Notch" for PPG.
828

829
        signal_type : str, optional
830
            The type of signal: "ECG" or "PPG". Default is "ECG".
831

832
        mode : str, optional
833
            The area computation method ("peak" or "trough"). Default is "peak".
834
            - "peak": Computes the area under the curve.
835
            - "trough": Computes the area bounded by troughs.
836

837
        Returns
838
        -------
839
        List[float]: A list of volume values, each representing the area for the specified interval
840
                    within a single complex.
841

842
        Examples
843
        --------
844
        >>> volume_values = wm.compute_volume(interval_type="R-to-S", signal_type="ECG")
845
        >>> print(f"Volume values for each complex in R-to-S interval: {volume_values}")
846
        """
847

848
        if signal_type == "ECG":
1!
849
            # Set peaks and valleys for ECG intervals
NEW
850
            if interval_type == "P-to-T":
×
NEW
851
                start_points = self.p_peaks
×
NEW
852
                end_points = self.t_peaks
×
NEW
853
                require_peak_first = True
×
NEW
854
            elif interval_type == "R-to-S":
×
NEW
855
                start_points = self.r_peaks
×
NEW
856
                end_points = self.s_valleys
×
NEW
857
                require_peak_first = True
×
NEW
858
            elif interval_type == "R-to-Q":
×
NEW
859
                start_points = self.q_valleys
×
NEW
860
                end_points = self.r_peaks
×
NEW
861
                require_peak_first = False
×
NEW
862
            elif interval_type == "P-to-Q":
×
NEW
863
                start_points = self.p_peaks
×
NEW
864
                end_points = self.q_valleys
×
NEW
865
                require_peak_first = True
×
NEW
866
            elif interval_type == "T-to-S":
×
NEW
867
                start_points = self.s_valleys
×
NEW
868
                end_points = self.t_peaks
×
NEW
869
                require_peak_first = False
×
870
            else:
NEW
871
                raise ValueError("Invalid interval_type for ECG.")
×
872

NEW
873
        elif signal_type == "PPG":
×
874
            # Set points for PPG intervals
NEW
875
            if interval_type == "Sys-to-Notch":
×
NEW
876
                start_points = self.sys_peaks
×
NEW
877
                end_points = self.dicrotic_notches
×
NEW
878
                require_peak_first = True
×
NEW
879
            elif interval_type == "Notch-to-Dia":
×
NEW
880
                start_points = self.dicrotic_notches
×
NEW
881
                end_points = self.dia_peaks
×
NEW
882
                require_peak_first = False
×
NEW
883
            elif interval_type == "Sys-to-Dia":
×
NEW
884
                start_points = self.sys_peaks
×
NEW
885
                end_points = self.dia_peaks
×
NEW
886
                require_peak_first = True
×
887
            else:
NEW
888
                raise ValueError("Invalid interval_type for PPG.")
×
889
        else:
NEW
890
            raise ValueError("signal_type must be 'ECG' or 'PPG'.")
×
891

892
        # Compute area for each interval
NEW
893
        volumes = []
×
NEW
894
        for start, end in zip(start_points, end_points):
×
NEW
895
            if (require_peak_first and start < end) or (
×
896
                not require_peak_first and end < start
897
            ):
NEW
898
                if mode == "peak":
×
NEW
899
                    volume = np.trapz(
×
900
                        self.waveform[start : end + 1]
901
                    )  # Integrate over interval
NEW
902
                elif mode == "trough":
×
NEW
903
                    volume = np.trapz(
×
904
                        self.waveform[min(start, end) : max(start, end) + 1]
905
                    )
906
                else:
NEW
907
                    raise ValueError("Volume mode must be 'peak' or 'trough'.")
×
NEW
908
                volumes.append(volume)
×
909

NEW
910
        return volumes
×
911

912
    def compute_skewness(self, signal_type="ECG"):
1✔
913
        """
914
        Compute the skewness of each complex in the signal.
915

916
        Parameters
917
        ----------
918
        signal_type : str, optional
919
            The type of signal, either "ECG" or "PPG". Default is "ECG".
920

921
        Returns
922
        -------
923
        List[float]: A list of skewness values, one for each complex.
924

925
        Examples
926
        --------
927
        >>> signal = np.random.randn(1000)
928
        >>> extractor = PhysiologicalFeatureExtractor(signal)
929
        >>> skewness_values = extractor.compute_skewness()
930
        >>> print(f"Skewness values for each complex: {skewness_values}")
931
        """
932

933
        # Define complex points based on signal type
934
        if signal_type == "ECG":
1!
935
            starts = self.p_peaks
1✔
NEW
936
            ends = self.t_peaks
×
NEW
937
        elif signal_type == "PPG":
×
NEW
938
            starts = self.sys_peaks
×
NEW
939
            ends = self.dia_peaks
×
940
        else:
NEW
941
            raise ValueError("signal_type must be 'ECG' or 'PPG'.")
×
942

943
        # Compute skewness for each complex
NEW
944
        skewness_values = []
×
NEW
945
        for start, end in zip(starts, ends):
×
946
            # Ensure valid intervals
NEW
947
            if end > start:
×
NEW
948
                complex_segment = self.waveform[start : end + 1]
×
NEW
949
                skewness_values.append(skew(complex_segment))
×
950

NEW
951
        return skewness_values
×
952

953
    def compute_qrs_duration(self):
1✔
954
        """
955
        Computes the duration of the QRS complex in an ECG waveform.
956

957
        Returns:
958
            float: The QRS duration in milliseconds.
959

960
        Example:
961
            >>> ecg_signal = [...]  # Sample ECG signal
962
            >>> wm = WaveformMorphology(ecg_signal, signal_type="ECG")
963
            >>> qrs_duration = wm.compute_qrs_duration()
964
            >>> print(f"QRS Duration: {qrs_duration} ms")
965
        """
966
        if self.signal_type != "ECG":
1✔
967
            raise ValueError("QRS duration can only be computed for ECG signals.")
1✔
968

969
        # Detect R-peaks in the ECG signal
970
        peak_detector = PeakDetection(self.waveform, method="ecg_r_peak")
1✔
971
        r_peaks = peak_detector.detect_peaks()
1✔
972
        if len(r_peaks) == 0:
1✔
973
            return np.nan  # Return NaN if no peaks are detected
1✔
974
        # Detect Q and S points around R peaks
975
        q_points = []
1✔
976
        s_points = []
1✔
977
        for r_peak in r_peaks:
1✔
978
            # Q point detection
979
            q_start = max(0, r_peak - int(self.fs * 0.04))
1✔
980
            q_end = r_peak
1✔
981
            q_segment = self.waveform[q_start:q_end]
1✔
982
            if len(q_segment) > 0:
1!
983
                q_point = np.argmin(q_segment) + q_start
1✔
984
                q_points.append(q_point)
1✔
985
            else:
986
                q_points.append(q_start)
×
987

988
            # S point detection
989
            s_start = r_peak
1✔
990
            s_end = min(len(self.waveform), r_peak + int(self.fs * 0.04))
1✔
991
            s_segment = self.waveform[s_start:s_end]
1✔
992
            if len(s_segment) > 0:
1!
993
                s_point = np.argmin(s_segment) + s_start
1✔
994
                s_points.append(s_point)
1✔
995
            else:
996
                s_points.append(s_end)
×
997

998
        # Compute QRS durations
999
        qrs_durations = [
1✔
1000
            (s_points[i] - q_points[i]) / self.fs
1001
            for i in range(len(r_peaks))
1002
            if s_points[i] > q_points[i]
1003
        ]
1004
        qrs_duration = np.mean(qrs_durations) if qrs_durations else 0.0
1✔
1005

1006
        return qrs_duration
1✔
1007

1008
    def compute_duration(self, peaks1, peaks2, mode):
1✔
1009
        """
1010
        Compute the mean duration between two sets of peaks.
1011

1012
        Parameters
1013
        ----------
1014
        peaks1 : numpy.ndarray
1015
            The first set of peaks (e.g., systolic peaks).
1016
        peaks2 : numpy.ndarray
1017
            The second set of peaks (e.g., diastolic peaks).
1018
        mode : str, optional
1019
            The type of duration computation method ("peak" or "trough"). Default is "peak".
1020

1021
        Returns
1022
        -------
1023
        duration : float
1024
            The mean duration between the two sets of peaks in seconds.
1025

1026
        Examples
1027
        --------
1028
        >>> peaks1 = np.array([100, 200, 300])
1029
        >>> peaks2 = np.array([150, 250, 350])
1030
        >>> extractor = PhysiologicalFeatureExtractor(np.random.randn(1000))
1031
        >>> duration = extractor.compute_duration(peaks1, peaks2)
1032
        >>> print(duration)
1033
        """
1034
        if mode == "peak":
1✔
1035
            durations = [
1✔
1036
                (p2 - p1) / self.fs for p1, p2 in zip(peaks1, peaks2) if p1 < p2
1037
            ]
1038
        elif mode == "trough":
1✔
1039
            durations = [(t - p) / self.fs for p, t in zip(peaks1, peaks2)]
1✔
1040
        else:
1041
            raise ValueError("Duration mode must be 'peak' or 'trough'.")
1✔
1042
        return np.mean(durations) if durations else 0.0
1✔
1043

1044
    def compute_ppg_dicrotic_notch(self):
1✔
1045
        """
1046
        Detects the dicrotic notch in a PPG waveform and computes its timing.
1047

1048
        Returns:
1049
            float: The timing of the dicrotic notch in milliseconds.
1050

1051
        Example:
1052
            >>> ppg_signal = [...]  # Sample PPG signal
1053
            >>> wm = WaveformMorphology(ppg_signal, signal_type="PPG")
1054
            >>> notch_timing = wm.compute_ppg_dicrotic_notch()
1055
            >>> print(f"Dicrotic Notch Timing: {notch_timing} ms")
1056
        """
1057
        if self.signal_type != "PPG":
1!
1058
            raise ValueError("Dicrotic notch can only be computed for PPG signals.")
×
1059

1060
        # Detect peaks and dicrotic notches in the PPG signal
1061
        peak_detector = PeakDetection(self.waveform, method="ppg_first_derivative")
1✔
1062
        systolic_peaks = peak_detector.detect_peaks()
1✔
1063

1064
        # dicrotic_notch_detector = PeakDetection(
1065
        #     self.waveform, method="ppg_second_derivative"
1066
        # )
1067
        # dicrotic_notches = dicrotic_notch_detector.detect_peaks()
1068
        dicrotic_notches = self.detect_notches(systolic_peaks=systolic_peaks)
1✔
1069

1070
        # Compute the time difference between systolic peak and dicrotic notch
1071
        notch_timing = 0.0
1✔
1072
        # for peak, notch in zip(systolic_peaks, dicrotic_notches):
1073
        #     if notch > peak:
1074
        #         notch_timing += (
1075
        #             (notch - peak) * 1000 / self.fs
1076
        #         )  # Convert to milliseconds
1077

1078
        # return notch_timing / len(systolic_peaks) if len(systolic_peaks) > 0 else 0.0
1079

1080
        valid_pairs = 0
1✔
1081

1082
        for peak, notch in zip(systolic_peaks, dicrotic_notches):
1✔
1083
            if notch > peak:
1!
1084
                notch_timing += (
×
1085
                    (notch - peak) * 1000 / self.fs
1086
                )  # Convert to milliseconds
1087
                valid_pairs += 1
×
1088

1089
        return notch_timing / valid_pairs if valid_pairs > 0 else 0.0
1✔
1090

1091
    def compute_wave_ratio(self, peaks, notch_points):
1✔
1092
        """
1093
        Computes the ratio of systolic to diastolic volumes in a PPG waveform.
1094

1095
        Args:
1096
            peaks (np.array): Indices of systolic peaks.
1097
            notch_points (np.array): Indices of dicrotic notches.
1098

1099
        Returns:
1100
            float: The ratio of systolic to diastolic areas.
1101

1102
        Example:
1103
            >>> systolic_peaks = [5, 15, 25]
1104
            >>> dicrotic_notches = [10, 20, 30]
1105
            >>> wave_ratio = wm.compute_wave_ratio(systolic_peaks, dicrotic_notches)
1106
            >>> print(f"Systolic/Diastolic Ratio: {wave_ratio}")
1107
        """
1108
        if self.signal_type != "PPG":
1!
1109
            raise ValueError("Wave ratio can only be computed for PPG signals.")
×
1110

1111
        systolic_volume = self.compute_volume(peaks)
1✔
1112
        diastolic_volume = self.compute_volume(notch_points)
1✔
1113

1114
        if diastolic_volume == 0:
1!
1115
            return np.inf  # Avoid division by zero
×
1116

1117
        return systolic_volume / diastolic_volume
1✔
1118

1119
    def compute_eeg_wavelet_features(self):
1✔
1120
        """
1121
        Computes EEG wavelet features by applying a wavelet transform and extracting relevant
1122
        frequency bands.
1123

1124
        Returns:
1125
            dict: A dictionary of extracted EEG wavelet features (e.g., delta, theta, alpha).
1126

1127
        Example:
1128
            >>> eeg_signal = [...]  # Sample EEG signal
1129
            >>> wm = WaveformMorphology(eeg_signal, signal_type="EEG")
1130
            >>> wavelet_features = wm.compute_eeg_wavelet_features()
1131
            >>> print(wavelet_features)
1132
        """
1133
        if self.signal_type != "EEG":
1!
1134
            raise ValueError("Wavelet features can only be computed for EEG signals.")
×
1135

1136
        wavelet_coeffs = np.abs(np.convolve(self.waveform, np.ones(10), "same"))
1✔
1137

1138
        # Extract frequency bands based on wavelet decomposition
1139
        delta = wavelet_coeffs[: len(wavelet_coeffs) // 5]
1✔
1140
        theta = wavelet_coeffs[len(wavelet_coeffs) // 5 : len(wavelet_coeffs) // 4]
1✔
1141
        alpha = wavelet_coeffs[len(wavelet_coeffs) // 4 : len(wavelet_coeffs) // 3]
1✔
1142

1143
        return {
1✔
1144
            "delta_power": np.sum(delta),
1145
            "theta_power": np.sum(theta),
1146
            "alpha_power": np.sum(alpha),
1147
        }
1148

1149
    def compute_slope(self, peaks1, peaks2, mode="peak"):
1✔
1150
        """
1151
        Compute the mean slope between two sets of peaks.
1152

1153
        Parameters
1154
        ----------
1155
        peaks1 : numpy.ndarray
1156
            The first set of peaks.
1157
        peaks2 : numpy.ndarray
1158
            The second set of peaks.
1159
        mode : str, optional
1160
        The type of duration computation method ("peak" or "trough"). Default is "peak".
1161

1162
        Returns
1163
        -------
1164
        slope : float
1165
            The mean slope between the two sets of peaks.
1166

1167
        Examples
1168
        --------
1169
        >>> peaks1 = np.array([100, 200, 300])
1170
        >>> peaks2 = np.array([150, 250, 350])
1171
        >>> extractor = PhysiologicalFeatureExtractor(np.random.randn(1000))
1172
        >>> slope = extractor.compute_slope(peaks1, peaks2)
1173
        >>> print(slope)
1174
        """
1175
        if mode == "peak":
×
1176
            slopes = [
×
1177
                (self.waveform[p2] - self.waveform[p1]) / (p2 - p1)
1178
                for p1, p2 in zip(peaks1, peaks2)
1179
                if p1 < p2
1180
            ]
1181
        elif mode == "trough":
×
1182
            slopes = [
×
1183
                (self.waveform[p] - self.waveform[t]) / (p - t)
1184
                for p, t in zip(peaks1, peaks2)
1185
            ]
1186
        else:
1187
            raise ValueError("Slope mode must be 'peak' or 'trough'.")
×
1188
        return np.mean(slopes) if slopes else 0.0
×
1189

1190
    def compute_curvature(self):
1✔
1191
        """
1192
        Computes the curvature of the waveform by analyzing its second derivative.
1193

1194
        Returns:
1195
            float: The average curvature of the waveform.
1196

1197
        Example:
1198
            >>> signal = [0.5, 0.7, 1.0, 0.8, 0.6]
1199
            >>> wm = WaveformMorphology(signal)
1200
            >>> curvature = wm.compute_curvature()
1201
            >>> print(f"Curvature: {curvature}")
1202
        """
1203
        first_derivative = np.gradient(self.waveform)
1✔
1204
        second_derivative = np.gradient(first_derivative)
1✔
1205
        curvature = np.abs(second_derivative) / (1 + first_derivative**2) ** (3 / 2)
1✔
1206
        return np.mean(curvature)
1✔
1207

1208

1209
# from vitalDSP.utils.synthesize_data import generate_ecg_signal, generate_synthetic_ppg
1210
# from plotly import graph_objects as go
1211
# from vitalDSP.notebooks import process_in_chunks
1212
# import os
1213

1214
# if __name__ == "__main__":
1215
#     # Function to plot Q sessions
1216
#     sfecg = 256
1217
#     N = 15
1218
#     Anoise = 0.05
1219
#     hrmean = 70
1220
#     fs = 256
1221
#     duration = 60
1222
#     step_size = fs * 20
1223
#     FILE_PATH = os.path.join("sample_data", "public", "ecg_small.csv")
1224
#     ecg_signal, date_col = process_in_chunks(FILE_PATH, data_type='ecg', fs=fs)
1225
#     ecg_signal = np.array(ecg_signal)
1226

1227
#     # ecg_signal = generate_ecg_signal(
1228
#     #     sfecg=sfecg, N=N, Anoise=Anoise, hrmean=hrmean
1229
#     # )
1230

1231
#     detector = PeakDetection(
1232
#         ecg_signal,"ecg_r_peak", **{
1233
#             "distance": 50,
1234
#             "window_size": 7,
1235
#             "threshold_factor":1.6,
1236
#             "search_window":6}
1237
#         )
1238

1239
#     rpeaks = detector.detect_peaks()
1240

1241
#     waveform = WaveformMorphology(ecg_signal, fs=256, signal_type="ECG")
1242
#     q_valleys = waveform.detect_q_valley(r_peaks=rpeaks)
1243
#     p_peaks = waveform.detect_p_peak(r_peaks=rpeaks,q_valleys=q_valleys)
1244
#     s_valleys = waveform.detect_s_valley(r_peaks=rpeaks)
1245
#     t_peaks = waveform.detect_t_peak(r_peaks=rpeaks, s_valleys=s_valleys)
1246
#     q_sessions = waveform.detect_q_session(p_peaks, q_valleys, rpeaks)
1247
#     s_sessions = waveform.detect_s_session(t_peaks, s_valleys, rpeaks)
1248

1249
#     fig = go.Figure()
1250
#         # Plot the ECG signal
1251
#     fig.add_trace(go.Scatter(x=np.arange(len(ecg_signal)), y=ecg_signal, mode="lines", name="ECG Signal"))
1252

1253
#     # Plot R-peaks
1254
#     fig.add_trace(go.Scatter(x=rpeaks, y=ecg_signal[rpeaks], mode="markers", name="R Peaks", marker=dict(color="red", size=8)))
1255
#     fig.add_trace(go.Scatter(x=q_valleys, y=ecg_signal[q_valleys], mode="markers", name="Q Valleys", marker=dict(color="green", size=8)))
1256
#     fig.add_trace(go.Scatter(x=s_valleys, y=ecg_signal[s_valleys], mode="markers", name="S Valleys", marker=dict(size=8)))
1257
#     fig.add_trace(go.Scatter(x=p_peaks, y=ecg_signal[p_peaks], mode="markers", name="P Peaks", marker=dict(size=8)))
1258
#     fig.add_trace(go.Scatter(x=t_peaks, y=ecg_signal[t_peaks], mode="markers", name="T Peaks", marker=dict(size=8)))
1259

1260
#     # Define margin for highlighting
1261
#     margin = 10  # Adjust this value as needed for visual spacing
1262
#     # Highlight each Q session with lines above and below
1263
#     for (start, end) in q_sessions:
1264
#         x_vals = np.arange(start, end + 1)
1265
#         y_vals = ecg_signal[start:end + 1]
1266

1267
#         # Upper boundary trace
1268
#         fig.add_trace(go.Scatter(
1269
#             x=x_vals,
1270
#             y=y_vals + margin,  # Shift upward by margin
1271
#             mode="lines",
1272
#             line=dict(color="mediumaquamarine", dash="solid"),
1273
#             showlegend=False
1274
#         ))
1275

1276
#         # Lower boundary trace
1277
#         fig.add_trace(go.Scatter(
1278
#             x=x_vals,
1279
#             y=y_vals - margin,  # Shift downward by margin
1280
#             mode="lines",
1281
#             line=dict(color="mediumaquamarine", dash="solid"),
1282
#             showlegend=False
1283
#         ))
1284

1285
#     for (start, end) in s_sessions:
1286
#         x_vals = np.arange(start, end + 1)
1287
#         y_vals = ecg_signal[start:end + 1]
1288

1289
#         # Upper boundary trace
1290
#         fig.add_trace(go.Scatter(
1291
#             x=x_vals,
1292
#             y=y_vals + margin,  # Shift upward by margin
1293
#             mode="lines",
1294
#             line=dict(color="seagreen", dash="solid"),
1295
#             showlegend=False
1296
#         ))
1297

1298
#         # Lower boundary trace
1299
#         fig.add_trace(go.Scatter(
1300
#             x=x_vals,
1301
#             y=y_vals - margin,  # Shift downward by margin
1302
#             mode="lines",
1303
#             line=dict(color="seagreen", dash="solid"),
1304
#             showlegend=False
1305
#         ))
1306

1307
#     fig.update_layout(
1308
#             title="ECG Signal with QRS-peaks/valleys and P, T peaks",
1309
#             xaxis_title="Samples",
1310
#             yaxis_title="Amplitude",
1311
#             showlegend=True
1312
#     )
1313
#     fig.show()
1314

1315
#     # fs = 100
1316
#     # time, ppg_signal = generate_synthetic_ppg(
1317
#     #     duration=10, sampling_rate=fs, noise_level=0.01, heart_rate=60, display=False
1318
#     # )
1319
#     # waveform = WaveformMorphology(ppg_signal, fs=fs, signal_type="PPG")
1320
#     # detector = PeakDetection(
1321
#     #     ppg_signal,
1322
#     #     "ppg_systolic_peaks",
1323
#     #     **{
1324
#     #         "distance": 50,
1325
#     #         "window_size": 7,
1326
#     #         "threshold_factor": 1.6,
1327
#     #         "search_window": 6,
1328
#     #         "fs": fs,
1329
#     #     }
1330
#     # )
1331
#     # systolic_peaks = detector.detect_peaks()
1332
#     # troughs = waveform.detect_troughs(systolic_peaks=systolic_peaks)
1333
#     # notches = waveform.detect_notches(
1334
#     #     systolic_peaks=systolic_peaks, diastolic_troughs=troughs
1335
#     # )
1336
#     # diastolic_peaks = waveform.detect_diastolic_peak(
1337
#     #     notches=notches, diastolic_troughs=troughs
1338
#     # )
1339
#     # # detector = PeakDetection(
1340
#     # #     ppg_signal,"abp_diastolic", **{
1341
#     # #         "distance": 50,
1342
#     # #         "window_size": 7,
1343
#     # #         "threshold_factor":1.6,
1344
#     # #         "search_window":6}
1345
#     # # )
1346
#     # # diastolic_peaks = detector.detect_peaks()
1347

1348
#     # fig = go.Figure()
1349
#     # fig.add_trace(go.Scatter(x=time, y=ppg_signal, mode="lines", name="PPG Signal"))
1350
#     # fig.add_trace(
1351
#     #     go.Scatter(
1352
#     #         x=time[systolic_peaks],
1353
#     #         y=ppg_signal[systolic_peaks],
1354
#     #         mode="markers",
1355
#     #         name="Systolic Peaks",
1356
#     #     )
1357
#     # )
1358
#     # fig.add_trace(
1359
#     #     go.Scatter(
1360
#     #         x=time[troughs],
1361
#     #         y=ppg_signal[troughs],
1362
#     #         name="Diastolic Troughs",
1363
#     #         mode="markers",
1364
#     #     )
1365
#     # )
1366
#     # fig.add_trace(
1367
#     #     go.Scatter(
1368
#     #         x=time[notches], y=ppg_signal[notches], name="Notches", mode="markers"
1369
#     #     )
1370
#     # )
1371
#     # fig.add_trace(
1372
#     #     go.Scatter(
1373
#     #         x=time[diastolic_peaks],
1374
#     #         y=ppg_signal[diastolic_peaks],
1375
#     #         name="Diastolic Peaks",
1376
#     #         mode="markers",
1377
#     #     )
1378
#     # )
1379
#     # fig.show()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc