• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sequana / sequana / 16568275773

28 Jul 2025 11:51AM UTC coverage: 69.115% (-0.6%) from 69.722%
16568275773

push

github

web-flow
Merge pull request #870 from cokelaer/dev

add threshold parameter in G4 module

1 of 2 new or added lines in 1 file covered. (50.0%)

725 existing lines in 17 files now uncovered.

14389 of 20819 relevant lines covered (69.11%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.43
/sequana/sequence.py
1
#  This file is part of Sequana software
2
#
3
#  Copyright (c) 2018-2022 - Sequana Development Team
4
#
5
#  Distributed under the terms of the 3-clause BSD license.
6
#  The full license is in the LICENSE file, distributed with this software.
7
#
8
#  website: https://github.com/sequana/sequana
9
#  documentation: http://sequana.readthedocs.io
10
#
11
##############################################################################
12
import re
3✔
13
import string
3✔
14
import subprocess
3✔
15
from collections import Counter, deque
3✔
16

17
import colorlog
3✔
18

19
from sequana.fasta import FastA
3✔
20
from sequana.lazy import numpy as np
3✔
21
from sequana.lazy import pandas as pd
3✔
22
from sequana.lazy import pylab
3✔
23

24
logger = colorlog.getLogger(__name__)
3✔
25

26

27
__all__ = ["DNA", "RNA", "Repeats", "Sequence"]
3✔
28

29

30
from sequana.iuapc import codons  # your existing dictionary
3✔
31

32

33
def translate(dna: str, to_stop=True) -> str:
3✔
34
    """Translate a DNA sequence to a protein using sequana.iupac.codons.
35

36
    Args:
37
        dna (str): DNA sequence (A/T/C/G)
38
        to_stop (bool): If True, stop translation at the first stop codon
39

40
    Returns:
41
        str: Translated protein sequence
42
    """
UNCOV
43
    dna = dna.upper().replace(" ", "").replace("\n", "")
×
UNCOV
44
    rna = dna.replace("T", "U")
×
UNCOV
45
    protein = []
×
46

UNCOV
47
    for i in range(0, len(rna) - 2, 3):
×
UNCOV
48
        codon = rna[i : i + 3]
×
UNCOV
49
        if len(codon) < 3:
×
UNCOV
50
            break
×
UNCOV
51
        aa = codons.get(codon, "X")  # 'X' for unknown codons
×
UNCOV
52
        if aa == "*" and to_stop:
×
UNCOV
53
            break
×
UNCOV
54
        protein.append(aa)
×
UNCOV
55
    return "".join(protein)
×
56

57

58
class Sequence(object):
3✔
59
    """Abstract base classe for other specialised sequences such as DNA.
60

61

62
    Sequenced is the base class for other classes such as :class:`DNA` and
63
    :class:`RNA`.
64

65
    .. doctest::
66

67
        from sequana import Sequence
68
        s = Sequence("ACGT")
69
        s.stats()
70
        s.get_complement()
71

72
    .. note:: You may use a Fasta file as input (see constructor)
73

74

75
    """
76

77
    def __init__(self, sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGT"):
3✔
78
        """.. rubric:: Constructor
79

80
        A sequence is just a string stored in the :attr:`sequence` attribute. It
81
        has properties related to the type of alphabet authorised.
82

83
        :param str sequence: May be a string of a Fasta File, in which case only
84
            the first sequence is used.
85
        :param complement_in:
86
        :param complement_out:
87
        :param letters: authorise letters. Used in :meth:`check` only.
88

89
        .. todo:: use counter only once as a property
90

91
        """
92
        if sequence.endswith(".fa") or sequence.endswith(".fasta"):
3✔
93
            self.fasta = FastA(sequence)
3✔
94
            sequence = self.fasta.next().sequence.upper()
3✔
95
        else:  # assume correct string sequence
96
            pass
3✔
97

98
        self._data = sequence
3✔
99
        try:
3✔
100
            self._translate = string.maketrans(complement_in, complement_out)
3✔
101
        except:
3✔
102
            self._translate = bytes.maketrans(complement_in, complement_out)
3✔
103
        self._letters = letters
3✔
104

105
    def __iter__(self):
3✔
106
        return self
3✔
107

108
    def __next__(self):
3✔
109
        self._data = self.fasta.next().sequence.upper()
3✔
UNCOV
110
        return self._data
×
111

112
    def _get_sequence(self):
3✔
113
        return self._data
3✔
114

115
    sequence = property(_get_sequence)
3✔
116

117
    def get_complement(self):
3✔
118
        """Return complement"""
119
        return self._data.translate(self._translate)
3✔
120

121
    def get_reverse_complement(self):
3✔
122
        """Return reverse complement"""
123
        return self.get_complement()[::-1]
3✔
124

125
    def get_reverse(self):
3✔
126
        """Return reverse sequence"""
127
        return self._data[::-1]
3✔
128

129
    def complement(self):
3✔
130
        """Alias to :meth:`get_complement`"""
131
        self._data = self.get_complement()
3✔
132

133
    def reverse(self):
3✔
134
        """Alias to :meth:`get_reverse`"""
135
        self._data = self.get_reverse()
3✔
136

137
    def reverse_complement(self):
3✔
138
        """Alias to get_reverse_complement"""
139
        self._data = self.get_reverse_complement()
3✔
140

141
    def check(self):
3✔
142
        """Check that all letters are valid"""
143
        counter = Counter(self._data).keys()
3✔
144
        for key in counter:
3✔
145
            if key not in self._letters:
3✔
146
                raise ValueError("Found unexpected letter in the sequence (%s)" % key)
3✔
147

148
    def __len__(self):
3✔
149
        return len(self._data)
3✔
150

151
    def gc_content(self):
3✔
152
        """Return mean GC content"""
153
        c = Counter(self._data)
3✔
154
        ratio = (c["G"] + c["C"]) / len(self.sequence)
3✔
155
        return ratio
3✔
156

157
    def stats(self):
3✔
158
        """Return basic stats about the number of letters"""
159
        from collections import Counter
3✔
160

161
        return Counter(self.sequence)
3✔
162

163
    def get_statistics(self):
3✔
UNCOV
164
        from collections import Counter
×
165

UNCOV
166
        counter = Counter(self.sequence.upper())
×
UNCOV
167
        N = sum(list(counter.values()))
×
UNCOV
168
        counter["GC"] = counter["G"] + counter["C"]
×
UNCOV
169
        names = ["A", "C", "G", "T", "GC", "N"]
×
UNCOV
170
        basic_stats = [counter.get(x, 0) for x in ["A", "C", "G", "T", "GC", "N"]] + [N]
×
UNCOV
171
        basic_stats_pct = [float(x) / N * 100 for x in basic_stats]
×
UNCOV
172
        basic_stats = pd.DataFrame(
×
173
            {"count": basic_stats, "percentage": basic_stats_pct}, index=["A", "C", "G", "T", "GC", "N", "all"]
174
        )
UNCOV
175
        return {"single": basic_stats}
×
176

177
    def get_occurences(self, pattern, overlap=False):
3✔
178
        """Return position of the input pattern in the sequence
179

180
        ::
181

182
            >>> from sequana import Sequence
183
            >>> s = Sequence('ACGTTTTACGT')
184
            >>> s.get_occurences("ACGT")
185
            [0, 7]
186

187
        """
188
        if overlap is False:
3✔
189
            res = [m.start() for m in re.finditer(pattern, self.sequence)]
3✔
190
        elif overlap is True:
3✔
191
            res = [m.start() for m in re.finditer("(?=%s)" % pattern, self.sequence)]
3✔
192
        return res
3✔
193

194
        # reverse find-all without overlaps, you can combine positive and
195
        # negative lookahead into an expression like this:
196
        # res = [m.start() for m in re.finditer('(?=%s)(?!.{1,%d}%s)' % (search,
197
        #    len(pattern)-1, pattern), 'ttt')]
198

199

200
class DNA(Sequence):
3✔
201
    """Simple DNA class
202

203
    ::
204

205
        >>> from sequana.sequence import DNA
206
        >>> d = DNA("ACGTTTT")
207
        >>> d.reverse_complement()
208

209
    Some long computations are done when setting the window size::
210

211
        d.window = 100
212

213
    The ORF detection has been validated agains a plasmodium 3D7 ORF file
214
    found on plasmodb.org across the 14 chromosomes.
215

216
    """
217

218
    def __init__(
3✔
219
        self,
220
        sequence,
221
        codons_stop=["TAA", "TGA", "TAG"],
222
        codons_stop_rev=["TTA", "TCA", "CTA"],
223
        codons_start=["ATG"],
224
        codons_start_rev=["CAT"],
225
    ):
226
        super(DNA, self).__init__(sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGTN")
3✔
227

228
        self._window = None
3✔
229
        self._type_window = None
3✔
230
        self._slide_window = None
3✔
231
        self._seq_right = None
3✔
232
        self._dict_nuc = dict_nuc = {"A": 0, "T": 1, "G": 2, "C": 3}
3✔
233
        self._cumul = None
3✔
234
        self._Xn = None
3✔
235
        self._Yn = None
3✔
236
        self._Zn = None
3✔
237
        self._ignored_nuc = None
3✔
238
        self._AT_skew_slide = None
3✔
239
        self._GC_skew_slide = None
3✔
240
        self._GC_content_slide = None
3✔
241
        self._AT_content_slide = None
3✔
242
        self._template_fft = None
3✔
243
        self._c_fft = None
3✔
244
        self._codons_stop = codons_stop
3✔
245
        self._codons_stop_rev = codons_stop_rev
3✔
246
        self._codons_start = codons_start
3✔
247
        self._codons_start_rev = codons_start_rev
3✔
248
        self._ORF_pos = None
3✔
249
        self._threshold = None
3✔
250
        self._type_filter = None
3✔
251

252
    def _get_window(self):
3✔
253
        return self._window
3✔
254

255
    def _set_window(self, window):
3✔
256
        if (window > 0) & (window < 1):
3✔
257
            self._type_window = "adapted to genome length : %.1f %% of total length" % (window * 100)
3✔
258
            self._window = int(round(self.__len__() * window))
3✔
259

260
        elif (window >= 1) & (window <= self.__len__()):
3✔
261
            self._type_window = "fixed window length : %d" % (window)
3✔
262
            self._window = int(window)
3✔
263
        else:
264
            raise ValueError(
3✔
265
                "Incorrect value for window: choose either float ]0,1]"
266
                + " (fraction of genome) or integer [1,genome_length] (window size)"
267
            )
268

269
        logger.info("Computing GC skew")
3✔
270
        self._compute_skews()
3✔
271

272
    window = property(_get_window, _set_window)
3✔
273

274
    def _get_type_window(self):
3✔
275
        return self._type_window
3✔
276

277
    type_window = property(_get_type_window)
3✔
278

279
    def _init_sliding_window(self):
3✔
280
        """slide_window : deque of n first nucleotides (size of window)
281
        seq_right : deque of the rest of the genome, with the n-1 nucleotides at the end (circular DNA)
282
        """
283
        # split sequence_reference in two : sliding window and seq_right
284
        # for circular genome : add begining at the end of genome
285
        self._slide_window = deque(self.sequence[0 : self._window])
3✔
286
        self._seq_right = deque(self.sequence[self._window : self.__len__()] + self.sequence[0 : (self._window - 1)])
3✔
287

288
    def _init_list_results(self):
3✔
289
        # init IJ content and IJ skew
290
        IJ_content_res = np.empty((1, self.__len__()))
3✔
291
        IJ_content_res[:] = np.nan
3✔
292
        IJ_skew_res = np.empty((1, self.__len__()))
3✔
293
        IJ_skew_res[:] = np.nan
3✔
294
        return IJ_content_res, IJ_skew_res
3✔
295

296
    def _init_cumul_nuc(self):
3✔
297
        """Cumulative of nucleotide count along genome (init from first window)"""
298
        # ATGC (index stored in self._dict_nuc)
299
        cumul = np.zeros((4, (self.__len__() + self._window)))
3✔
300
        for j in range(self._window):
3✔
301
            nuc = self.sequence[j]
3✔
302
            if nuc in self._dict_nuc:
3✔
303
                cumul[self._dict_nuc[nuc]][j] += 1
3✔
304
        self._cumul = cumul
3✔
305

306
    def _compute_skews(self):
3✔
307
        ### initialisation =  Calculating GC skew and AT skew for first window
308
        self._init_sliding_window()
3✔
309
        GC_content_slide, GC_skew_slide = self._init_list_results()
3✔
310
        AT_content_slide, AT_skew_slide = self._init_list_results()
3✔
311
        karlin_content_slide, karlin_skew_slide = self._init_list_results()
3✔
312

313
        self._init_cumul_nuc()
3✔
314

315
        c = Counter(self._slide_window)
3✔
316
        dict_counts = {"G": c["G"], "C": c["C"], "A": c["A"], "T": c["T"]}
3✔
317
        i = 0
3✔
318

319
        # GC
320
        sumGC = float(dict_counts["G"] + dict_counts["C"])
3✔
321
        GC_content_slide[0][i] = sumGC
3✔
322
        if sumGC > 0:
3✔
323
            GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
3✔
324
        # AT
325
        sumAT = float(dict_counts["A"] + dict_counts["T"])
3✔
326
        AT_content_slide[0][i] = sumAT
3✔
327
        if sumAT > 0:
3✔
328
            AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT
3✔
329

330
        # karlin difference measures excess of purines over pyrimidines as G - C + A - T
331
        sumAG = float(dict_counts["A"] + dict_counts["G"])
3✔
332
        sumCT = float(dict_counts["C"] + dict_counts["T"])
3✔
333
        karlin_content_slide[0][i] = sumAG - sumCT
3✔
334

335
        ### Compute for all genome
336
        while self._seq_right:
3✔
337
            out_nuc = self._slide_window.popleft()
3✔
338
            in_nuc = self._seq_right.popleft()
3✔
339
            self._slide_window.append(in_nuc)
3✔
340

341
            i += 1
3✔
342

343
            if i % 500000 == 0:  # pragma: no cover
344
                logger.info("%d / %d" % (i, self.__len__()))
345
            # if in and out are the same : do nothing, append same result
346
            if out_nuc != in_nuc:
3✔
347
                # remove out from counters
348
                if out_nuc in self._dict_nuc:
3✔
349
                    dict_counts[out_nuc] -= 1
3✔
350
                if in_nuc in self._dict_nuc:
3✔
351
                    dict_counts[in_nuc] += 1
3✔
352
                sumGC = float(dict_counts["G"] + dict_counts["C"])
3✔
353
                sumAT = float(dict_counts["A"] + dict_counts["T"])
3✔
354
                sumAG = float(dict_counts["A"] + dict_counts["G"])
3✔
355
                sumCT = float(dict_counts["C"] + dict_counts["T"])
3✔
356

357
            # fill results
358
            # GC
359
            GC_content_slide[0][i] = sumGC
3✔
360
            if sumGC > 0:
3✔
361
                GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
3✔
362
            # AT
363
            AT_content_slide[0][i] = sumAT
3✔
364
            if sumAT > 0:
3✔
365
                AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT
3✔
366
            # cumul
367
            if in_nuc in self._dict_nuc:
3✔
368
                self._cumul[self._dict_nuc[in_nuc]][i + self._window - 1] += 1
3✔
369

370
            karlin_content_slide[0][i] = sumAG - sumCT
3✔
371

372
        self._GC_content_slide = GC_content_slide / float(self._window)
3✔
373
        self._AT_content_slide = AT_content_slide / float(self._window)
3✔
374
        self._karlin_content_slide = karlin_content_slide / float(self._window)
3✔
375
        self._cumul = np.delete(self._cumul, range(self.__len__(), self._cumul.shape[1]), 1)
3✔
376
        self._cumul = np.cumsum(self._cumul, axis=1)
3✔
377

378
        ### save result for Z curve
379
        self._Xn = list(
3✔
380
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["G"]])
381
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["T"]])
382
        )
