• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Oucru-Innovations / vital-DSP / 11545366242

28 Oct 2024 12:54AM UTC coverage: 90.288% (-0.8%) from 91.103%
11545366242

push

github

Koaha
fix broken workflow

1263 of 1606 branches covered (78.64%)

Branch coverage included in aggregate %.

5 of 12 new or added lines in 2 files covered. (41.67%)

143 existing lines in 3 files now uncovered.

5607 of 6003 relevant lines covered (93.4%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.94
/src/vitalDSP/feature_engineering/morphology_features.py
1
import numpy as np
1✔
2
from scipy.stats import linregress
1✔
3
from vitalDSP.utils.peak_detection import PeakDetection
1✔
4
from vitalDSP.preprocess.preprocess_operations import (
1✔
5
    PreprocessConfig,
6
    preprocess_signal,
7
)
8
from vitalDSP.physiological_features.waveform import WaveformMorphology
1✔
9
import warnings
1✔
10

11

12
class PhysiologicalFeatureExtractor:
1✔
13
    """
14
    A class to extract various physiological features from ECG and PPG signals, such as durations,
15
    areas, amplitude variability, slope ratios, and dicrotic notch locations.
16

17
    Features for PPG:
18
    - Systolic/diastolic duration, area, amplitude variability
19
    - Signal skewness, slope, peak trends, and dicrotic notch locations
20

21
    Features for ECG:
22
    - QRS duration, area, T-wave area
23
    - Amplitude variability, QRS-T ratios, and QRS slope
24
    - Signal skewness, peak trends
25

26
    Methods
27
    -------
28
    preprocess_signal(preprocess_config)
29
        Preprocess the signal by applying bandpass filtering and noise reduction.
30
    extract_features(signal_type='ECG', preprocess_config=None)
31
        Extract all features (morphology, volume, amplitude variability, dicrotic notch) for ECG or PPG signals.
32
    """
33

34
    def __init__(self, signal, fs=1000):
1✔
35
        """
36
        Initialize the PhysiologicalFeatureExtractor class.
37

38
        Parameters
39
        ----------
40
        signal : numpy.ndarray
41
            The input physiological signal (ECG or PPG).
42
        fs : int, optional
43
            Sampling frequency in Hz. Default is 1000.
44
        """
45
        self.signal = np.asarray(signal, dtype=np.float64)
1✔
46
        self.fs = fs
1✔
47

48
    def detect_troughs(self, peaks):
1✔
49
        """
50
        Detect troughs (valleys) in the signal based on the given peaks.
51

52
        Parameters
53
        ----------
54
        peaks : numpy.ndarray
55
            The indices of detected peaks.
56

57
        Returns
58
        -------
59
        troughs : numpy.ndarray
60
            The indices of detected troughs.
61

62
        Examples
63
        --------
64
        >>> peaks = np.array([100, 200, 300])
65
        >>> troughs = extractor.detect_troughs(peaks)
66
        >>> print(troughs)
67
        """
UNCOV
68
        warnings.warn(
×
69
            "Deprecated. Please use vitalDSP.physiological_features.waveform.WaveformMorphology instead.",
70
            DeprecationWarning,
71
        )
UNCOV
72
        troughs = []
×
UNCOV
73
        for i in range(len(peaks) - 1):
×
74
            # Find the local minimum between two consecutive peaks
UNCOV
75
            segment = self.signal[peaks[i] : peaks[i + 1]]
×
UNCOV
76
            trough = np.argmin(segment) + peaks[i]
×
UNCOV
77
            troughs.append(trough)
×
UNCOV
78
        return np.array(troughs)
×
79

80
    def compute_peak_trend(self, peaks):
1✔
81
        """
82
        Compute the trend slope of peak amplitudes over time.
83

84
        Parameters
85
        ----------
86
        peaks : numpy.ndarray
87
            The set of peaks.
88

89
        Returns
90
        -------
91
        trend_slope : float
92
            The slope of the peak amplitude trend over time.
93

94
        Examples
95
        --------
96
        >>> signal = np.random.randn(1000)
97
        >>> peaks = np.array([100, 200, 300])
98
        >>> extractor = PhysiologicalFeatureExtractor(signal)
99
        >>> trend_slope = extractor.compute_peak_trend(peaks)
100
        >>> print(trend_slope)
101
        """
102
        amplitudes = self.signal[peaks]
1✔
103
        if len(peaks) > 1:
1✔
104
            slope, _, _, _, _ = linregress(peaks, amplitudes)
1✔
105
            return slope
1✔
106
        return 0.0
1✔
107

108
    def compute_amplitude_variability(self, peaks):
1✔
109
        """
110
        Compute the variability of the amplitudes at the given peak locations.
111

112
        Parameters
113
        ----------
114
        peaks : numpy.ndarray
115
            The set of peaks.
116

117
        Returns
118
        -------
119
        variability : float
120
            The amplitude variability (standard deviation of the peak amplitudes).
121

122
        Examples
123
        --------
124
        >>> signal = np.random.randn(1000)
125
        >>> peaks = np.array([100, 200, 300])
126
        >>> extractor = PhysiologicalFeatureExtractor(signal)
127
        >>> variability = extractor.compute_amplitude_variability(peaks)
128
        >>> print(variability)
129
        """
130
        amplitudes = self.signal[peaks]
1✔
131
        return np.std(amplitudes) if len(amplitudes) > 1 else 0.0
1✔
132

133
    def get_preprocess_signal(self, preprocess_config):
1✔
134
        """
135
        Preprocess the signal by applying bandpass filtering and noise reduction.
136

137
        Parameters
138
        ----------
139
        preprocess_config : PreprocessConfig
140
            Configuration for both signal filtering and artifact removal.
141

142
        Returns
143
        -------
144
        clean_signal : numpy.ndarray
145
            The preprocessed signal, cleaned of noise and artifacts.
146

147
        Examples
148
        --------
149
        >>> signal = np.sin(np.linspace(0, 10, 1000)) + np.random.normal(0, 0.1, 1000)
150
        >>> preprocess_config = PreprocessConfig()
151
        >>> extractor = PhysiologicalFeatureExtractor(signal, fs=1000)
152
        >>> preprocessed_signal = extractor.preprocess_signal(preprocess_config)
153
        >>> print(preprocessed_signal)
154
        """
155
        return preprocess_signal(
1✔
156
            signal=self.signal,
157
            sampling_rate=self.fs,
158
            filter_type=preprocess_config.filter_type,
159
            lowcut=preprocess_config.lowcut,
160
            highcut=preprocess_config.highcut,
161
            order=preprocess_config.order,
162
            noise_reduction_method=preprocess_config.noise_reduction_method,
163
            wavelet_name=preprocess_config.wavelet_name,
164
            level=preprocess_config.level,
165
            window_length=preprocess_config.window_length,
166
            polyorder=preprocess_config.polyorder,
167
            kernel_size=preprocess_config.kernel_size,
168
            sigma=preprocess_config.sigma,
169
            respiratory_mode=preprocess_config.respiratory_mode,
170
        )
171

172
    def extract_features(self, signal_type="ECG", preprocess_config=None):
1✔
173
        """
174
        Extract all physiological features from the signal for either ECG or PPG.
175

176
        Parameters
177
        ----------
178
        signal_type : str, optional
179
            The type of signal ("ECG" or "PPG"). Default is "ECG".
180
        preprocess_config : PreprocessConfig, optional
181
            The configuration object for signal preprocessing. If None, default settings are used.
182

183
        Returns
184
        -------
185
        features : dict
186
            A dictionary containing the extracted features, such as durations, areas, amplitude variability,
187
            slopes, skewness, peak trends, and dicrotic notch locations.
188

189
        Examples
190
        --------
191
        >>> signal = np.random.randn(1000)
192
        >>> preprocess_config = PreprocessConfig()
193
        >>> extractor = PhysiologicalFeatureExtractor(signal, fs=1000)
194
        >>> features = extractor.extract_features(signal_type="ECG", preprocess_config=preprocess_config)
195
        >>> print(features)
196
        """
197
        if preprocess_config is None:
1!
UNCOV
198
            preprocess_config = PreprocessConfig()
×
199

200
        # Preprocess the signal
201
        clean_signal = self.get_preprocess_signal(preprocess_config)
1✔
202

203
        # Baseline correction
204
        clean_signal = clean_signal - np.min(clean_signal)
1✔
205

206
        # Initialize the morphology class
207
        morphology = WaveformMorphology(
1✔
208
            clean_signal, fs=self.fs, signal_type=signal_type
209
        )
210

211
        # Initialize features as an empty dictionary before the try block
212
        features = {}
1✔
213

214
        try:
1✔
215
            if signal_type == "PPG":
1✔
216
                features = {
1✔
217
                    "systolic_duration": np.nan,
218
                    "diastolic_duration": np.nan,
219
                    "systolic_area": np.nan,
220
                    "diastolic_area": np.nan,
221
                    "systolic_slope": np.nan,
222
                    "diastolic_slope": np.nan,
223
                    "signal_skewness": np.nan,
224
                    "peak_trend_slope": np.nan,
225
                    "systolic_amplitude_variability": np.nan,
226
                    "diastolic_amplitude_variability": np.nan,
227
                }
228
                # Detect peaks and troughs in the PPG signal
229
                peaks = PeakDetection(
1✔
230
                    clean_signal, method="ppg_first_derivative"
231
                ).detect_peaks()
232
                waveform = WaveformMorphology(
1✔
233
                    clean_signal, fs=self.fs, signal_type="PPG"
234
                )
235
                troughs = waveform.detect_troughs(systolic_peaks=peaks)
1✔
236

237
                # Ensure peaks and troughs are numpy arrays
238
                peaks = np.array(peaks, dtype=int)
1✔
239
                troughs = np.array(troughs, dtype=int)
1✔
240

241
                # Adjust lengths and alignments
242
                # Since troughs are between peaks, len(troughs) = len(peaks) - 1
243
                # For computations, we need to align the indices correctly
244
                if peaks[0] < troughs[0]:
1!
245
                    # The first peak comes before the first trough, remove the first peak
246
                    peaks = peaks[1:]
1✔
247
                if len(troughs) > len(peaks):
1!
UNCOV
248
                    troughs = troughs[: len(peaks)]
×
249
                else:
250
                    peaks = peaks[: len(troughs)]
1✔
251

252
                # Compute systolic area (from trough to peak)
253
                systolic_areas = [
1✔
254
                    np.trapz(clean_signal[troughs[i] : peaks[i]])
255
                    for i in range(len(peaks))
256
                    if troughs[i] < peaks[i]
257
                ]
258
                systolic_area = np.mean(systolic_areas) if systolic_areas else 0.0
1✔
259

260
                # Compute diastolic area (from peak to next trough)
261
                diastolic_areas = []
1✔
262
                for i in range(len(peaks) - 1):
1!
UNCOV
263
                    start = peaks[i]
×
UNCOV
264
                    end = troughs[i + 1]
×
UNCOV
265
                    if start < end:
×
UNCOV
266
                        area = np.trapz(clean_signal[start:end])
×
UNCOV
267
                        diastolic_areas.append(area)
×
268
                diastolic_area = np.mean(diastolic_areas) if diastolic_areas else 0.0
1✔
269

270
                # Compute systolic duration (time from trough to peak)
271
                systolic_durations = [
1✔
272
                    (peaks[i] - troughs[i]) / self.fs
273
                    for i in range(len(peaks))
274
                    if peaks[i] > troughs[i]
275
                ]
276
                systolic_duration = (
1✔
277
                    np.mean(systolic_durations) if systolic_durations else 0.0
278
                )
279

280
                # Compute diastolic duration (time from peak to next trough)
281
                diastolic_durations = [
1✔
282
                    (troughs[i + 1] - peaks[i]) / self.fs
283
                    for i in range(len(peaks) - 1)
284
                    if troughs[i + 1] > peaks[i]
285
                ]
286
                diastolic_duration = (
1✔
287
                    np.mean(diastolic_durations) if diastolic_durations else 0.0
288
                )
289

290
                # Compute systolic slope (rate of increase from trough to peak)
291
                systolic_slopes = [
1✔
292
                    (clean_signal[peaks[i]] - clean_signal[troughs[i]])
293
                    / (peaks[i] - troughs[i])
294
                    for i in range(len(peaks))
295
                    if peaks[i] > troughs[i]
296
                ]
297
                systolic_slope = np.mean(systolic_slopes) if systolic_slopes else 0.0
1✔
298

299
                # Compute diastolic slope (rate of decrease from peak to next trough)
300
                diastolic_slopes = [
1✔
301
                    (clean_signal[troughs[i + 1]] - clean_signal[peaks[i]])
302
                    / (troughs[i + 1] - peaks[i])
303
                    for i in range(len(peaks) - 1)
304
                    if troughs[i + 1] > peaks[i]
305
                ]
306
                diastolic_slope = np.mean(diastolic_slopes) if diastolic_slopes else 0.0
1✔
307

308
                # Compute other features
309
                # Dicrotic notch detection can be challenging; if not reliable, it can be omitted or handled separately
310
                # For simplicity, we'll omit dicrotic notch features here
311

312
                systolic_variability = self.compute_amplitude_variability(peaks)
1✔
313
                diastolic_variability = self.compute_amplitude_variability(troughs)
1✔
314
                signal_skewness = morphology.compute_skewness()
1✔
315
                peak_trend_slope = self.compute_peak_trend(peaks)
1✔
316

317
                features = {
1✔
318
                    "systolic_duration": systolic_duration,
319
                    "diastolic_duration": diastolic_duration,
320
                    "systolic_area": systolic_area,
321
                    "diastolic_area": diastolic_area,
322
                    "systolic_slope": systolic_slope,
323
                    "diastolic_slope": diastolic_slope,
324
                    "signal_skewness": signal_skewness,
325
                    "peak_trend_slope": peak_trend_slope,
326
                    "systolic_amplitude_variability": systolic_variability,
327
                    "diastolic_amplitude_variability": diastolic_variability,
328
                }
329

330
            elif signal_type == "ECG":
1✔
331
                features = {
1✔
332
                    "qrs_duration": np.nan,
333
                    "qrs_area": np.nan,
334
                    "qrs_amplitude": np.nan,
335
                    "qrs_slope": np.nan,
336
                    "t_wave_area": np.nan,
337
                    "heart_rate": np.nan,
338
                    "r_peak_amplitude_variability": np.nan,
339
                    "signal_skewness": np.nan,
340
                    "peak_trend_slope": np.nan,
341
                }
342
                # Detect R-peaks in the ECG signal
343
                peak_detector = PeakDetection(clean_signal, method="ecg_r_peak")
1✔
344
                r_peaks = peak_detector.detect_peaks()
1✔
345
                r_peaks = np.array(r_peaks, dtype=int)
1✔
346

347
                # Detect Q and S points around R peaks
348
                q_points = []
1✔
349
                s_points = []
1✔
350
                for r_peak in r_peaks:
1✔
351
                    # Q point detection (40 ms before R peak)
352
                    q_start = max(0, r_peak - int(self.fs * 0.04))
1✔
353
                    q_end = r_peak
1✔
354
                    q_segment = clean_signal[q_start:q_end]
1✔
355
                    if len(q_segment) > 0:
1!
356
                        q_point = np.argmin(q_segment) + q_start
1✔
357
                        q_points.append(q_point)
1✔
358
                    else:
UNCOV
359
                        q_points.append(q_start)
×
360

361
                    # S point detection (40 ms after R peak)
362
                    s_start = r_peak
1✔
363
                    s_end = min(len(clean_signal), r_peak + int(self.fs * 0.04))
1✔
364
                    s_segment = clean_signal[s_start:s_end]
1✔
365
                    if len(s_segment) > 0:
1!
366
                        s_point = np.argmin(s_segment) + s_start
1✔
367
                        s_points.append(s_point)
1✔
368
                    else:
UNCOV
369
                        s_points.append(s_end)
×
370

371
                # Compute QRS durations
372
                # qrs_durations = [
373
                #     (s_points[i] - q_points[i]) / self.fs
374
                #     for i in range(len(r_peaks))
375
                #     if s_points[i] > q_points[i]
376
                # ]
377
                qrs_durations = morphology.compute_qrs_duration()
1✔
378

379
                qrs_duration = np.mean(qrs_durations) if qrs_durations else 0.0
1✔
380

381
                # Compute QRS areas
382
                qrs_areas = [
1✔
383
                    np.trapz(clean_signal[q_points[i] : s_points[i]])
384
                    for i in range(len(r_peaks))
385
                    if s_points[i] > q_points[i]
386
                ]
387
                qrs_area = np.mean(qrs_areas) if qrs_areas else 0.0
1✔
388

389
                # Compute QRS amplitude
390
                qrs_amplitudes = clean_signal[r_peaks]
1✔
391
                qrs_amplitude = (
1✔
392
                    np.mean(qrs_amplitudes) if len(qrs_amplitudes) > 0 else 0.0
393
                )
394

395
                # Compute QRS slopes
396
                qrs_slopes = [
1✔
397
                    (clean_signal[r_peaks[i]] - clean_signal[q_points[i]])
398
                    / (r_peaks[i] - q_points[i])
399
                    for i in range(len(r_peaks))
400
                    if r_peaks[i] > q_points[i] and (r_peaks[i] - q_points[i]) != 0
401
                ]
402
                qrs_slope = np.mean(qrs_slopes) if qrs_slopes else 0.0
1✔
403

404
                # Compute RR intervals and heart rate
405
                rr_intervals = np.diff(r_peaks) / self.fs  # in seconds
1✔
406
                average_rr_interval = (
1✔
407
                    np.mean(rr_intervals) if len(rr_intervals) > 0 else 0.0
408
                )
409
                heart_rate = (
1✔
410
                    60 / average_rr_interval if average_rr_interval > 0 else 0.0
411
                )
412

413
                # Compute amplitude variability
414
                amplitude_variability = (
1✔
415
                    np.std(qrs_amplitudes) / np.mean(qrs_amplitudes)
416
                    if np.mean(qrs_amplitudes) != 0
417
                    else 0.0
418
                )
419

420
                # Compute T-wave areas
421
                t_wave_areas = []
1✔
422
                for i in range(len(r_peaks)):
1✔
423
                    s_point = s_points[i]
1✔
424
                    t_start = s_point
1✔
425
                    t_end = min(
1✔
426
                        len(clean_signal), s_point + int(self.fs * 0.3)
427
                    )  # 300 ms after S point
428

429
                    # Ensure valid segment
430
                    if t_start < t_end and len(clean_signal[t_start:t_end]) > 0:
1!
431
                        t_segment = clean_signal[t_start:t_end]
1✔
432
                        if len(t_segment) > 0:
1!
433
                            # Compute area under the T-wave segment
434
                            area = np.trapz(t_segment)
1✔
435
                            t_wave_areas.append(area)
1✔
436

437
                t_wave_area = np.mean(t_wave_areas) if t_wave_areas else 0.0
1✔
438

439
                # Compute signal skewness
440
                signal_skewness = morphology.compute_skewness()
1✔
441

442
                # Compute peak trend
443
                peak_trend_slope = self.compute_peak_trend(r_peaks)
1✔
444

445
                features = {
1✔
446
                    "qrs_duration": qrs_duration,
447
                    "qrs_area": qrs_area,
448
                    "qrs_amplitude": qrs_amplitude,
449
                    "qrs_slope": qrs_slope,
450
                    "t_wave_area": t_wave_area,
451
                    "heart_rate": heart_rate,
452
                    "r_peak_amplitude_variability": amplitude_variability,
453
                    "signal_skewness": signal_skewness,
454
                    "peak_trend_slope": peak_trend_slope,
455
                }
456

457
            else:
458
                raise ValueError(f"Unsupported signal type: {signal_type}")
1✔
459

460
        except Exception as e:
1✔
461
            print(f"Error during feature extraction: {e}")
1✔
462
            features = {
1✔
463
                key: np.nan for key in features
464
            }  # Set all features to np.nan in case of error
465

466
        return features
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc