
sequana / sequana / build 12978896060 (push, github, web-flow)
26 Jan 2025 10:24PM UTC coverage: 71.796% (-1.8%) from 73.623%
Merge pull request #863 from cokelaer/dev: "Major update with new modules and pyproject deprecation fixes"

381 of 1011 new or added lines in 19 files covered (37.69%).
10 existing lines in 8 files now uncovered.
14342 of 19976 relevant lines covered (71.8%), 2.87 hits per line.

Source file: /sequana/sequence.py (77.42% covered)
#  This file is part of Sequana software
#
#  Copyright (c) 2018-2022 - Sequana Development Team
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  website: https://github.com/sequana/sequana
#  documentation: http://sequana.readthedocs.io
#
##############################################################################
import re
import string
import subprocess
from collections import Counter, deque

import colorlog

from sequana.fasta import FastA
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab

logger = colorlog.getLogger(__name__)


__all__ = ["DNA", "RNA", "Repeats", "Sequence"]


class Sequence(object):
    """Abstract base class for other specialised sequences such as DNA.

    Sequence is the base class for other classes such as :class:`DNA` and
    :class:`RNA`::

        from sequana import Sequence
        s = Sequence("ACGT")
        s.stats()
        s.get_complement()

    .. note:: You may use a FastA file as input (see constructor)

    """

    def __init__(self, sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGT"):
        """.. rubric:: Constructor

        A sequence is just a string stored in the :attr:`sequence` attribute. It
        has properties related to the type of alphabet authorised.

        :param str sequence: May be a string or a FastA file, in which case only
            the first sequence is used.
        :param complement_in:
        :param complement_out:
        :param letters: authorised letters. Used in :meth:`check` only.

        .. todo:: use counter only once as a property

        """
        if sequence.endswith(".fa") or sequence.endswith(".fasta"):
            self.fasta = FastA(sequence)
            sequence = self.fasta.next().sequence.upper()
        else:  # assume a correct string sequence
            pass

        self._data = sequence
        try:
            # Python 2 only; kept for backward compatibility
            self._translate = string.maketrans(complement_in, complement_out)
        except AttributeError:
            # Python 3: maketrans is a static method of the bytes type
            self._translate = bytes.maketrans(complement_in, complement_out)
        self._letters = letters

    def __iter__(self):
        return self

    def __next__(self):
        self._data = self.fasta.next().sequence.upper()
        return self._data

    def _get_sequence(self):
        return self._data

    sequence = property(_get_sequence)

    def get_complement(self):
        """Return complement"""
        return self._data.translate(self._translate)

    def get_reverse_complement(self):
        """Return reverse complement"""
        return self.get_complement()[::-1]

    def get_reverse(self):
        """Return reverse sequence"""
        return self._data[::-1]

    def complement(self):
        """In-place version of :meth:`get_complement`"""
        self._data = self.get_complement()

    def reverse(self):
        """In-place version of :meth:`get_reverse`"""
        self._data = self.get_reverse()

    def reverse_complement(self):
        """In-place version of :meth:`get_reverse_complement`"""
        self._data = self.get_reverse_complement()

    def check(self):
        """Check that all letters are valid"""
        counter = Counter(self._data).keys()
        for key in counter:
            if key not in self._letters:
                raise ValueError("Found unexpected letter in the sequence (%s)" % key)

    def __len__(self):
        return len(self._data)

    def gc_content(self):
        """Return mean GC content"""
        c = Counter(self._data)
        ratio = (c["G"] + c["C"]) / len(self.sequence)
        return ratio

    def stats(self):
        """Return basic stats about the number of letters"""
        return Counter(self.sequence)

    def get_statistics(self):
        """Return a dataframe with letter counts and percentages (A, C, G, T, GC, N)"""
        counter = Counter(self.sequence.upper())
        N = sum(counter.values())
        counter["GC"] = counter["G"] + counter["C"]
        names = ["A", "C", "G", "T", "GC", "N"]
        basic_stats = [counter.get(x, 0) for x in names] + [N]
        basic_stats_pct = [float(x) / N * 100 for x in basic_stats]
        basic_stats = pd.DataFrame(
            {"count": basic_stats, "percentage": basic_stats_pct}, index=names + ["all"]
        )
        return {"single": basic_stats}

    def get_occurences(self, pattern, overlap=False):
        """Return positions of the input pattern in the sequence

        ::

            >>> from sequana import Sequence
            >>> s = Sequence('ACGTTTTACGT')
            >>> s.get_occurences("ACGT")
            [0, 7]

        """
        if overlap:
            # a zero-width lookahead also reports overlapping matches
            res = [m.start() for m in re.finditer("(?=%s)" % pattern, self.sequence)]
        else:
            res = [m.start() for m in re.finditer(pattern, self.sequence)]
        return res

        # For a reverse find-all without overlaps, you can combine positive and
        # negative lookahead into an expression like this:
        # res = [m.start() for m in re.finditer('(?=%s)(?!.{1,%d}%s)' % (search,
        #    len(pattern)-1, pattern), 'ttt')]

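# A hedged usage sketch (not part of the original module): with overlap=True,
# the zero-width lookahead in get_occurences also reports overlapping hits:
#
#   s = Sequence("AAAA")
#   s.get_occurences("AA")                # [0, 2]
#   s.get_occurences("AA", overlap=True)  # [0, 1, 2]
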
class DNA(Sequence):
    """Simple DNA class

    ::

        >>> from sequana.sequence import DNA
        >>> d = DNA("ACGTTTT")
        >>> d.reverse_complement()

    Some long computations are performed when setting the window size::

        d.window = 100

    The ORF detection has been validated against a Plasmodium 3D7 ORF file
    found on plasmodb.org across the 14 chromosomes.

    """

    def __init__(
        self,
        sequence,
        codons_stop=["TAA", "TGA", "TAG"],
        codons_stop_rev=["TTA", "TCA", "CTA"],
        codons_start=["ATG"],
        codons_start_rev=["CAT"],
    ):
        super(DNA, self).__init__(sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGTN")

        self._window = None
        self._type_window = None
        self._slide_window = None
        self._seq_right = None
        self._dict_nuc = {"A": 0, "T": 1, "G": 2, "C": 3}
        self._cumul = None
        self._Xn = None
        self._Yn = None
        self._Zn = None
        self._ignored_nuc = None
        self._AT_skew_slide = None
        self._GC_skew_slide = None
        self._GC_content_slide = None
        self._AT_content_slide = None
        self._template_fft = None
        self._c_fft = None
        self._codons_stop = codons_stop
        self._codons_stop_rev = codons_stop_rev
        self._codons_start = codons_start
        self._codons_start_rev = codons_start_rev
        self._ORF_pos = None
        self._threshold = None
        self._type_filter = None

    def _get_window(self):
        return self._window

    def _set_window(self, window):
        if 0 < window < 1:
            self._type_window = "adapted to genome length : %.1f %% of total length" % (window * 100)
            self._window = int(round(self.__len__() * window))
        elif 1 <= window <= self.__len__():
            self._type_window = "fixed window length : %d" % (window)
            self._window = int(window)
        else:
            raise ValueError(
                "Incorrect value for window: choose either a float in ]0,1["
                + " (fraction of genome) or an integer in [1, genome_length] (window size)"
            )

        logger.info("Computing GC skew")
        self._compute_skews()

    window = property(_get_window, _set_window)

    def _get_type_window(self):
        return self._type_window

    type_window = property(_get_type_window)

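    # Hedged usage sketch (illustrative, not from the original source): the
    # window accepts either a fraction of the genome or an absolute size;
    # setting it triggers the skew computation.
    #
    #   d = DNA("ACGT" * 1000)
    #   d.window = 0.1   # 10% of the genome length, i.e. 400 bases here
    #   d.window = 100   # fixed 100-base window
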
    def _init_sliding_window(self):
        """slide_window : deque of the n first nucleotides (size of the window)
        seq_right : deque of the rest of the genome, with the n-1 first
        nucleotides appended at the end (circular DNA)
        """
        # split the reference sequence in two: sliding window and seq_right
        # for a circular genome: add the beginning at the end of the genome
        self._slide_window = deque(self.sequence[0 : self._window])
        self._seq_right = deque(self.sequence[self._window : self.__len__()] + self.sequence[0 : (self._window - 1)])

    def _init_list_results(self):
        # init IJ content and IJ skew
        IJ_content_res = np.empty((1, self.__len__()))
        IJ_content_res[:] = np.nan
        IJ_skew_res = np.empty((1, self.__len__()))
        IJ_skew_res[:] = np.nan
        return IJ_content_res, IJ_skew_res

    def _init_cumul_nuc(self):
        """Cumulative count of each nucleotide along the genome (init from first window)"""
        # ATGC (index stored in self._dict_nuc)
        cumul = np.zeros((4, (self.__len__() + self._window)))
        for j in range(self._window):
            nuc = self.sequence[j]
            if nuc in self._dict_nuc:
                cumul[self._dict_nuc[nuc]][j] += 1
        self._cumul = cumul

    def _compute_skews(self):
        ### initialisation: compute GC skew and AT skew for the first window
        self._init_sliding_window()
        GC_content_slide, GC_skew_slide = self._init_list_results()
        AT_content_slide, AT_skew_slide = self._init_list_results()
        karlin_content_slide, karlin_skew_slide = self._init_list_results()

        self._init_cumul_nuc()

        c = Counter(self._slide_window)
        dict_counts = {"G": c["G"], "C": c["C"], "A": c["A"], "T": c["T"]}
        i = 0

        # GC
        sumGC = float(dict_counts["G"] + dict_counts["C"])
        GC_content_slide[0][i] = sumGC
        if sumGC > 0:
            GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
        # AT
        sumAT = float(dict_counts["A"] + dict_counts["T"])
        AT_content_slide[0][i] = sumAT
        if sumAT > 0:
            AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT

        # the Karlin difference measures the excess of purines over
        # pyrimidines as (A + G) - (C + T)
        sumAG = float(dict_counts["A"] + dict_counts["G"])
        sumCT = float(dict_counts["C"] + dict_counts["T"])
        karlin_content_slide[0][i] = sumAG - sumCT

        ### compute for the whole genome
        while self._seq_right:
            out_nuc = self._slide_window.popleft()
            in_nuc = self._seq_right.popleft()
            self._slide_window.append(in_nuc)

            i += 1

            if i % 500000 == 0:  # pragma: no cover
                logger.info("%d / %d" % (i, self.__len__()))
            # if in and out are the same: do nothing, append the same result
            if out_nuc != in_nuc:
                # remove out_nuc from the counters, add in_nuc
                if out_nuc in self._dict_nuc:
                    dict_counts[out_nuc] -= 1
                if in_nuc in self._dict_nuc:
                    dict_counts[in_nuc] += 1
                sumGC = float(dict_counts["G"] + dict_counts["C"])
                sumAT = float(dict_counts["A"] + dict_counts["T"])
                sumAG = float(dict_counts["A"] + dict_counts["G"])
                sumCT = float(dict_counts["C"] + dict_counts["T"])

            # fill results
            # GC
            GC_content_slide[0][i] = sumGC
            if sumGC > 0:
                GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
            # AT
            AT_content_slide[0][i] = sumAT
            if sumAT > 0:
                AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT
            # cumul
            if in_nuc in self._dict_nuc:
                self._cumul[self._dict_nuc[in_nuc]][i + self._window - 1] += 1

            karlin_content_slide[0][i] = sumAG - sumCT

        self._GC_content_slide = GC_content_slide / float(self._window)
        self._AT_content_slide = AT_content_slide / float(self._window)
        self._karlin_content_slide = karlin_content_slide / float(self._window)
        self._cumul = np.delete(self._cumul, range(self.__len__(), self._cumul.shape[1]), 1)
        self._cumul = np.cumsum(self._cumul, axis=1)

        ### save results for the Z curve
        self._Xn = list(
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["G"]])
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["T"]])
        )

        self._Yn = list(
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["C"]])
            - (self._cumul[self._dict_nuc["G"]] + self._cumul[self._dict_nuc["T"]])
        )

        self._Zn = list(
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["T"]])
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["G"]])
        )

        self._AT_skew_slide = AT_skew_slide
        self._GC_skew_slide = GC_skew_slide

        ### check the proportion of ignored nucleotides
        GC_content_total = (self._cumul[self._dict_nuc["G"]][-1] + self._cumul[self._dict_nuc["C"]][-1]) / float(
            self.__len__()
        )
        AT_content_total = (self._cumul[self._dict_nuc["A"]][-1] + self._cumul[self._dict_nuc["T"]][-1]) / float(
            self.__len__()
        )
        self._ignored_nuc = 1.0 - GC_content_total - AT_content_total

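    # Worked example (hedged, not from the original source): for a window with
    # counts G=3 and C=1, the GC skew is (G - C) / (G + C) = (3 - 1) / (3 + 1)
    # = 0.5; a positive value indicates a G excess on the forward strand.
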
    def _get_AT_skew(self):
        if self._AT_skew_slide is None:  # pragma: no cover
            raise AttributeError("Please set a valid window to compute skew")
        else:
            return self._AT_skew_slide

    AT_skew = property(_get_AT_skew)

    def _get_GC_skew(self):
        if self._GC_skew_slide is None:  # pragma: no cover
            raise AttributeError("Please set a valid window to compute skew")
        else:
            return self._GC_skew_slide

    GC_skew = property(_get_GC_skew)

    def _create_template_fft(self, M=1000):  # pragma: no cover
        M_3 = int(M / 3)
        W = [-0.5] * M_3 + list(np.linspace(-0.5, 0.5, M - 2 * M_3)) + [0.5] * M_3
        return list(W * np.hanning(M))

    def _myfft_gc_skew(self, M):  # pragma: no cover
        """Match a step-like template against the GC skew vector.

        :param M: length of the template

        The GC skew vector is taken from :attr:`_GC_skew_slide`.
        """
        x = self._GC_skew_slide[0]

        N = len(x)
        template = self._create_template_fft(M) + [0] * (N - M)
        template /= pylab.norm(template)
        c = (
            abs(np.fft.ifft(np.fft.fft(x) * pylab.conj(np.fft.fft(template))) ** 2)
            / pylab.norm(x)
            / pylab.norm(template)
        )
        # shift the SNR vector by half the template length so that the peak
        # is at the END of the template
        c = np.roll(c, M // 2)
        self._template_fft = template
        self._c_fft = c * 2.0 / N

    def plot_all_skews(self, figsize=(10, 12), fontsize=16, alpha=0.5):
        if self._window is None:  # pragma: no cover
            raise AttributeError("Please set a valid window to compute skew")

        # create figure
        fig, axarr = pylab.subplots(9, 1, sharex=True, figsize=figsize)

        main_title = (
            "Window size = %d (%.0f %% of genome)\n"
            "GC content = %.0f %%, AT content = %.0f %%, ignored = %.0f %%"
            % (
                self._window,
                self._window * 100 / self.__len__(),
                self.gc_content() * 100,
                (1 - self.gc_content()) * 100,
                self._ignored_nuc * 100,
            )
        )

        pylab.suptitle(main_title, fontsize=fontsize)

        # GC skew
        axarr[0].set_title("GC skew (blue) - Cumulative sum (red)")
        axarr[0].plot(list(self._GC_skew_slide[0]), "b-", alpha=alpha)
        axarr[0].set_ylabel("(G - C) / (G + C)")

        axarr[1].plot(list(np.cumsum(self._GC_skew_slide[0])), "r-", alpha=alpha)
        axarr[1].set_ylabel("(G - C) / (G + C)")

        # AT skew
        axarr[2].set_title("AT skew (blue) - Cumulative sum (red)")
        axarr[2].plot(list(self._AT_skew_slide[0]), "b-", alpha=alpha)
        axarr[2].set_ylabel("(A - T) / (A + T)")

        axarr[3].plot(list(np.cumsum(self._AT_skew_slide[0])), "r-", alpha=alpha)
        axarr[3].set_ylabel("(A - T) / (A + T)", rotation=0)

        # Xn
        axarr[4].set_title("Cumulative RY skew (Purine - Pyrimidine)")
        axarr[4].plot(self._Xn, "g-", alpha=alpha)
        axarr[4].set_ylabel("(A + G) - (C + T)")

        # Yn
        axarr[5].set_title("Cumulative MK skew (Amino - Keto)")
        axarr[5].plot(self._Yn, "g-", alpha=alpha)
        axarr[5].set_ylabel("(A + C) - (G + T)")

        # Zn
        axarr[6].set_title("Cumulative H-bond skew (Weak H-bond - Strong H-bond)")
        axarr[6].plot(self._Zn, "g-", alpha=alpha)
        axarr[6].set_ylabel("(A + T) - (G + C)")

        # GC content
        axarr[7].set_title("GC content")
        axarr[7].plot(list(self._GC_content_slide[0]), "k-", alpha=alpha)
        axarr[7].set_ylabel("GC")

        # AT content
        axarr[8].set_title("AT content")
        axarr[8].plot(list(self._AT_content_slide[0]), "k-", alpha=alpha)
        axarr[8].set_ylabel("AT")

        # # FFT
        # axarr[9].set_title("FFT")
        # axarr[9].plot(list(self._c_fft), 'g-', alpha=alpha)
        # axarr[9].set_ylabel("FFT")

        fig.tight_layout()
        fig.subplots_adjust(top=0.88)

    def _update_ORF_frame(self, i, nuc, j, frame, d_vars):
        d_vars["codon"][j] = d_vars["codon"][j] + nuc

        if len(d_vars["codon"][j]) == 3:
            # test for start forward (if none already found)
            is_start = d_vars["codon"][j] in self._codons_start
            if is_start and np.isnan(d_vars["pos_ATG_f"][j]):
                d_vars["pos_ATG_f"][j] = i - 2
            # test for stop forward
            is_stop = d_vars["codon"][j] in self._codons_stop
            if is_stop:
                d_vars["end_f"][j] = i
                # test if the CDS or ORF found is long enough to be output
                if self._type_filter == "CDS":
                    len_to_filter = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]  # len_CDS
                else:
                    len_to_filter = d_vars["end_f"][j] - d_vars["begin_f"][j]  # len_ORF

                if len_to_filter > self._threshold:
                    len_ORF = d_vars["end_f"][j] - d_vars["begin_f"][j]
                    len_CDS = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]
                    self._ORF_pos.append(
                        [
                            d_vars["begin_f"][j],
                            d_vars["end_f"][j],
                            frame + 1,
                            d_vars["pos_ATG_f"][j],
                            len_ORF,
                            len_CDS,
                        ]
                    )
                d_vars["begin_f"][j] = i + 1
                d_vars["pos_ATG_f"][j] = np.nan

            # test for start reverse
            is_start_rev = d_vars["codon"][j] in self._codons_start_rev
            if is_start_rev:
                d_vars["pos_ATG_r"][j] = i
            # test for stop reverse
            is_stop_rev = d_vars["codon"][j] in self._codons_stop_rev
            if is_stop_rev:
                d_vars["end_r"][j] = i - 3
                # test if the CDS or ORF found is long enough to be output
                if self._type_filter == "CDS":
                    len_to_filter = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]  # len_CDS
                else:
                    len_to_filter = d_vars["end_r"][j] - d_vars["begin_r"][j]  # len_ORF

                if len_to_filter > self._threshold:
                    len_ORF = d_vars["end_r"][j] - d_vars["begin_r"][j]
                    len_CDS = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]
                    self._ORF_pos.append(
                        [
                            d_vars["begin_r"][j],
                            d_vars["end_r"][j],
                            -(frame + 1),
                            d_vars["pos_ATG_r"][j],
                            len_ORF,
                            len_CDS,
                        ]
                    )
                d_vars["begin_r"][j] = i - 3 + 1
                d_vars["pos_ATG_r"][j] = np.nan

            # reset codon
            d_vars["codon"][j] = ""

        return d_vars

    def _find_ORF_CDS(self):
        """Find ORFs and CDSs on both strands of the DNA"""
        # init variables
        d_vars = {
            "codon": ["", "-", "--"],
            "begin_f": [0] * 3,
            "begin_r": [0] * 3,
            "end_f": [0] * 3,
            "end_r": [0] * 3,
            "pos_ATG_f": [np.nan] * 3,
            "pos_ATG_r": [np.nan] * 3,
        }
        logger.info("Finding ORF and CDS")
        # result list
        self._ORF_pos = []

        if self._threshold is None:
            self._threshold = 0

        if self._type_filter is None:
            self._type_filter = "ORF"

        # search ORFs and CDSs
        for i, nuc in enumerate(self.sequence):
            frame = (i + 3 - 2) % 3
            for j in range(3):
                d_vars = self._update_ORF_frame(i, nuc, j, frame, d_vars)

        # convert the result to a dataframe
        self._ORF_pos = pd.DataFrame(self._ORF_pos)
        self._ORF_pos.columns = [
            "begin_pos",
            "end_pos",
            "frame",
            "pos_ATG",
            "len_ORF",
            "len_CDS",
        ]

    def _get_ORF_pos(self):
        if self._ORF_pos is None:
            self._find_ORF_CDS()
        return self._ORF_pos

    ORF_pos = property(_get_ORF_pos)

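    # Hedged usage sketch (illustrative): ORF detection is lazy; the first
    # access to the property triggers the scan and returns a dataframe.
    #
    #   d = DNA("genome.fasta")   # hypothetical input file
    #   df = d.ORF_pos            # columns: begin_pos, end_pos, frame,
    #                             # pos_ATG, len_ORF, len_CDS
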
    def _get_threshold(self):
        return self._threshold

    def _set_threshold(self, value):
        if value < 0:
            raise ValueError("Threshold cannot be negative")
        # check for None first: None cannot be compared to an integer
        if (self._threshold is None) or (value < self._threshold):
            # need to compute the result again
            self._threshold = value
            self._find_ORF_CDS()
        elif value > self._threshold:
            # do not compute the result again: filter the existing dataframe
            self._threshold = value
            if self._type_filter == "ORF":
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_ORF"] > self._threshold]
            elif self._type_filter == "CDS":
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]

    threshold = property(_get_threshold, _set_threshold)

    def _get_type_filter(self):
        return self._type_filter

    def _set_type_filter(self, value):
        if value not in ["ORF", "CDS"]:
            raise ValueError("Please select a valid type of filter: ORF (default) or CDS")
        if self._ORF_pos is None:
            self._type_filter = value
        elif self._type_filter != value:
            self._type_filter = value
            if value == "ORF":
                # need to compute the result again
                self._find_ORF_CDS()
            else:
                # need to filter the result by CDS length
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]

    type_filter = property(_get_type_filter, _set_type_filter)

    def hist_ORF_CDS_linearscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
        if self._ORF_pos is None:
            self._find_ORF_CDS()

        n_ORF = self._ORF_pos["len_ORF"].dropna().shape[0]
        n_CDS = self._ORF_pos["len_CDS"].dropna().shape[0]

        # plot for all ORFs and CDSs
        pylab.hist(
            self._ORF_pos["len_ORF"].dropna(),
            alpha=alpha,
            label="ORF, N = " + str(n_ORF),
            bins=bins,
        )
        pylab.hist(
            self._ORF_pos["len_CDS"].dropna(),
            alpha=alpha,
            label="CDS, N = " + str(n_CDS),
            bins=bins,
        )
        pylab.xlabel(xlabel)
        pylab.ylabel(ylabel)
        pylab.legend()
        pylab.title("Length of ORF and CDS (after filter %s > %d)" % (self._type_filter, self._threshold))

    def hist_ORF_CDS_logscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
        self.hist_ORF_CDS_linearscale(alpha, bins, xlabel, ylabel)
        pylab.yscale("log")

    def barplot_count_ORF_CDS_by_frame(self, alpha=0.5, bins=40, xlabel="Frame", ylabel="#", bar_width=0.35):
        if self._ORF_pos is None:  # pragma: no cover
            self._find_ORF_CDS()
        # number of ORFs and CDSs found by frame
        frames = [-3, -2, -1, 1, 2, 3]
        nb_res_ORF = []
        nb_res_CDS = []
        for fr in frames:
            nb_res_ORF.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_ORF"].dropna().shape[0])
            nb_res_CDS.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_CDS"].dropna().shape[0])

        pylab.bar(
            np.array(frames) - (bar_width / 2),
            nb_res_ORF,
            bar_width,
            alpha=alpha,
            label="ORF N = %d" % sum(nb_res_ORF),
        )
        pylab.bar(
            np.array(frames) + (bar_width / 2),
            nb_res_CDS,
            bar_width,
            alpha=alpha,
            label="CDS N = %d" % sum(nb_res_CDS),
        )
        pylab.xlabel(xlabel)
        pylab.ylabel(ylabel)
        pylab.legend(loc=1)
        pylab.title("Number of ORF and CDS by frame")

    def entropy(self, sequence):
        """Return the Shannon entropy of a sequence over the ACGT alphabet (natural log).

        Reference:
        https://www.sciencedirect.com/science/article/pii/S0022519397904938?via%3Dihub
        """
        from pylab import log

        pi = [sequence.count(letter) / float(len(sequence)) for letter in "ACGT"]
        pi = [x for x in pi if x != 0]
        return -sum(pi * log(pi))

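    # Worked example (hedged): for the sequence "AACC", p(A) = p(C) = 0.5, so
    # the entropy is -(0.5 * ln(0.5) + 0.5 * ln(0.5)) = ln(2), about 0.693.
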
    def get_dna_flexibility(self, window=100, step=1, threshold=13.7):
        # Flexibility angles for each dinucleotide.
        # Source: Drew and Travers (1984), based on DNase I digestion experiments.
        # If an N is present, a minimum value is used (7.2 with G/C, 7.6 with A/T).
        flex_angles = {
            "AA": 7.6,
            "CA": 10.9,
            "AC": 14.6,
            "CC": 7.2,
            "AG": 8.2,
            "CG": 8.9,
            "AT": 25,
            "CT": 8.2,
            "GA": 8.8,
            "TA": 12.5,
            "GC": 11.1,
            "TC": 8.8,
            "GG": 7.2,
            "TG": 10.9,
            "GT": 14.6,
            "TT": 7.6,
            "CN": 7.2,
            "NC": 7.2,
            "GN": 7.2,
            "NG": 7.2,
            "NN": 7.2,
            "AN": 7.6,
            "NA": 7.6,
            "TN": 7.6,
            "NT": 7.6,
        }

        N = len(self.sequence)
        wby2 = int(window / 2)
        flex = np.zeros(N)

        slide = deque()

        # init first chunk
        for i in range(0, window):
            slide.append(flex_angles[self.sequence[i : i + 2]])
        flex[wby2] = sum(slide)

        for i in range(wby2, N - wby2):
            slide.append(flex_angles[self.sequence[i : i + 2]])
            slide.popleft()
            flex[i] = sum(slide)

        # handle the sides
        for i in range(0, wby2):
            flex[i] = flex[wby2]
            flex[N - i - 1] = flex[N - wby2 - 1]

        return flex / (window - 1)

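    # Hedged usage sketch (illustrative): per-base flexibility profile,
    # smoothed over a 100-base window; the threshold parameter (13.7 by
    # default) is the usual cut-off above which a region is called flexible.
    #
    #   d = DNA("ACGT" * 100)
    #   profile = d.get_dna_flexibility(window=100)  # array of length len(d)
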
    def get_entropy(self, window):
        N = len(self.sequence)
        wby2 = int(window / 2)
        entropy = np.zeros(N)

        import math

        def calculate_entropy(counter, size):
            entropy = 0
            for count in counter.values():
                if count > 0:
                    p = count / size
                    entropy -= p * math.log2(p)
            return entropy

        # init first chunk
        counter = Counter(self.sequence[0:window])
        entropy[wby2] = calculate_entropy(counter, window)

        for i in range(wby2, N - wby2):
            outgoing = self.sequence[i - wby2]
            ingoing = self.sequence[i + wby2]
            counter[ingoing] += 1
            counter[outgoing] -= 1
            entropy[i] = calculate_entropy(counter, window)

        # handle the sides
        for i in range(0, wby2):
            entropy[i] = entropy[wby2]
            entropy[N - i - 1] = entropy[N - wby2 - 1]

        return entropy

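    # Hedged usage sketch (illustrative): sliding Shannon entropy in bits;
    # low-complexity stretches pull the local value towards 0 while a
    # balanced ACGT composition approaches 2 bits.
    #
    #   d = DNA("ACGT" * 500 + "A" * 500 + "ACGT" * 500)
    #   e = d.get_entropy(window=1000)  # numpy array of length len(d)
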
    def get_informational_entropy(self, window=500, poly=3):
        # Informational Entropy computed from overlapping DNA triplet
        # frequencies within a window. Using overlapping triplets smooths the
        # frame effect. IE = -p * log10(p) / log10(2).
        # Rationale for log10(2)? Reference? Based on UGENE.

        N = len(self.sequence)
        wby2 = int(window / 2)
        entropy = np.zeros(N)

        import math
        from collections import defaultdict

        def calculate_entropy(counter, size):
            entropy = 0
            log10_2 = 0.301029995  # could use math.log10(2); slower
            for count in counter.values():
                if count > 0:
                    p = count / size
                    entropy -= p * math.log10(p) / log10_2
            return entropy

        # init first chunk
        counter = defaultdict(int)
        for i in range(window):
            counter[self.sequence[i : i + 3]] += 1

        entropy[wby2] = calculate_entropy(counter, window)

        for i in range(wby2, N - wby2):
            outgoing = self.sequence[i - wby2 : i - wby2 + 3]
            ingoing = self.sequence[i + wby2 : i + wby2 + 3]

            counter[ingoing] += 1
            counter[outgoing] -= 1
            entropy[i] = calculate_entropy(counter, window)

        # handle the sides
        for i in range(0, wby2):
            entropy[i] = entropy[wby2]
            entropy[N - i - 1] = entropy[N - wby2 - 1]

        return entropy

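    # Worked note (hedged): -p * log10(p) / log10(2) equals -p * log2(p), so
    # the informational entropy is expressed in bits; a window made of one
    # repeated triplet has p = 1 for that triplet and therefore IE = 0.
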
    def get_karlin_signature_difference(self, window=500):
        # The Karlin Signature Difference is the dinucleotide absolute relative
        # abundance difference between the whole sequence and a sliding window.
        # f(XY) = frequency of the dinucleotide XY
        # f(X)  = frequency of the nucleotide X
        # p(XY) = f(XY) / (f(X) * f(Y))
        # p_seq(XY) = p(XY) for the whole sequence
        # p_win(XY) = p(XY) for a window
        # final metric is sum(|p_seq(XY) - p_win(XY)|) / 16

        from collections import defaultdict

        N = len(self.sequence)
        wby2 = int(window / 2)
        karlin_window = np.zeros(N)

        def calculate_karlin_difference(counter, global_counter, window):
            pXY = defaultdict(int)
            deltaXY = 0
            for x in "ACGT":
                for y in "ACGT":
                    xy = f"{x}{y}"
                    pXY[xy] = (
                        counter[xy]
                        / (2 * (window - 1))
                        / max([0.000000001, counter[x] / (2 * window) * counter[y] / (2 * window)])
                    )
                    deltaXY += abs(pXY[xy] - global_counter[xy])
            return deltaXY / 16.0

        counter_window = defaultdict(int)
        counter_seq = defaultdict(int)

        # init first chunk, both local (window) and global (whole sequence)
        for i in range(window):
            # nucleotide
            counter_window[self.sequence[i]] += 1
            counter_seq[self.sequence[i]] += 1
            # dinucleotide
            counter_window[self.sequence[i : i + 2]] += 1
            counter_seq[self.sequence[i : i + 2]] += 1

        # complete the global counts on the rest of the sequence
        for i in range(window, N):
            # nucleotide
            counter_seq[self.sequence[i]] += 1
            # dinucleotide (the last position has no dinucleotide)
            if i < N - 1:
                counter_seq[self.sequence[i : i + 2]] += 1

        # compute the final global pXY
        for x in "ACGT":
            for y in "ACGT":
                xy = f"{x}{y}"
                counter_seq[xy] = (
                    counter_seq[xy]
                    / (2 * (N - 1))
                    / max([0.000000001, counter_seq[x] / (2 * N) * counter_seq[y] / (2 * N)])
                )

        karlin_window[wby2] = calculate_karlin_difference(counter_window, counter_seq, window)

        for i in range(wby2, N - wby2):
            outgoing = self.sequence[i - wby2]
            ingoing = self.sequence[i + wby2]

            # nucleotide update
            counter_window[ingoing] += 1
            counter_window[outgoing] -= 1

            # dinucleotide update: add the incoming pair, remove the outgoing one
            counter_window[self.sequence[i + wby2 - 1 : i + wby2 + 1]] += 1
            counter_window[self.sequence[i - wby2 : i - wby2 + 2]] -= 1

            karlin_window[i] = calculate_karlin_difference(counter_window, counter_seq, window)

        # handle the sides
        for i in range(0, wby2):
            karlin_window[i] = karlin_window[wby2]
            karlin_window[N - i - 1] = karlin_window[N - wby2 - 1]

        return karlin_window

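# Worked example for the Karlin signature (hedged, not from the original
# source): if a window has f(CG) = 0.01 and f(C) = f(G) = 0.2, then
# p_win(CG) = 0.01 / (0.2 * 0.2) = 0.25, a strong CpG depletion relative to
# the neutral expectation p(XY) = 1. The final metric averages
# |p_seq(XY) - p_win(XY)| over the 16 dinucleotides.

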
class RNA(Sequence):
    """Simple RNA class

    ::

        >>> from sequana.sequence import RNA
        >>> d = RNA("ACGUUUU")
        >>> d.reverse_complement()

    """

    def __init__(self, sequence):
        super(RNA, self).__init__(sequence, complement_in=b"ACGU", complement_out=b"UGCA", letters="ACGUN")


class Repeats:
    """Class for finding repeats in DNA or RNA linear sequences.

    The computation is performed each time the :attr:`threshold` is set
    to a new value.

    .. plot::
        :include-source:

        from sequana import sequana_data, Repeats
        rr = Repeats(sequana_data("measles.fa"))
        rr.threshold = 4
        rr.hist_length_repeats()

    .. note:: Works with the shustring package from Bioconda (April 2017)
    .. todo:: use a specific sequence (first one by default). Others can be
        selected by name

    """

    def __init__(self, filename_fasta, merge=False, name=None):
        """.. rubric:: Constructor

        Input must be a FastA file with valid DNA or RNA characters

        :param str filename_fasta: a FastA file; only the first
            sequence is used!
        :param bool merge: merge repeats that are fragmented
        :param str name: if a name is provided, scan the FastA file
            and select the corresponding sequence. If you want to
            analyse all sequences, you need to loop over them, setting
            :attr:`header` for each sequence with the name found in the
            sequence header.

        .. note:: known problems. Headers with a ``>`` character (e.g. in the
            comment) are left-stripped and only the comment is kept. Another
            issue arises for multi-fasta, where one sequence is ignored (last
            or first?)

        """
        # used to check everything is fine with the header/name
        self._fasta = FastA(filename_fasta)

        # Define the attributes, and set the header if already provided
        self._threshold = None
        self._df_shustring = None
        self._header = None
        self._length = None
        self._longest_shustring = None
        self._begin_end_repeat_position = None
        self._begin_end_repeat_position_merge = None
        self._filename_fasta = filename_fasta
        self._previous_thr = None
        self._list_len_repeats = None
        self._contig_names = None
        if not isinstance(merge, bool):
            raise TypeError("merge must be a boolean")
        self._do_merge = merge
        if name is not None:
            self.header = name
        else:
            self.header = self._fasta.names[0]

    def _get_header(self):
        return self._header

    def _set_header(self, name):
        if name not in self._fasta.names:
            raise ValueError("invalid name. Use one of %s" % self._fasta.names)
        self._header = name
        self._df_shustring = None

    header = property(_get_header, _set_header)

    def _get_names(self):
        if self._contig_names is None:
            self._contig_names = self._fasta.names
        return self._contig_names

    names = property(_get_names)

    def _get_shustrings_length(self):
        """Return a dataframe with the shortest unique substring length at each
        position. Shortest unique substrings are unique in the sequence and in
        its complement. Uses the shustring tool."""
        if self._df_shustring is None:
            # read the fasta
            task_read = subprocess.Popen(["cat", self._filename_fasta], stdout=subprocess.PIPE)

            # shustring command
            # the -l option uses a regular expression
            task_shus = subprocess.Popen(
                # previous regex: ">{}[\s,\n]*?".format(self.header)
                ["shustring", "-r", "-q", "-l", r">{}($|\s+)".format(self.header)],
                stdin=task_read.stdout,
                stdout=subprocess.PIPE,
            )

            # read stdout line by line and append to a list
            list_df = []
            for line in task_shus.stdout:
                list_df.append(line.decode("utf8").replace("\n", "").split("\t"))
            task_shus.wait()

            # convert to a dataframe, skipping the two header lines
            df = pd.DataFrame(list_df[2 : len(list_df)])
            self._df_shustring = df.astype(int)
            self._df_shustring.columns = ["position", "shustring_length"]

            # get the input sequence length and the longest shustring from the first line
            self._length = int(list_df[0][1])
            self._longest_shustring = int(list_df[0][3].split("<=")[2])
        return self._df_shustring

    df_shustring = property(_get_shustrings_length)

    def _get_genome_length(self):
        if self._df_shustring is None:
            self._get_shustrings_length()
        return self._length

    length = property(_get_genome_length)

    def _get_longest_shustring(self):
        if self._df_shustring is None:
            self._get_shustrings_length()
        return self._longest_shustring

    longest_shustring = property(_get_longest_shustring)

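    # Hedged sketch of the underlying call (shustring is an external Bioconda
    # tool; the output layout described here is an assumption based on the
    # parsing code above). The equivalent shell pipeline is roughly:
    #
    #   cat input.fa | shustring -r -q -l '>NAME($|\s+)'
    #
    # The first output line carries the sequence length and the longest
    # shustring; subsequent lines are tab-separated
    # (position, shustring_length) pairs.
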
    def _find_begin_end_repeats(self, force=False):
        """Return positions of repeats longer than the threshold
        as an ordered list
        """
        if self.df_shustring is None:
            self._get_shustrings_length()

        if self._threshold is None:
            raise ValueError("threshold: please set the threshold (minimum length of repeats to output)")

        # if there is no result yet, or the threshold has changed
        if (self._begin_end_repeat_position is None) or (self.threshold != self._previous_thr) or force:
            nb_row = self.df_shustring.shape[0]
            i = 0
            step_repeat_seq = []
            be_repeats = []
            e = 0

            # use a list because it is faster
            list_len_shus = list(self.df_shustring.loc[:, "shustring_length"])

            while i < nb_row:
                # beginning of a repeat
                if list_len_shus[i] > self.threshold:
                    b = i
                    # compute the end of the repeat
                    len_repeat = list_len_shus[i]
                    e = b + len_repeat
                    # save (b, e)
                    be_repeats.append((b, e))
                    # update i: skip to the end of the repeat
                    i = e - 1
                i += 1

            self._begin_end_repeat_position = be_repeats

        self._get_merge_repeats()

    def _get_be_repeats(self):
        self._find_begin_end_repeats()
        return self._begin_end_repeat_position

    begin_end_repeat_position = property(_get_be_repeats)

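    # Worked example (hedged): with threshold=4 and shustring lengths
    # [1, 6, 5, 4, 3, 2, 1, ...], the scan sees length 6 > 4 at position 1,
    # records the repeat (1, 7), then resumes at position 7, so positions
    # nested inside a reported repeat are not reported again.
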
    def _set_threshold(self, value):
        if value < 0:
            raise ValueError("Threshold must be a positive integer")
        if value != self._threshold:
            self._previous_thr = self._threshold
        self._threshold = value
        self._find_begin_end_repeats()
        self._list_len_repeats = [tup[1] - tup[0] for tup in self._begin_end_repeat_position]

    def _get_threshold(self):
        return self._threshold

    threshold = property(_get_threshold, _set_threshold)

    def _get_list_len_repeats(self):
        if self._list_len_repeats is None:
            raise UserWarning("Please set the threshold (minimum length of repeats to output)")
        return self._list_len_repeats

    list_len_repeats = property(_get_list_len_repeats)

    def _get_merge_repeats(self):
        if self._do_merge:
            # if there are repeats, merge those that are fragmented
            if len(self._begin_end_repeat_position) > 0:
                prev_tup = self._begin_end_repeat_position[0]
                b = prev_tup[0]
                begin_end_repeat_position_merge = []
                for i in range(1, len(self._begin_end_repeat_position)):
                    tup = self._begin_end_repeat_position[i]

                    if tup[0] == prev_tup[1]:
                        # concatenate
                        e = tup[1]
                        prev_tup = tup
                        if i == (len(self._begin_end_repeat_position) - 1):
                            # last tuple: append to the result
                            begin_end_repeat_position_merge.append((b, e))

                    else:
                        # real end of the repeat: append the result and update b, e
                        e = prev_tup[1]
                        begin_end_repeat_position_merge.append((b, e))
                        prev_tup = tup
                        b = prev_tup[0]
                        if i == (len(self._begin_end_repeat_position) - 1):
                            # last tuple: append to the result
                            begin_end_repeat_position_merge.append(tup)

                self._begin_end_repeat_position = begin_end_repeat_position_merge

    def _get_do_merge(self):
        return self._do_merge

    def _set_do_merge(self, do_merge):
        if not isinstance(do_merge, bool):
            raise TypeError("do_merge must be a boolean")
        # only act if the value changed
        if do_merge != self._do_merge:
            self._do_merge = do_merge
            if self._do_merge:
                # did not merge before, merge now
                if self._begin_end_repeat_position is None:
                    self._find_begin_end_repeats()
            else:
                # data is already merged: need to compute again to un-merge
                self._find_begin_end_repeats(force=True)

    do_merge = property(_get_do_merge, _set_do_merge)

    def hist_length_repeats(
        self,
        bins=20,
        alpha=0.5,
        hold=False,
        fontsize=12,
        grid=True,
        title="Repeat length",
        xlabel="Repeat length",
        ylabel="#",
        logy=True,
    ):
        """Plots a histogram of the repeat lengths"""
        # accessing list_len_repeats checks that the user has set a threshold
        if hold is False:
            pylab.clf()
        pylab.hist(self.list_len_repeats, alpha=alpha, bins=bins)
        pylab.title(title)
        pylab.xlabel(xlabel, fontsize=fontsize)
        pylab.ylabel(ylabel, fontsize=fontsize)
        if grid is True:
            pylab.grid(True)
        if logy:
            pylab.semilogy()

    def plot(self, clf=True, fontsize=12):
        if clf:
            pylab.clf()
        pylab.grid(True)
        pylab.plot(self.df_shustring.shustring_length)
        pylab.xlabel("Position (bp)", fontsize=fontsize)
        pylab.ylabel("Repeat lengths (bp)", fontsize=fontsize)
        pylab.ylim(bottom=0)

    def to_wig(self, filename, step=1000):
        """Export repeats into WIG format for import into IGV"""
        assert self.threshold and self.df_shustring is not None

        from numpy import arange

        N = len(self.df_shustring)
        with open(filename, "w") as fout:
            # the trailing newline ensures the first data row starts on its own line
            fout.write(
                'track type=wiggle_0 name="Repeat Density" visibility=full fixedStep chrom=1 start=0 step=100 span=100\n'
            )
            for i in arange(step / 2, N, step):
                start = int(i - step / 2)
                stop = int(i + step / 2)

                M = self.df_shustring.query("position>=@start and position<=@stop").shustring_length.max()
                if start == 0:
                    start = 1
                fout.write(f"{self.header}\t{start}\t{stop}\t{M}\n")
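
# Hedged sketch of the expected output of to_wig (the header name and the
# values shown are invented): after the track line, each row gives the contig
# header, the bin boundaries, and the maximum shustring length in that bin.
#
#   track type=wiggle_0 name="Repeat Density" ...
#   K01711.1    1       1000    24
#   K01711.1    1000    2000    17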