383

384
        self._Yn = list(
3✔
385
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["C"]])
386
            - (self._cumul[self._dict_nuc["G"]] + self._cumul[self._dict_nuc["T"]])
387
        )
388

389
        self._Zn = list(
3✔
390
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["T"]])
391
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["G"]])
392
        )
393

394
        self._AT_skew_slide = AT_skew_slide
3✔
395
        self._GC_skew_slide = GC_skew_slide
3✔
396

397
        ### check proportion of ignored nucleotides
398
        GC_content_total = (self._cumul[self._dict_nuc["G"]][-1] + self._cumul[self._dict_nuc["C"]][-1]) / float(
3✔
399
            self.__len__()
400
        )
401
        AT_content_total = (self._cumul[self._dict_nuc["A"]][-1] + self._cumul[self._dict_nuc["T"]][-1]) / float(
3✔
402
            self.__len__()
403
        )
404
        self._ignored_nuc = 1.0 - GC_content_total - AT_content_total
3✔
405

406
    def _get_AT_skew(self):
3✔
407
        if self._AT_skew_slide is None:  # pragma: no cover
408
            raise AttributeError("Please set a valid window to compute skew")
409
        else:
410
            return self._AT_skew_slide
3✔
411

412
    AT_skew = property(_get_AT_skew)
3✔
413

414
    def _get_GC_skew(self):
3✔
415
        if self._GC_skew_slide is None:  # pragma: no cover
416
            raise AttributeError("Please set a valid window to compute skew")
417
        else:
418
            return self._GC_skew_slide
3✔
419

420
    GC_skew = property(_get_GC_skew)
3✔
421

422
    def _create_template_fft(self, M=1000):  # pragma: no cover
423
        M_3 = int(M / 3)
424
        W = [-0.5] * M_3 + list(np.linspace(-0.5, 0.5, M - 2 * M_3)) + [0.5] * M_3
425
        return list(W * np.hanning(M))
426

427
    def _myfft_gc_skew(self, M):  # pragma: no cover
428
        """
429
        x : GC_skew vector (list)
430
        param N: length of the GC skew vector
431
        param M: length of the template
432
        param A: amplitude between positive and negative GC skew vector
433

434
        """
435
        x = self._GC_skew_slide[0]
436

437
        N = len(x)
438
        template = self._create_template_fft(M) + [0] * (N - M)
439
        template /= pylab.norm(template)
440
        c = np.fft.fft(x)
441
        c = (
442
            abs(np.fft.ifft(np.fft.fft(x) * pylab.conj(np.fft.fft(template))) ** 2)
443
            / pylab.norm(x)
444
            / pylab.norm(template)
445
        )
446
        # shift the SNR vector by the template length so that the peak is at the END of the template
447
        c = np.roll(c, M // 2)
448
        self._template_fft = template
449
        self._c_fft = c * 2.0 / N
450

451
    def plot_all_skews(self, figsize=(10, 12), fontsize=16, alpha=0.5):
3✔
452
        if self._window is None:  # pragma: no cover
453
            raise AttributeError("Please set a valid window to compute skew")
454

455
        # create figure
456
        fig, axarr = pylab.subplots(9, 1, sharex=True, figsize=figsize)
3✔
457

458
        main_title = (
3✔
459
            "Window size = %d (%.0f %% of genome )\n\
460
        GC content = %.0f %%, AT content = %.0f %%, ignored = %.0f %%"
461
            % (
462
                self._window,
463
                self._window * 100 / self.__len__(),
464
                self.gc_content() * 100,
465
                (1 - self.gc_content()) * 100,
466
                self._ignored_nuc * 100,
467
            )
468
        )
469

470
        pylab.suptitle(main_title, fontsize=fontsize)
3✔
471

472
        # GC skew
473
        axarr[0].set_title("GC skew (blue) - Cumulative sum (red)")
3✔
474
        axarr[0].plot(list(self._GC_skew_slide[0]), "b-", alpha=alpha)
3✔
475
        axarr[0].set_ylabel("(G -C) / (G + C)")
3✔
476

477
        axarr[1].plot(list(np.cumsum(self._GC_skew_slide[0])), "r-", alpha=alpha)
3✔
478
        axarr[1].set_ylabel("(G -C) / (G + C)")
3✔
479

480
        # AT skew
481
        axarr[2].set_title("AT skew (blue) - Cumulative sum (red)")
3✔
482
        axarr[2].plot(list(self._AT_skew_slide[0]), "b-", alpha=alpha)
3✔
483
        axarr[2].set_ylabel("(A -T) / (A + T)")
3✔
484

485
        axarr[3].plot(list(np.cumsum(self._AT_skew_slide[0])), "r-", alpha=alpha)
3✔
486
        axarr[3].set_ylabel("(A -T) / (A + T)", rotation=0)
3✔
487

488
        # Xn
489
        axarr[4].set_title("Cumulative RY skew (Purine - Pyrimidine)")
3✔
490
        axarr[4].plot(self._Xn, "g-", alpha=alpha)
3✔
491
        axarr[4].set_ylabel("(A + G) - (C + T)")
3✔
492

493
        # Yn
494
        axarr[5].set_title("Cumulative MK skew (Amino - Keto)")
3✔
495
        axarr[5].plot(self._Yn, "g-", alpha=alpha)
3✔
496
        axarr[5].set_ylabel("(A + C) - (G + T)")
3✔
497

498
        # Zn
499
        axarr[6].set_title("Cumulative H-bond skew (Weak H-bond - Strong H-bond)")
3✔
500
        axarr[6].plot(self._Zn, "g-", alpha=alpha)
3✔
501
        axarr[6].set_ylabel("(A + T) - (G + C)")
3✔
502

503
        # GC content
504
        axarr[7].set_title("GC content")
3✔
505
        axarr[7].plot(list(self._GC_content_slide[0]), "k-", alpha=alpha)
3✔
506
        axarr[7].set_ylabel("GC")
3✔
507

508
        # AT content
509
        axarr[8].set_title("AT content")
3✔
510
        axarr[8].plot(list(self._AT_content_slide[0]), "k-", alpha=alpha)
3✔
511
        axarr[8].set_ylabel("AT")
3✔
512

513
        # # FFT
514
        # axarr[9].set_title("FFT")
515
        # axarr[9].plot(list(self._c_fft),'g-',alpha=alpha)
516
        # axarr[9].set_ylabel("FFT")
517

518
        fig.tight_layout()
3✔
519
        fig.subplots_adjust(top=0.88)
3✔
520

521
    def _update_ORF_frame(self, i, nuc, j, frame, d_vars):
3✔
522
        d_vars["codon"][j] = d_vars["codon"][j] + nuc
3✔
523

524
        if len(d_vars["codon"][j]) == 3:
3✔
525
            # test for start forward (if none already found)
526
            is_start = d_vars["codon"][j] in self._codons_start
3✔
527
            if is_start & np.isnan(d_vars["pos_ATG_f"][j]):
3✔
528
                d_vars["pos_ATG_f"][j] = i - 2
3✔
529
            # test for stop forward
530
            is_stop = d_vars["codon"][j] in self._codons_stop
3✔
531
            if is_stop:
3✔
532
                d_vars["end_f"][j] = i
3✔
533
                # test is length of CDS or ORF found is enough to be in output
534
                if self._type_filter == "CDS":
3✔
UNCOV
535
                    len_to_filter = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]  # len_CDS
×
536
                else:
537
                    len_to_filter = d_vars["end_f"][j] - d_vars["begin_f"][j]  # len_ORF
3✔
538

539
                if len_to_filter > self._threshold:
3✔
540
                    len_ORF = d_vars["end_f"][j] - d_vars["begin_f"][j]
3✔
541
                    len_CDS = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]
3✔
542
                    self._ORF_pos.append(
3✔
543
                        [
544
                            d_vars["begin_f"][j],
545
                            d_vars["end_f"][j],
546
                            frame + 1,
547
                            d_vars["pos_ATG_f"][j],
548
                            len_ORF,
549
                            len_CDS,
550
                        ]
551
                    )
552
                d_vars["begin_f"][j] = i + 1
3✔
553
                d_vars["pos_ATG_f"][j] = np.nan
3✔
554

555
            # test for start reverse
556
            is_start_rev = d_vars["codon"][j] in self._codons_start_rev
3✔
557
            if is_start_rev:
3✔
558
                d_vars["pos_ATG_r"][j] = i
3✔
559
            # test for stop reverse
560
            is_stop_rev = d_vars["codon"][j] in self._codons_stop_rev
3✔
561
            if is_stop_rev:
3✔
562
                d_vars["end_r"][j] = i - 3
3✔
563
                # test is length of CDS or ORF found is enough to be in output
564
                if self._type_filter == "CDS":
3✔
UNCOV
565
                    len_to_filter = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]  # len_CDS
×
566
                else:
567
                    len_to_filter = d_vars["end_r"][j] - d_vars["begin_r"][j]  # len_ORF
3✔
568

569
                if len_to_filter > self._threshold:
3✔
570
                    len_ORF = d_vars["end_r"][j] - d_vars["begin_r"][j]
3✔
571
                    len_CDS = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]
3✔
572
                    self._ORF_pos.append(
3✔
573
                        [
574
                            d_vars["begin_r"][j],
575
                            d_vars["end_r"][j],
576
                            -(frame + 1),
577
                            d_vars["pos_ATG_r"][j],
578
                            len_ORF,
579
                            len_CDS,
580
                        ]
581
                    )
582
                d_vars["begin_r"][j] = i - 3 + 1
3✔
583
                d_vars["pos_ATG_r"][j] = np.nan
3✔
584

585
            # reset codon
586
            d_vars["codon"][j] = ""
3✔
587

588
        return d_vars
3✔
589

590
    def _find_ORF_CDS(self):
3✔
591
        """Function for finding ORF and CDS in both strands of DNA"""
592
        # init variables
593
        d_vars = {
3✔
594
            "codon": ["", "-", "--"],
595
            "begin_f": [0] * 3,
596
            "begin_r": [0] * 3,
597
            "end_f": [0] * 3,
598
            "end_r": [0] * 3,
599
            "pos_ATG_f": [np.nan] * 3,
600
            "pos_ATG_r": [np.nan] * 3,
601
        }
602
        print("Finding ORF and CDS")
3✔
603
        # result list
604
        self._ORF_pos = []
3✔
605

606
        if self._threshold is None:
3✔
607
            self._threshold = 0
3✔
608

609
        if self._type_filter is None:
3✔
610
            self._type_filter = "ORF"
3✔
611

612
        # search ORF and CDS
613
        for i, nuc in enumerate(self.sequence):
3✔
614
            # print(i, nuc)
615
            frame = (i + 3 - 2) % 3
3✔
616
            # if (i % 200000)==0:
617
            #     print("%d / %d" %(i, len_genome))
618
            for j in range(3):
3✔
619
                d_vars = self._update_ORF_frame(i, nuc, j, frame, d_vars)
3✔
620

621
        # convert result to dataframe
622
        self._ORF_pos = pd.DataFrame(self._ORF_pos)
3✔
623
        self._ORF_pos.columns = [
3✔
624
            "begin_pos",
625
            "end_pos",
626
            "frame",
627
            "pos_ATG",
628
            "len_ORF",
629
            "len_CDS",
630
        ]
631

632
    def _get_ORF_pos(self):
3✔
633
        if self._ORF_pos is None:
3✔
634
            self._find_ORF_CDS()
3✔
635
        return self._ORF_pos
3✔
636

637
    ORF_pos = property(_get_ORF_pos)
3✔
638

639
    def _get_threshold(self):
3✔
640
        return self._threshold
3✔
641

642
    def _set_threshold(self, value):
3✔
643
        if value < 0:
3✔
UNCOV
644
            raise ValueError("Threshold cannot be negative")
×
645
        if (value < self._threshold) | (self._threshold is None):
3✔
646
            # need to compute again the result
647
            self._threshold = value
3✔
648
            self._find_ORF_CDS()
3✔
649
        elif value > self._threshold:
3✔
650
            # do not compute result again : filter existing df
651
            self._threshold = value
3✔
652
            if self._type_filter == "ORF":
3✔
UNCOV
653
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_ORF"] > self._threshold]
×
654
            elif self._type_filter == "CDS":
3✔
655
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]
3✔
656

657
    threshold = property(_get_threshold, _set_threshold)
3✔
658

659
    def _get_type_filter(self):
3✔
660
        return self._type_filter
3✔
661

662
    def _set_type_filter(self, value):
3✔
663
        if value not in ["ORF", "CDS"]:
3✔
664
            raise ValueError("Please select valid type of filter : ORF (default), CDS")
3✔
665
        if self._ORF_pos is None:
3✔
UNCOV
666
            self._type_filter = value
×
667
        elif self._type_filter != value:
3✔
668
            self._type_filter = value
3✔
669
            if value == "ORF":
3✔
670
                # need to compute again the result
671
                self._find_ORF_CDS()
3✔
672
            else:
673
                # need to filter the result by CDS length
674
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]
3✔
675

676
    type_filter = property(_get_type_filter, _set_type_filter)
3✔
677

678
    def hist_ORF_CDS_linearscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
3✔
679
        if self._ORF_pos is None:
3✔
UNCOV
680
            self._find_ORF_CDS()
×
681

682
        n_ORF = self._ORF_pos["len_ORF"].dropna().shape[0]
3✔
683
        n_CDS = self._ORF_pos["len_CDS"].dropna().shape[0]
3✔
684

685
        # plot for all ORF and CDS
686
        pylab.hist(
3✔
687
            self._ORF_pos["len_ORF"].dropna(),
688
            alpha=alpha,
689
            label="ORF, N = " + str(n_ORF),
690
            bins=bins,
691
        )
692
        pylab.hist(
3✔
693
            self._ORF_pos["len_CDS"].dropna(),
694
            alpha=alpha,
695
            label="CDS, N = " + str(n_CDS),
696
            bins=bins,
697
        )
698
        pylab.xlabel(xlabel)
3✔
699
        pylab.ylabel(ylabel)
3✔
700
        pylab.legend()
3✔
701
        pylab.title("Length of ORF and CDS (after filter %s > %d)" % (self._type_filter, self._threshold))
3✔
702

703
    def hist_ORF_CDS_logscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
3✔
704
        self.hist_ORF_CDS_linearscale(alpha, bins, xlabel, ylabel)
3✔
705
        pylab.yscale("log")
3✔
706

707
    def barplot_count_ORF_CDS_by_frame(self, alpha=0.5, bins=40, xlabel="Frame", ylabel="#", bar_width=0.35):
3✔
708
        if self._ORF_pos is None:  # pragma: no cover
709
            self._find_ORF_CDS()
710
        # number of ORF and CDS found by frame
711
        frames = [-3, -2, -1, 1, 2, 3]
3✔
712
        nb_res_ORF = []
3✔
713
        nb_res_CDS = []
3✔
714
        for fr in frames:
3✔
715
            nb_res_ORF.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_ORF"].dropna().shape[0])
3✔
716
            nb_res_CDS.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_CDS"].dropna().shape[0])
3✔
717

718
        pylab.bar(
3✔
719
            np.array(frames) - (bar_width / 2),
720
            nb_res_ORF,
721
            bar_width,
722
            alpha=alpha,
723
            label="ORF N = %d" % sum(nb_res_ORF),
724
        )
725
        pylab.bar(
3✔
726
            np.array(frames) + (bar_width / 2),
727
            nb_res_CDS,
728
            bar_width,
729
            alpha=alpha,
730
            label="CDS N = %d" % sum(nb_res_CDS),
731
        )
732
        pylab.xlabel(xlabel)
3✔
733
        pylab.ylabel(ylabel)
3✔
734
        pylab.legend(loc=1)
3✔
735
        pylab.title("Number of ORF and CDS by frame")
3✔
736

737
    def entropy(self, sequence):
3✔
738
        # https://www.sciencedirect.com/science/article/pii/S0022519397904938?via%3Dihub
739
        from pylab import log
3✔
740

741
        pi = [sequence.count(l) / float(len(sequence)) for l in "ACGT"]
3✔
742
        pi = [x for x in pi if x != 0]
3✔
743
        return -sum(pi * log(pi))
3✔
744

745
    def get_dna_flexibility(self, window=100, step=1, threshold=13.7):
3✔
746

747
        # flexibility angle
748
        # Source: Drew and Travers (1984), based on DNase I digestion experiments.
749

750
        # if N is present, a minimum value is used ( 7.2 with GC, 7.6 with AT)
751
        flex_angles = {
×
752
            "AA": 7.6,
753
            "CA": 10.9,
754
            "AC": 14.6,
755
            "CC": 7.2,
756
            "AG": 8.2,
757
            "CG": 8.9,
758
            "AT": 25,
759
            "CT": 8.2,
760
            "GA": 8.8,
761
            "TA": 12.5,
762
            "GC": 11.1,
763
            "TC": 8.8,
764
            "GG": 7.2,
765
            "TG": 10.9,
766
            "GT": 14.6,
767
            "TT": 7.6,
768
            "CN": 7.2,
769
            "NC": 7.2,
770
            "GN": 7.2,
771
            "NG": 7.2,
772
            "NN": 7.2,
773
            "AN": 7.6,
774
            "NA": 7.6,
775
            "TN": 7.6,
776
            "NT": 7.6,
777
        }
778

779
        N = len(self.sequence)
×
780
        wby2 = int(window / 2)
×
UNCOV
781
        flex = np.zeros(N)
×
782

783
        from collections import deque
×
784

785
        slide = deque()
×
786

787
        # init first chunk
788
        for i in range(0, window):
×
789
            slide.append(flex_angles[self.sequence[i : i + 2]])
×
790
        flex[wby2] = sum(slide)
×
791

UNCOV
792
        for i in range(wby2, N - wby2):
×
UNCOV
793
            slide.append(flex_angles[self.sequence[i : i + 2]])
×
794
            slide.popleft()
×
795
            flex[i] = sum(slide)
×
796

797
        # handles the sides
798
        for i in range(0, wby2):
×
799
            flex[i] = flex[wby2]
×
800
            flex[N - i - 1] = flex[N - wby2 - 1]
×
801

802
        return flex / (window - 1)
×
803

804
    def get_entropy(self, window):
3✔
805
        step = 1
×
806
        N = len(self.sequence)
×
807
        wby2 = int(window / 2)
×
UNCOV
808
        entropy = np.zeros(N)
×
809

UNCOV
810
        import math
×
UNCOV
811
        from collections import Counter
×
812

UNCOV
813
        def calculate_entropy(counter, size):
×
UNCOV
814
            entropy = 0
×
UNCOV
815
            for count in counter.values():
×
816
                if count > 0:
×
817
                    p = count / size
×
818
                    entropy -= p * math.log2(p)
×
UNCOV
819
            return entropy
×
820

821
        # init first chunk
UNCOV
822
        counter = Counter(self.sequence[0:window])
×
UNCOV
823
        entropy[wby2] = calculate_entropy(counter, window)
×
824

825
        for i in range(wby2, N - wby2):
×
826
            outgoing = self.sequence[i - wby2]
×
827
            ingoing = self.sequence[i + wby2]
×
828
            counter[ingoing] += 1
×
829
            counter[outgoing] -= 1
×
830
            entropy[i] = calculate_entropy(counter, window)
×
831

832
        # handles the sides
UNCOV
833
        for i in range(0, wby2):
×
834
            entropy[i] = entropy[wby2]
×
835
            entropy[N - i - 1] = entropy[N - wby2 - 1]
×
836

UNCOV
837
        return entropy
×
838

839
    def get_informational_entropy(self, window=500, poly=3):
3✔
840
        # Informational Entropy calculated overlapping DNA triplet frequencies within a window.
841
        # overlapping triplets smooths the frame effect. IE = -p*log10(p)/log10(2)
842
        # rational of log10(2) ? reference ? based on UGENE
843

844
        N = len(self.sequence)
×
845
        wby2 = int(window / 2)
×
846
        entropy = np.zeros(N)
×
847

UNCOV
848
        import math
×
849
        from collections import defaultdict
×
850

851
        # Based on
UNCOV
852
        def calculate_entropy(counter, size):
×
853
            entropy = 0
×
UNCOV
854
            log10_2 = 0.301029995  # could use math.log10(2). slower
×
UNCOV
855
            for count in counter.values():
×
UNCOV
856
                if count > 0:
×
UNCOV
857
                    p = count / size
×
UNCOV
858
                    entropy -= p * math.log10(p) / log10_2
×
UNCOV
859
            return entropy
×
860

861
        # init first chunk
UNCOV
862
        counter = defaultdict(int)
×
UNCOV
863
        for i in range(window):
×
UNCOV
864
            counter[self.sequence[i : i + 3]] += 1
×
865

866
        entropy[wby2] = calculate_entropy(counter, window)
×
867

868
        for i in range(wby2, N - wby2):
×
869
            outgoing = self.sequence[i - wby2 : i - wby2 + 3]
×
870
            ingoing = self.sequence[i + wby2 : i + wby2 + 3]
×
871

872
            counter[ingoing] += 1
×
873
            counter[outgoing] -= 1
×
874
            entropy[i] = calculate_entropy(counter, window)
×
875

876
        # handles the sides
877
        for i in range(0, wby2):
×
878
            entropy[i] = entropy[wby2]
×
UNCOV
879
            entropy[N - i - 1] = entropy[N - wby2 - 1]
×
880

UNCOV
881
        return entropy
×
882

883
    def get_dinucleotide_count(self, window=100):
3✔
884
        res = [
×
885
            sum([self.sequence[i : i + window].count(x) for x in ["AA", "CC", "GG", "TT"]])
886
            for i in range(0, len(self.sequence))
887
        ]
888
        return res
×
889

890
    def get_trinucleotide_count(self, window=100):
3✔
891
        res = [
×
892
            sum([self.sequence[i : i + window].count(x) for x in ["AAA", "CCC", "GGG", "TTT"]])
893
            for i in range(0, len(self.sequence))
894
        ]
UNCOV
895
        return res
×
896

897
    def get_karlin_signature_difference(self, window=500, dinucleotide_only=False):
3✔
898
        # Karlin Signature Difference is dinucleotide absolute relative abundance difference
899
        # between the whole sequence and a sliding window.
900
        # f(XY) = frequency of the dinucleotide XY
901
        # f(X)  = frequency of the nucleotide X
902
        # p(XY) = f(XY) / (f(X) * f(Y))
903
        # p_seq(XY) = p(XY) for the whole sequence
904
        # p_win(XY) = p(XY) for a window
905
        # final metric is  sum(p_seq(XY) - p_win(XY)) / 16
906

UNCOV
907
        import math
×
908
        from collections import defaultdict
×
909

910
        N = len(self.sequence)
×
911
        wby2 = int(window / 2)
×
912
        karling_window = np.zeros(N)
×
913

UNCOV
914
        def calculate_karlin_difference(counter, global_counter, window):
×
UNCOV
915
            pXY = defaultdict(int)
×
UNCOV
916
            deltaXY = 0
×
UNCOV
917
            for x in "ACGT":
×
918
                for y in "ACGT":
×
UNCOV
919
                    if dinucleotide_only and x != y:
×
920
                        continue
×
921
                    xy = f"{x}{y}"
×
922
                    pXY[xy] = (
×
923
                        counter[xy]
924
                        / (2 * (window - 1))
925
                        / max([0.000000001, counter[x] / (2 * window) * counter[y] / (2 * window)])
926
                    )
927

UNCOV
928
                    deltaXY += abs(pXY[xy] - global_counter[xy])
×
929
            return deltaXY / 16.0
×
930

UNCOV
931
        counter_window = defaultdict(int)
×
932
        counter_seq = defaultdict(int)
×
933

934
        # init first chunk both local and global
935
        for i in range(window):
×
936
            # nucleotide
937
            counter_window[self.sequence[i]] += 1
×
UNCOV
938
            counter_seq[self.sequence[i]] += 1
×
939
            # dinucleotide
UNCOV
940
            counter_window[self.sequence[i : i + 2]] += 1
×
UNCOV
941
            counter_seq[self.sequence[i : i + 2]] += 1
×
942

943
        # need to compute the global pXY first (on the entire sequence)
UNCOV
944
        for i in range(wby2, N - 2):
×
945
            # nucleotide
UNCOV
946
            counter_seq[self.sequence[i]] += 1
×
947

948
            # dinucleotide update is more complicated so we start from stract
UNCOV
949
            counter_seq[self.sequence[i : i + 2]] += 1
×
950

951
        # compute final pXY
UNCOV
952
        for x in "ACGT":
×
UNCOV
953
            for y in "ACGT":
×
UNCOV
954
                xy = f"{x}{y}"
×
UNCOV
955
                N = len(self.sequence)
×
UNCOV
956
                counter_seq[xy] = (
×
957
                    counter_seq[xy]
958
                    / (2 * (N - 1))
959
                    / max([0.000000001, counter_seq[x] / (2 * N) * counter_seq[y] / (2 * N)])
960
                )
961

UNCOV
962
        karling_window[wby2] = calculate_karlin_difference(counter_window, counter_seq, window)
×
963

UNCOV
964
        for i in range(wby2, N - wby2):
×
UNCOV
965
            outgoing = self.sequence[i - wby2]
×
UNCOV
966
            ingoing = self.sequence[i + wby2]
×
967

968
            # nucleotide update
UNCOV
969
            counter_window[ingoing] += 1
×
UNCOV
970
            counter_window[outgoing] -= 1
×
971

972
            # dinucleotide update is more complicated so we start from stract
UNCOV
973
            counter_window[self.sequence[i + wby2 - 1 : i + wby2 + 1]] += 1
×
UNCOV
974
            counter_window[self.sequence[i - wby2 : i - wby2 + 2]] -= 1
×
975

UNCOV
976
            karling_window[i] = calculate_karlin_difference(counter_window, counter_seq, window)
×
977

978
        # handles the sides
UNCOV
979
        for i in range(0, wby2):
×
UNCOV
980
            karling_window[i] = karling_window[wby2]
×
UNCOV
981
            karling_window[N - i - 1] = karling_window[N - wby2 - 1]
×
982

UNCOV
983
        return karling_window
×
984

985

986
class RNA(Sequence):
3✔
987
    """Simple RNA class
988

989

990
    >>> from sequana.sequence import RNA
991
    >>> d = RNA("ACGUUUU")
992
    >>> d.reverse_complement()
993

994
    """
995

996
    def __init__(self, sequence):
3✔
997
        super(RNA, self).__init__(sequence, complement_in=b"ACGU", complement_out=b"UGCA", letters="ACGUN")
3✔
998

999

1000
class Repeats:
3✔
1001
    """Class for finding repeats in DNA or RNA linear sequences.
1002

1003
    Computation is performed each time the :attr:`threshold` is set
1004
    to a new value.
1005

1006
    .. plot::
1007
        :include-source:
1008

1009
        from sequana import sequana_data, Repeats
1010
        rr = Repeats(sequana_data("measles.fa"))
1011
        rr.threshold = 4
1012
        rr.hist_length_repeats()
1013

1014
    .. note:: Works with shustring package from Bioconda (April 2017)
1015
    .. todo:: use a specific sequence (first one by default). Others can be
1016
        selected by name
1017

1018
    """
1019

1020
    def __init__(self, filename_fasta, merge=False, name=None):
3✔
1021
        """.. rubric:: Constructor
1022

1023
        Input must be a fasta file with valid DNA or RNA characters
1024

1025
        :param str filename_fasta: a Fasta file, only the first
1026
            sequence is used !
1027
        :param int threshold: Minimal length of repeat to output
1028
        :param str name: if name is provided, scan the Fasta file
1029
            and select the corresponding sequence. if you want to
1030
            analyse all sequences, you need to use a loop by setting
1031
            _header for each sequence with the sequence name found in
1032
            sequence header.
1033

1034

1035
        .. note:: known problems. Header with a > character (e.g. in the
1036
            comment) are left strip and only the comments is kept. Another issue
1037
            is for multi-fasta where one sequence is ignored (last or first ?)
1038

1039
        """
1040
        # used to check everything is fine with the header/name
1041
        self._fasta = FastA(filename_fasta)
3✔
1042

1043
        # Define the attributes, and set the header if already provided
1044
        self._threshold = None
3✔
1045
        self._df_shustring = None
3✔
1046
        self._header = None
3✔
1047
        self._length = None
3✔
1048
        self._longest_shustring = None
3✔
1049
        self._begin_end_repeat_position = None
3✔
1050
        self._begin_end_repeat_position_merge = None
3✔
1051
        self._filename_fasta = filename_fasta
3✔
1052
        self._previous_thr = None
3✔
1053
        self._list_len_repeats = None
3✔
1054
        self._contig_names = None
3✔
1055
        if not isinstance(merge, bool):
3✔
UNCOV
1056
            raise TypeError("do_merge must be boolean")
×
1057
        self._do_merge = merge
3✔
1058
        if name is not None:
3✔
UNCOV
1059
            self.header = name
×
1060
        else:
1061
            self.header = self._fasta.names[0]
3✔
1062

1063
    def _get_header(self):
3✔
1064
        return self._header
3✔
1065

1066
    def _set_header(self, name):
3✔
1067
        if name not in self._fasta.names:
3✔
UNCOV
1068
            raise ValueError("invalid name. Use one of %s" % self._fasta.names)
×
1069
        self._header = name
3✔
1070
        self._df_shustring = None
3✔
1071

1072
    header = property(_get_header, _set_header)
3✔
1073

1074
    def _get_names(self):
3✔
1075
        if self._contig_names is None:
3✔
1076
            self._contig_names = self._fasta.names
3✔
1077
        return self._contig_names
3✔
1078

1079
    names = property(_get_names)
3✔
1080

1081
    def _get_shustrings_length(self):
3✔
1082
        """Return dataframe with shortest unique substring length at each position
1083
        shortest unique substrings are unique in the sequence and its complement
1084
        Uses shustring tool"""
1085
        if self._df_shustring is None:
3✔
1086
            # read fasta
1087
            task_read = subprocess.Popen(["cat", self._filename_fasta], stdout=subprocess.PIPE)
3✔
1088

1089
            # shustring command
1090
            # the -l option uses a regular expression
1091
            task_shus = subprocess.Popen(
3✔
1092
                # ["shustring", "-r", "-q", "-l", ">{}[\s,\n]*?".format(self.header)],
1093
                ["shustring", "-r", "-q", "-l", r">{}($|\s+)".format(self.header)],
1094
                stdin=task_read.stdout,
1095
                stdout=subprocess.PIPE,
1096
            )
1097

1098
            # read stdout line by line and append to list
1099
            list_df = []
3✔
1100
            for line in task_shus.stdout:
3✔
1101
                list_df.append(line.decode("utf8").replace("\n", "").split("\t"))
3✔
1102
                # df=pd.concat([df, line])
1103
            task_shus.wait()
3✔
1104

1105
            # convert to dataframe
1106
            df = pd.DataFrame(list_df[2 : len(list_df)])
3✔
1107
            self._df_shustring = df.astype(int)
3✔
1108
            self._df_shustring.columns = ["position", "repeat_length"]
3✔
1109

1110
            # get input sequence length and longest shustring in the first line
1111
            self._length = int(list_df[0][1])
3✔
1112
            self._longest_shustring = int(list_df[0][3].split("<=")[2])
3✔
1113
        return self._df_shustring
3✔
1114

1115
    df_shustring = property(_get_shustrings_length)
3✔
1116

1117
    def _get_genome_length(self):
3✔
1118
        if self._df_shustring is None:
3✔
UNCOV
1119
            self._get_shustrings_length()
×
1120
        return self._length
3✔
1121

1122
    length = property(_get_genome_length)
3✔
1123

1124
    def _get_longest_shustring(self):
3✔
1125
        if self._df_shustring is None:
3✔
UNCOV
1126
            self._get_shustrings_length()
×
1127
        return self._longest_shustring
3✔
1128

1129
    longest_shustring = property(_get_longest_shustring)
3✔
1130

1131
    def _find_begin_end_repeats(self, force=False):
3✔
1132
        """Returns position of repeats longer than threshold
1133
        as an ordered list
1134
        """
1135
        if self.df_shustring is None:
3✔
UNCOV
1136
            self._get_shustrings_length()
×
1137

1138
        if self._threshold is None:
3✔
1139
            # print("No threshold : please set minimul length of repeats to output")
UNCOV
1140
            raise ValueError("threshold : please set threshold (minimum length of repeats to output)")
×
1141

1142
        # if there is no result yet, or the threshold has changed
1143
        if (self._begin_end_repeat_position is None) | (self.threshold != self._previous_thr) | force:
3✔
1144
            nb_row = self.df_shustring.shape[0]
3✔
1145
            i = 0
3✔
1146
            step_repeat_seq = []
3✔
1147
            be_repeats = []
3✔
1148
            e = 0
3✔
1149

1150
            # use list because faster
1151
            list_len_shus = list(self.df_shustring.loc[:, "repeat_length"])
3✔
1152

1153
            while i < nb_row:
3✔
1154
                # begining of repeat
1155
                if list_len_shus[i] > self.threshold:
3✔
1156
                    b = i
3✔
1157
                    # compute new end of repeat
1158
                    len_repeat = list_len_shus[i]
3✔
1159
                    e = b + len_repeat
3✔
1160
                    # save (b,e)
1161
                    be_repeats.append((b, e))
3✔
1162
                    # update i
1163
                    i = e - 1
3✔
1164
                i += 1
3✔
1165

1166
            self._begin_end_repeat_position = be_repeats
3✔
1167

1168
        self._get_merge_repeats()
3✔
1169

1170
    def _get_be_repeats(self):
3✔
1171
        self._find_begin_end_repeats()
3✔
1172
        return self._begin_end_repeat_position
3✔
1173

1174
    begin_end_repeat_position = property(_get_be_repeats)
3✔
1175

1176
    def _set_threshold(self, value):
3✔
1177
        if value < 0:
3✔
UNCOV
1178
            raise ValueError("Threshold must be positive integer")
×
1179
        if value != self._threshold:
3✔
1180
            self._previous_thr = self._threshold
3✔
1181
        self._threshold = value
3✔
1182
        self._find_begin_end_repeats()
3✔
1183
        self._list_len_repeats = [tup[1] - tup[0] for tup in self._begin_end_repeat_position]
3✔
1184

1185
    def _get_threshold(self):
3✔
1186
        return self._threshold
3✔
1187

1188
    threshold = property(_get_threshold, _set_threshold)
3✔
1189

1190
    def _get_list_len_repeats(self):
3✔
1191
        if self._list_len_repeats is None:
3✔
UNCOV
1192
            raise UserWarning("Please set threshold (minimum length of repeats to output)")
×
1193
        return self._list_len_repeats
3✔
1194

1195
    list_len_repeats = property(_get_list_len_repeats)
3✔
1196

1197
    def _get_merge_repeats(self):
3✔
1198
        if self._do_merge:
3✔
1199
            # if there are repeats, merge repeats that are fragmented
1200
            if len(self._begin_end_repeat_position) > 0:
3✔
1201
                prev_tup = self._begin_end_repeat_position[0]
3✔
1202
                b = prev_tup[0]
3✔
1203
                begin_end_repeat_position_merge = []
3✔
1204
                for i in range(1, len(self._begin_end_repeat_position)):
3✔
1205
                    tup = self._begin_end_repeat_position[i]
3✔
1206

1207
                    if tup[0] == prev_tup[1]:
3✔
1208
                        # concat
1209
                        e = tup[1]
3✔
1210
                        prev_tup = tup
3✔
1211
                        if i == (len(self._begin_end_repeat_position) - 1):
3✔
1212
                            # last tup : append to result
UNCOV
1213
                            begin_end_repeat_position_merge.append((b, e))
×
1214

1215
                    else:
1216
                        # real end of repeat : append result and update b, e
1217
                        e = prev_tup[1]
3✔
1218
                        begin_end_repeat_position_merge.append((b, e))
3✔
1219
                        prev_tup = tup
3✔
1220
                        b = prev_tup[0]
3✔
1221
                        if i == (len(self._begin_end_repeat_position) - 1):
3✔
1222
                            # last tup : append to result
1223
                            begin_end_repeat_position_merge.append(tup)
3✔
1224

1225
                self._begin_end_repeat_position = begin_end_repeat_position_merge
3✔
1226

1227
    def _get_do_merge(self):
3✔
UNCOV
1228
        return self._do_merge
×
1229

1230
    def _set_do_merge(self, do_merge):
3✔
1231
        if not isinstance(do_merge, bool):
3✔
UNCOV
1232
            raise TypeError("do_merge must be boolean")
×
1233
        # if different
1234
        if do_merge != self._do_merge:
3✔
1235
            self._do_merge = do_merge
3✔
1236
            if self._do_merge:
3✔
1237
                # did not merge before, merge now
1238
                if self._begin_end_repeat_position is None:
3✔
UNCOV
1239
                    self._find_begin_end_repeats()
×
1240
            else:
1241
                # data is already merged : need to compute again to un-merge
1242
                self._find_begin_end_repeats(force=True)
3✔
1243

1244
    do_merge = property(_get_do_merge, _set_do_merge)
3✔
1245

1246
    def hist_length_repeats(
3✔
1247
        self,
1248
        bins=20,
1249
        alpha=0.5,
1250
        hold=False,
1251
        fontsize=12,
1252
        grid=True,
1253
        title="Repeat length",
1254
        xlabel="Repeat length",
1255
        ylabel="#",
1256
        logy=True,
1257
    ):
1258
        """Plots histogram of the repeat lengths"""
1259
        # check that user has set a threshold
1260
        if hold is False:
3✔
1261
            pylab.clf()
3✔
1262
        pylab.hist(self.list_len_repeats, alpha=alpha, bins=bins)
3✔
1263
        pylab.title(title)
3✔
1264
        pylab.xlabel(xlabel, fontsize=fontsize)
3✔
1265
        pylab.ylabel(ylabel, fontsize=fontsize)
3✔
1266
        if grid is True:
3✔
1267
            pylab.grid(True)
3✔
1268
        if logy:
3✔
1269
            pylab.semilogy()
3✔
1270

1271
    def plot(self, clf=True, fontsize=12):
3✔
1272
        if clf:
3✔
1273
            pylab.clf()
3✔
1274
        pylab.grid(True)
3✔
1275
        pylab.plot(self.df_shustring.repeat_length)
3✔
1276
        pylab.xlabel("Position (bp)", fontsize=fontsize)
3✔
1277
        pylab.ylabel("Repeat lengths (bp)", fontsize=fontsize)
3✔
1278
        pylab.ylim(bottom=0)
3✔
1279

1280
    def to_wig(self, filename, step=1000):
3✔
1281
        """export repeats into WIG format to import in IGV"""
1282
        assert self.threshold and self.df_shustring is not None
3✔
1283

1284
        from numpy import arange
3✔
1285

1286
        N = len(self.df_shustring)
3✔
1287
        with open(filename, "w") as fout:
3✔
1288
            fout.write(
3✔
1289
                f'track type=wiggle_0 name="Repeat Density" visibility=full fixedStep chrom=1 start=0 step={step} span={step}\n'
1290
            )
1291
            # min_period = 1 to prevent NAN at first position
1292
            rolling_max = self.df_shustring.rolling(step, step=step).max()
3✔
1293
            for _, row in rolling_max.iterrows():
3✔
1294
                if row.name == 0:
3✔
1295
                    continue
3✔
1296
                M = row.repeat_length
3✔
1297
                start = row.name - step
3✔
1298
                stop = row.name
3✔
1299
                fout.write(f"{self.header}\t{start}\t{stop}\t{row.repeat_length}\n")
3✔
1300

1301
    def get_peak_position_and_length(self, THRESHOLD=3000):
3✔
1302

UNCOV
1303
        df = self.df_shustring.copy()
×
1304
        # Filter for repeats above threshold
UNCOV
1305
        df = df[df["repeat_length"] > THRESHOLD].copy()
×
1306

1307
        # Sort by genomic position
UNCOV
1308
        df = df.sort_values(by="position").reset_index(drop=True)
×
1309

1310
        # Identify local maxima: where the value is greater than the left and right neighbors
UNCOV
1311
        df["prev"] = df["repeat_length"].shift(1, fill_value=0)
×
UNCOV
1312
        df["next"] = df["repeat_length"].shift(-1, fill_value=0)
×
1313

1314
        # Keep only local maxima
UNCOV
1315
        peaks = df[(df["repeat_length"] > df["prev"]) & (df["repeat_length"] > df["next"])]
×
1316

1317
        # Select only relevant columns
UNCOV
1318
        final_df = peaks[["position", "repeat_length"]]
×
1319

1320
        # Print results
UNCOV
1321
        print(f"Final number of peaks after filtering: {len(final_df)}")
×
UNCOV
1322
        return final_df
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc