
cokelaer / sequana / 7117302237

06 Dec 2023 04:23PM UTC coverage: 75.482% (+1.8%) from 73.729%

push · github · cokelaer: Update version

13709 of 18162 relevant lines covered (75.48%)

2.26 hits per line

Source File

/sequana/sequence.py (93.78% covered)
1
#  This file is part of Sequana software
2
#
3
#  Copyright (c) 2018-2022 - Sequana Development Team
4
#
5
#  Distributed under the terms of the 3-clause BSD license.
6
#  The full license is in the LICENSE file, distributed with this software.
7
#
8
#  website: https://github.com/sequana/sequana
9
#  documentation: http://sequana.readthedocs.io
10
#
11
##############################################################################
12
import re
3✔
13
import string
3✔
14
import subprocess
3✔
15
from collections import Counter, deque
3✔
16

17
import colorlog
3✔
18
from easydev import do_profile
3✔
19

20
from sequana.fasta import FastA
3✔
21
from sequana.lazy import numpy as np
3✔
22
from sequana.lazy import pandas as pd
3✔
23
from sequana.lazy import pylab
3✔
24

25
logger = colorlog.getLogger(__name__)
3✔
26

27

28
__all__ = ["DNA", "RNA", "Repeats", "Sequence"]
3✔
29

30

31
class Sequence(object):
3✔
32
    """Abstract base classe for other specialised sequences such as DNA.
33

34

35
    Sequence is the base class for other classes such as :class:`DNA` and
36
    :class:`RNA`.
37

38
    .. doctest::
39

40
        >>> from sequana import Sequence
41
        >>> s = Sequence("ACGT")
42
        >>> s.stats()
        Counter({'A': 1, 'C': 1, 'G': 1, 'T': 1})
43
        >>> s.get_complement()
        'TGCA'
44

45
    .. note:: You may use a Fasta file as input (see constructor)
46

47

48
    """
49

50
    def __init__(self, sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGT"):
3✔
51
        """.. rubric:: Constructor
52

53
        A sequence is just a string stored in the :attr:`sequence` attribute. It
54
        has properties related to the type of alphabet authorised.
55

56
        :param str sequence: May be a string, or the name of a Fasta file, in which case only
57
            the first sequence is used.
58
        :param complement_in:
59
        :param complement_out:
60
        :param letters: authorised letters. Used in :meth:`check` only.
61

62
        .. todo:: use counter only once as a property
63

64
        """
65
        if sequence.endswith(".fa") or sequence.endswith(".fasta"):
3✔
66
            self.fasta = FastA(sequence)
3✔
67
            sequence = self.fasta.next().sequence.upper()
3✔
68
        else:  # assume correct string sequence
69
            pass
1✔
70

71
        self._data = sequence
3✔
72
        try:
3✔
73
            self._translate = string.maketrans(complement_in, complement_out)
3✔
74
        except AttributeError:  # Python 3: string.maketrans does not exist
3✔
75
            self._translate = bytes.maketrans(complement_in, complement_out)
3✔
76
        self._letters = letters
3✔
77

78
    def __iter__(self):
3✔
79
        return self
3✔
80

81
    def __next__(self):
3✔
82
        self._data = self.fasta.next().sequence.upper()
3✔
83
        return self._data
×
84

85
    def _get_sequence(self):
3✔
86
        return self._data
3✔
87

88
    sequence = property(_get_sequence)
3✔
89

90
    def get_complement(self):
3✔
91
        """Return complement"""
92
        return self._data.translate(self._translate)
3✔
93

94
    def get_reverse_complement(self):
3✔
95
        """Return reverse complement"""
96
        return self.get_complement()[::-1]
3✔
97

98
    def get_reverse(self):
3✔
99
        """Return reverse sequence"""
100
        return self._data[::-1]
3✔
101

102
    def complement(self):
3✔
103
        """Alias to :meth:`get_complement`"""
104
        self._data = self.get_complement()
3✔
105

106
    def reverse(self):
3✔
107
        """Alias to :meth:`get_reverse`"""
108
        self._data = self.get_reverse()
3✔
109

110
    def reverse_complement(self):
3✔
111
        """Alias to get_reverse_complement"""
112
        self._data = self.get_reverse_complement()
3✔
113

114
    def check(self):
3✔
115
        """Check that all letters are valid"""
116
        counter = Counter(self._data).keys()
3✔
117
        for key in counter:
3✔
118
            if key not in self._letters:
3✔
119
                raise ValueError("Found unexpected letter in the sequence (%s)" % key)
3✔
120

121
    def __len__(self):
3✔
122
        return len(self._data)
3✔
123

124
    def gc_content(self):
3✔
125
        """Return mean GC content"""
126
        c = Counter(self._data)
3✔
127
        ratio = (c["G"] + c["C"]) / len(self.sequence)
3✔
128
        return ratio
3✔
129

130
    def stats(self):
3✔
131
        """Return basic stats about the number of letters"""
132
        from collections import Counter
3✔
133

134
        return Counter(self.sequence)
3✔
135

136
    def get_occurences(self, pattern, overlap=False):
3✔
137
        """Return position of the input pattern in the sequence
138

139
        ::
140

141
            >>> from sequana import Sequence
142
            >>> s = Sequence('ACGTTTTACGT')
143
            >>> s.get_occurences("ACGT")
144
            [0, 7]
145

146
        """
147
        if overlap is False:
3✔
148
            res = [m.start() for m in re.finditer(pattern, self.sequence)]
3✔
149
        elif overlap is True:
3✔
150
            res = [m.start() for m in re.finditer("(?=%s)" % pattern, self.sequence)]
3✔
151
        return res
3✔
152

153
        # reverse find-all without overlaps, you can combine positive and
154
        # negative lookahead into an expression like this:
155
        # res = [m.start() for m in re.finditer('(?=%s)(?!.{1,%d}%s)' % (search,
156
        #    len(pattern)-1, pattern), 'ttt')]
157
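For reference, a minimal usage sketch of the Sequence class defined above (illustrative only: the toy string and the values given in the comments are not taken from sequence.py or its test suite)::

    from sequana.sequence import Sequence

    s = Sequence("ACGTTTTACGT")     # a plain string; a .fa/.fasta file name also works
    s.get_reverse_complement()      # 'ACGTAAAACGT'
    s.get_occurences("ACGT")        # [0, 7]
    s.gc_content()                  # 4/11, about 0.36
    s.check()                       # raises ValueError if a letter is not in "ACGT"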

158

159
class DNA(Sequence):
3✔
160
    """Simple DNA class
161

162
    ::
163

164
        >>> from sequana.sequence import DNA
165
        >>> d = DNA("ACGTTTT")
166
        >>> d.reverse_complement()
167

168
    Some long computations are done when setting the window size::
169

170
        d.window = 100
171

172
    The ORF detection has been validated against a Plasmodium 3D7 ORF file
173
    found on plasmodb.org across the 14 chromosomes.
174

175
    """
176

177
    def __init__(
3✔
178
        self,
179
        sequence,
180
        codons_stop=["TAA", "TGA", "TAG"],
181
        codons_stop_rev=["TTA", "TCA", "CTA"],
182
        codons_start=["ATG"],
183
        codons_start_rev=["CAT"],
184
    ):
185
        super(DNA, self).__init__(sequence, complement_in=b"ACGT", complement_out=b"TGCA", letters="ACGTN")
3✔
186

187
        self._window = None
3✔
188
        self._type_window = None
3✔
189
        self._slide_window = None
3✔
190
        self._seq_right = None
3✔
191
        self._dict_nuc = {"A": 0, "T": 1, "G": 2, "C": 3}
3✔
192
        self._cumul = None
3✔
193
        self._Xn = None
3✔
194
        self._Yn = None
3✔
195
        self._Zn = None
3✔
196
        self._ignored_nuc = None
3✔
197
        self._AT_skew_slide = None
3✔
198
        self._GC_skew_slide = None
3✔
199
        self._GC_content_slide = None
3✔
200
        self._AT_content_slide = None
3✔
201
        self._template_fft = None
3✔
202
        self._c_fft = None
3✔
203
        self._codons_stop = codons_stop
3✔
204
        self._codons_stop_rev = codons_stop_rev
3✔
205
        self._codons_start = codons_start
3✔
206
        self._codons_start_rev = codons_start_rev
3✔
207
        self._ORF_pos = None
3✔
208
        self._threshold = None
3✔
209
        self._type_filter = None
3✔
210

211
    def _get_window(self):
3✔
212
        return self._window
3✔
213

214
    def _set_window(self, window):
3✔
215
        if (window > 0) & (window < 1):
3✔
216
            self._type_window = "adapted to genome length : %.1f %% of total length" % (window * 100)
3✔
217
            self._window = int(round(self.__len__() * window))
3✔
218

219
        elif (window >= 1) & (window <= self.__len__()):
3✔
220
            self._type_window = "fixed window length : %d" % (window)
3✔
221
            self._window = int(window)
3✔
222
        else:
223
            raise ValueError(
×
224
                "Incorrect value for window: choose either float ]0,1]"
225
                + " (fraction of genome) or integer [1,genome_length] (window size)"
226
            )
227

228
        logger.info("Computing GC skew")
3✔
229
        self._compute_skews()
3✔
230

231
    window = property(_get_window, _set_window)
3✔
232
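A short sketch of the two accepted forms of the window setter above (the toy sequence is arbitrary; a genome loaded from a Fasta file behaves the same way)::

    from sequana.sequence import DNA

    d = DNA("ACGT" * 250)   # toy 1000-base sequence
    d.window = 0.1          # float between 0 and 1: fraction of the genome, here 100 bases
    d.window = 100          # integer in [1, len(d)]: equivalent fixed window size
    d.GC_skew               # numpy array of shape (1, len(d)), filled by _compute_skews()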

233
    def _get_type_window(self):
3✔
234
        return self._type_window
3✔
235

236
    type_window = property(_get_type_window)
3✔
237

238
    def _init_sliding_window(self):
3✔
239
        """slide_window : deque of n first nucleotides (size of window)
240
        seq_right : deque of the rest of the genome, with the n-1 nucleotides at the end (circular DNA)
241
        """
242
        # split sequence_reference in two : sliding window and seq_right
243
        # for circular genome : add beginning at the end of genome
244
        self._slide_window = deque(self.sequence[0 : self._window])
3✔
245
        self._seq_right = deque(self.sequence[self._window : self.__len__()] + self.sequence[0 : (self._window - 1)])
3✔
246

247
    def _init_list_results(self):
3✔
248
        # init IJ content and IJ skew
249
        IJ_content_res = np.empty((1, self.__len__()))
3✔
250
        IJ_content_res[:] = np.nan
3✔
251
        IJ_skew_res = np.empty((1, self.__len__()))
3✔
252
        IJ_skew_res[:] = np.nan
3✔
253
        return IJ_content_res, IJ_skew_res
3✔
254

255
    def _init_cumul_nuc(self):
3✔
256
        """Cumulative of nucleotide count along genome (init from first window)"""
257
        # ATGC (index stored in self._dict_nuc)
258
        cumul = np.zeros((4, (self.__len__() + self._window)))
3✔
259
        for j in range(self._window):
3✔
260
            nuc = self.sequence[j]
3✔
261
            if nuc in self._dict_nuc:
3✔
262
                cumul[self._dict_nuc[nuc]][j] += 1
3✔
263
        self._cumul = cumul
3✔
264

265
    # @do_profile()
266
    def _compute_skews(self):
3✔
267
        ### initialisation: compute GC skew and AT skew for the first window
268
        self._init_sliding_window()
3✔
269
        GC_content_slide, GC_skew_slide = self._init_list_results()
3✔
270
        AT_content_slide, AT_skew_slide = self._init_list_results()
3✔
271
        self._init_cumul_nuc()
3✔
272

273
        c = Counter(self._slide_window)
3✔
274
        dict_counts = {"G": c["G"], "C": c["C"], "A": c["A"], "T": c["T"]}
3✔
275
        i = 0
3✔
276

277
        # GC
278
        sumGC = float(dict_counts["G"] + dict_counts["C"])
3✔
279
        GC_content_slide[0][i] = sumGC
3✔
280
        if sumGC > 0:
3✔
281
            GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
3✔
282
        # AT
283
        sumAT = float(dict_counts["A"] + dict_counts["T"])
3✔
284
        AT_content_slide[0][i] = sumAT
3✔
285
        if sumAT > 0:
3✔
286
            AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT
3✔
287

288
        ### Compute for all genome
289
        while self._seq_right:
3✔
290
            out_nuc = self._slide_window.popleft()
3✔
291
            in_nuc = self._seq_right.popleft()
3✔
292
            self._slide_window.append(in_nuc)
3✔
293

294
            i += 1
3✔
295

296
            if i % 500000 == 0:  # pragma: no cover
297
                logger.info("%d / %d" % (i, self.__len__()))
298
            # if in and out are the same : do nothing, append same result
299
            if out_nuc != in_nuc:
3✔
300
                # remove out_nuc from the counters, add in_nuc
301
                if out_nuc in self._dict_nuc:
3✔
302
                    dict_counts[out_nuc] -= 1
3✔
303
                if in_nuc in self._dict_nuc:
3✔
304
                    dict_counts[in_nuc] += 1
3✔
305
                sumGC = float(dict_counts["G"] + dict_counts["C"])
3✔
306
                sumAT = float(dict_counts["A"] + dict_counts["T"])
3✔
307

308
            # fill results
309
            # GC
310
            GC_content_slide[0][i] = sumGC
3✔
311
            if sumGC > 0:
3✔
312
                GC_skew_slide[0][i] = (dict_counts["G"] - dict_counts["C"]) / sumGC
3✔
313
            # AT
314
            AT_content_slide[0][i] = sumAT
3✔
315
            if sumAT > 0:
3✔
316
                AT_skew_slide[0][i] = (dict_counts["A"] - dict_counts["T"]) / sumAT
3✔
317
            # cumul
318
            if in_nuc in self._dict_nuc:
3✔
319
                self._cumul[self._dict_nuc[in_nuc]][i + self._window - 1] += 1
3✔
320

321
        self._GC_content_slide = GC_content_slide / float(self._window)
3✔
322
        self._AT_content_slide = AT_content_slide / float(self._window)
3✔
323
        self._cumul = np.delete(self._cumul, range(self.__len__(), self._cumul.shape[1]), 1)
3✔
324
        self._cumul = np.cumsum(self._cumul, axis=1)
3✔
325

326
        ### save result for Z curve
327
        self._Xn = list(
3✔
328
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["G"]])
329
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["T"]])
330
        )
331

332
        self._Yn = list(
3✔
333
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["C"]])
334
            - (self._cumul[self._dict_nuc["G"]] + self._cumul[self._dict_nuc["T"]])
335
        )
336

337
        self._Zn = list(
3✔
338
            (self._cumul[self._dict_nuc["A"]] + self._cumul[self._dict_nuc["T"]])
339
            - (self._cumul[self._dict_nuc["C"]] + self._cumul[self._dict_nuc["G"]])
340
        )
341

342
        self._AT_skew_slide = AT_skew_slide
3✔
343
        self._GC_skew_slide = GC_skew_slide
3✔
344

345
        ### check proportion of ignored nucleotides
346
        GC_content_total = (self._cumul[self._dict_nuc["G"]][-1] + self._cumul[self._dict_nuc["C"]][-1]) / float(
3✔
347
            self.__len__()
348
        )
349
        AT_content_total = (self._cumul[self._dict_nuc["A"]][-1] + self._cumul[self._dict_nuc["T"]][-1]) / float(
3✔
350
            self.__len__()
351
        )
352
        self._ignored_nuc = 1.0 - GC_content_total - AT_content_total
3✔
353

354
    def _get_AT_skew(self):
3✔
355
        if self._AT_skew_slide is None:  # pragma: no cover
356
            raise AttributeError("Please set a valid window to compute skew")
357
        else:
358
            return self._AT_skew_slide
3✔
359

360
    AT_skew = property(_get_AT_skew)
3✔
361

362
    def _get_GC_skew(self):
3✔
363
        if self._GC_skew_slide is None:  # pragma: no cover
364
            raise AttributeError("Please set a valid window to compute skew")
365
        else:
366
            return self._GC_skew_slide
3✔
367

368
    GC_skew = property(_get_GC_skew)
3✔
369

370
    def _create_template_fft(self, M=1000):  # pragma: no cover
371
        M_3 = int(M / 3)
372
        W = [-0.5] * M_3 + list(np.linspace(-0.5, 0.5, M - 2 * M_3)) + [0.5] * M_3
373
        return list(W * np.hanning(M))
374

375
    def _myfft_gc_skew(self, M):  # pragma: no cover
376
        """
377
        x : GC_skew vector (list)
378
        param N: length of the GC skew vector
379
        param M: length of the template
380
        param A: amplitude between positive and negative GC skew vector
381

382
        """
383
        x = self._GC_skew_slide[0]
384

385
        N = len(x)
386
        template = self._create_template_fft(M) + [0] * (N - M)
387
        template /= pylab.norm(template)
388
        c = np.fft.fft(x)
389
        c = (
390
            abs(np.fft.ifft(np.fft.fft(x) * pylab.conj(np.fft.fft(template))) ** 2)
391
            / pylab.norm(x)
392
            / pylab.norm(template)
393
        )
394
        # shift the SNR vector by the template length so that the peak is at the END of the template
395
        c = np.roll(c, M // 2)
396
        self._template_fft = template
397
        self._c_fft = c * 2.0 / N
398

399
    def plot_all_skews(self, figsize=(10, 12), fontsize=16, alpha=0.5):
3✔
400
        if self._window is None:  # pragma: no cover
401
            raise AttributeError("Please set a valid window to compute skew")
402

403
        # create figure
404
        fig, axarr = pylab.subplots(9, 1, sharex=True, figsize=figsize)
3✔
405

406
        main_title = (
3✔
407
            "Window size = %d (%.0f %% of genome )\n\
408
        GC content = %.0f %%, AT content = %.0f %%, ignored = %.0f %%"
409
            % (
410
                self._window,
411
                self._window * 100 / self.__len__(),
412
                self.gc_content() * 100,
413
                (1 - self.gc_content()) * 100,
414
                self._ignored_nuc * 100,
415
            )
416
        )
417

418
        pylab.suptitle(main_title, fontsize=fontsize)
3✔
419

420
        # GC skew
421
        axarr[0].set_title("GC skew (blue) - Cumulative sum (red)")
3✔
422
        axarr[0].plot(list(self._GC_skew_slide[0]), "b-", alpha=alpha)
3✔
423
        axarr[0].set_ylabel("(G -C) / (G + C)")
3✔
424

425
        axarr[1].plot(list(np.cumsum(self._GC_skew_slide[0])), "r-", alpha=alpha)
3✔
426
        axarr[1].set_ylabel("(G -C) / (G + C)")
3✔
427

428
        # AT skew
429
        axarr[2].set_title("AT skew (blue) - Cumulative sum (red)")
3✔
430
        axarr[2].plot(list(self._AT_skew_slide[0]), "b-", alpha=alpha)
3✔
431
        axarr[2].set_ylabel("(A -T) / (A + T)")
3✔
432

433
        axarr[3].plot(list(np.cumsum(self._AT_skew_slide[0])), "r-", alpha=alpha)
3✔
434
        axarr[3].set_ylabel("(A -T) / (A + T)", rotation=0)
3✔
435

436
        # Xn
437
        axarr[4].set_title("Cumulative RY skew (Purine - Pyrimidine)")
3✔
438
        axarr[4].plot(self._Xn, "g-", alpha=alpha)
3✔
439
        axarr[4].set_ylabel("(A + G) - (C + T)")
3✔
440

441
        # Yn
442
        axarr[5].set_title("Cumulative MK skew (Amino - Keto)")
3✔
443
        axarr[5].plot(self._Yn, "g-", alpha=alpha)
3✔
444
        axarr[5].set_ylabel("(A + C) - (G + T)")
3✔
445

446
        # Zn
447
        axarr[6].set_title("Cumulative H-bond skew (Weak H-bond - Strong H-bond)")
3✔
448
        axarr[6].plot(self._Zn, "g-", alpha=alpha)
3✔
449
        axarr[6].set_ylabel("(A + T) - (G + C)")
3✔
450

451
        # GC content
452
        axarr[7].set_title("GC content")
3✔
453
        axarr[7].plot(list(self._GC_content_slide[0]), "k-", alpha=alpha)
3✔
454
        axarr[7].set_ylabel("GC")
3✔
455

456
        # AT content
457
        axarr[8].set_title("AT content")
3✔
458
        axarr[8].plot(list(self._AT_content_slide[0]), "k-", alpha=alpha)
3✔
459
        axarr[8].set_ylabel("AT")
3✔
460

461
        # # FFT
462
        # axarr[9].set_title("FFT")
463
        # axarr[9].plot(list(self._c_fft),'g-',alpha=alpha)
464
        # axarr[9].set_ylabel("FFT")
465

466
        fig.tight_layout()
3✔
467
        fig.subplots_adjust(top=0.88)
3✔
468
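Typical use of plot_all_skews, assuming a window has been set beforehand (otherwise the method raises AttributeError); the file name below is a placeholder::

    d = DNA("genome.fasta")    # only the first sequence of the file is used
    d.window = 0.1
    d.plot_all_skews(figsize=(10, 12), fontsize=16, alpha=0.5)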

469
    def _update_ORF_frame(self, i, nuc, j, frame, d_vars):
3✔
470
        d_vars["codon"][j] = d_vars["codon"][j] + nuc
3✔
471

472
        if len(d_vars["codon"][j]) == 3:
3✔
473
            # test for start forward (if none already found)
474
            is_start = d_vars["codon"][j] in self._codons_start
3✔
475
            if is_start & np.isnan(d_vars["pos_ATG_f"][j]):
3✔
476
                d_vars["pos_ATG_f"][j] = i - 2
3✔
477
            # test for stop forward
478
            is_stop = d_vars["codon"][j] in self._codons_stop
3✔
479
            if is_stop:
3✔
480
                d_vars["end_f"][j] = i
3✔
481
                # test if the length of the CDS or ORF found is long enough to be in the output
482
                if self._type_filter == "CDS":
3✔
483
                    len_to_filter = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]  # len_CDS
×
484
                else:
485
                    len_to_filter = d_vars["end_f"][j] - d_vars["begin_f"][j]  # len_ORF
3✔
486

487
                if len_to_filter > self._threshold:
3✔
488
                    len_ORF = d_vars["end_f"][j] - d_vars["begin_f"][j]
3✔
489
                    len_CDS = d_vars["end_f"][j] - d_vars["pos_ATG_f"][j]
3✔
490
                    self._ORF_pos.append(
3✔
491
                        [
492
                            d_vars["begin_f"][j],
493
                            d_vars["end_f"][j],
494
                            frame + 1,
495
                            d_vars["pos_ATG_f"][j],
496
                            len_ORF,
497
                            len_CDS,
498
                        ]
499
                    )
500
                d_vars["begin_f"][j] = i + 1
3✔
501
                d_vars["pos_ATG_f"][j] = np.nan
3✔
502

503
            # test for start reverse
504
            is_start_rev = d_vars["codon"][j] in self._codons_start_rev
3✔
505
            if is_start_rev:
3✔
506
                d_vars["pos_ATG_r"][j] = i
3✔
507
            # test for stop reverse
508
            is_stop_rev = d_vars["codon"][j] in self._codons_stop_rev
3✔
509
            if is_stop_rev:
3✔
510
                d_vars["end_r"][j] = i - 3
3✔
511
                # test if the length of the CDS or ORF found is long enough to be in the output
512
                if self._type_filter == "CDS":
3✔
513
                    len_to_filter = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]  # len_CDS
×
514
                else:
515
                    len_to_filter = d_vars["end_r"][j] - d_vars["begin_r"][j]  # len_ORF
3✔
516

517
                if len_to_filter > self._threshold:
3✔
518
                    len_ORF = d_vars["end_r"][j] - d_vars["begin_r"][j]
3✔
519
                    len_CDS = d_vars["pos_ATG_r"][j] - d_vars["begin_r"][j]
3✔
520
                    self._ORF_pos.append(
3✔
521
                        [
522
                            d_vars["begin_r"][j],
523
                            d_vars["end_r"][j],
524
                            -(frame + 1),
525
                            d_vars["pos_ATG_r"][j],
526
                            len_ORF,
527
                            len_CDS,
528
                        ]
529
                    )
530
                d_vars["begin_r"][j] = i - 3 + 1
3✔
531
                d_vars["pos_ATG_r"][j] = np.nan
3✔
532

533
            # reset codon
534
            d_vars["codon"][j] = ""
3✔
535

536
        return d_vars
3✔
537

538
    def _find_ORF_CDS(self):
3✔
539
        """Function for finding ORF and CDS in both strands of DNA"""
540
        # init variables
541
        d_vars = {
3✔
542
            "codon": ["", "-", "--"],
543
            "begin_f": [0] * 3,
544
            "begin_r": [0] * 3,
545
            "end_f": [0] * 3,
546
            "end_r": [0] * 3,
547
            "pos_ATG_f": [np.nan] * 3,
548
            "pos_ATG_r": [np.nan] * 3,
549
        }
550
        print("Finding ORF and CDS")
3✔
551
        # result list
552
        self._ORF_pos = []
3✔
553

554
        if self._threshold is None:
3✔
555
            self._threshold = 0
3✔
556

557
        if self._type_filter is None:
3✔
558
            self._type_filter = "ORF"
3✔
559

560
        # search ORF and CDS
561
        for i, nuc in enumerate(self.sequence):
3✔
562
            # print(i, nuc)
563
            frame = (i + 3 - 2) % 3
3✔
564
            # if (i % 200000)==0:
565
            #     print("%d / %d" %(i, len_genome))
566
            for j in range(3):
3✔
567
                d_vars = self._update_ORF_frame(i, nuc, j, frame, d_vars)
3✔
568

569
        # convert result to dataframe
570
        self._ORF_pos = pd.DataFrame(self._ORF_pos)
3✔
571
        self._ORF_pos.columns = [
3✔
572
            "begin_pos",
573
            "end_pos",
574
            "frame",
575
            "pos_ATG",
576
            "len_ORF",
577
            "len_CDS",
578
        ]
579

580
    def _get_ORF_pos(self):
3✔
581
        if self._ORF_pos is None:
3✔
582
            self._find_ORF_CDS()
3✔
583
        return self._ORF_pos
3✔
584

585
    ORF_pos = property(_get_ORF_pos)
3✔
586

587
    def _get_threshold(self):
3✔
588
        return self._threshold
×
589

590
    def _set_threshold(self, value):
3✔
591
        if value < 0:
3✔
592
            raise ValueError("Threshold cannot be negative")
×
593
        if (self._threshold is None) or (value < self._threshold):
3✔
594
            # need to compute again the result
595
            self._threshold = value
3✔
596
            self._find_ORF_CDS()
3✔
597
        elif value > self._threshold:
3✔
598
            # do not compute result again : filter existing df
599
            self._threshold = value
3✔
600
            if self._type_filter == "ORF":
3✔
601
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_ORF"] > self._threshold]
×
602
            elif self._type_filter == "CDS":
3✔
603
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]
3✔
604

605
    threshold = property(_get_threshold, _set_threshold)
3✔
606

607
    def _get_type_filter(self):
3✔
608
        return self._type_filter
×
609

610
    def _set_type_filter(self, value):
3✔
611
        if value not in ["ORF", "CDS"]:
3✔
612
            raise ValueError("Please select valid type of filter : ORF (default), CDS")
×
613
        if self._ORF_pos is None:
3✔
614
            self._type_filter = value
×
615
        elif self._type_filter != value:
3✔
616
            self._type_filter = value
3✔
617
            if value == "ORF":
3✔
618
                # need to compute again the result
619
                self._find_ORF_CDS()
3✔
620
            else:
621
                # need to filter the result by CDS length
622
                self._ORF_pos = self._ORF_pos[self._ORF_pos["len_CDS"] > self._threshold]
3✔
623

624
    type_filter = property(_get_type_filter, _set_type_filter)
3✔
625

626
    def hist_ORF_CDS_linearscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
3✔
627
        if self._ORF_pos is None:
3✔
628
            self._find_ORF_CDS()
×
629

630
        n_ORF = self._ORF_pos["len_ORF"].dropna().shape[0]
3✔
631
        n_CDS = self._ORF_pos["len_CDS"].dropna().shape[0]
3✔
632

633
        # plot for all ORF and CDS
634
        pylab.hist(
3✔
635
            self._ORF_pos["len_ORF"].dropna(),
636
            alpha=alpha,
637
            label="ORF, N = " + str(n_ORF),
638
            bins=bins,
639
        )
640
        pylab.hist(
3✔
641
            self._ORF_pos["len_CDS"].dropna(),
642
            alpha=alpha,
643
            label="CDS, N = " + str(n_CDS),
644
            bins=bins,
645
        )
646
        pylab.xlabel(xlabel)
3✔
647
        pylab.ylabel(ylabel)
3✔
648
        pylab.legend()
3✔
649
        pylab.title("Length of ORF and CDS (after filter %s > %d)" % (self._type_filter, self._threshold))
3✔
650

651
    def hist_ORF_CDS_logscale(self, alpha=0.5, bins=40, xlabel="Length", ylabel="#"):
3✔
652
        self.hist_ORF_CDS_linearscale(alpha, bins, xlabel, ylabel)
×
653
        pylab.yscale("log")
×
654

655
    def barplot_count_ORF_CDS_by_frame(self, alpha=0.5, bins=40, xlabel="Frame", ylabel="#", bar_width=0.35):
3✔
656
        if self._ORF_pos is None:
3✔
657
            self._find_ORF_CDS()
×
658
        # number of ORF and CDS found by frame
659
        frames = [-3, -2, -1, 1, 2, 3]
3✔
660
        nb_res_ORF = []
3✔
661
        nb_res_CDS = []
3✔
662
        for fr in frames:
3✔
663
            nb_res_ORF.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_ORF"].dropna().shape[0])
3✔
664
            nb_res_CDS.append(self._ORF_pos[self._ORF_pos["frame"] == fr]["len_CDS"].dropna().shape[0])
3✔
665

666
        pylab.bar(
3✔
667
            np.array(frames) - (bar_width / 2),
668
            nb_res_ORF,
669
            bar_width,
670
            alpha=alpha,
671
            label="ORF N = %d" % sum(nb_res_ORF),
672
        )
673
        pylab.bar(
3✔
674
            np.array(frames) + (bar_width / 2),
675
            nb_res_CDS,
676
            bar_width,
677
            alpha=alpha,
678
            label="CDS N = %d" % sum(nb_res_CDS),
679
        )
680
        pylab.xlabel(xlabel)
3✔
681
        pylab.ylabel(ylabel)
3✔
682
        pylab.legend(loc=1)
3✔
683
        pylab.title("Number of ORF and CDS by frame")
3✔
684

685
    def entropy(self, sequence):
3✔
686
        # https://www.sciencedirect.com/science/article/pii/S0022519397904938?via%3Dihub
687
        pi = [sequence.count(l) / float(len(sequence)) for l in "ACGT"]
×
688
        pi = [x for x in pi if x != 0]
×
689
        return -sum(pi * np.log(pi))
×
690
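The method above computes the Shannon entropy H = -sum(p_i * ln(p_i)) over the observed A/C/G/T frequencies (natural logarithm, as in the referenced article). For a perfectly balanced sequence the four frequencies equal 0.25 and H = ln(4), about 1.386, while a homopolymer gives 0::

    d = DNA("ACGT")
    d.entropy("ACGTACGT")    # ln(4), about 1.386
    d.entropy("AAAAAAAA")    # 0.0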

691

692
class RNA(Sequence):
3✔
693
    """Simple RNA class
694

695

696
    >>> from sequana.sequence import RNA
697
    >>> d = RNA("ACGUUUU")
698
    >>> d.reverse_complement()
699

700
    """
701

702
    def __init__(self, sequence):
3✔
703
        super(RNA, self).__init__(sequence, complement_in=b"ACGU", complement_out=b"UGCA", letters="ACGUN")
3✔
704

705
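A concrete counterpart of the docstring example above; the expected string in the comment follows from the A<->U, C<->G mapping::

    from sequana.sequence import RNA

    r = RNA("ACGUUUU")
    r.get_reverse_complement()    # 'AAAACGU'
    r.check()                     # allowed letters are A, C, G, U and N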

706
class Repeats(object):
3✔
707
    """Class for finding repeats in DNA or RNA linear sequences.
708

709
    Computation is performed each time the :attr:`threshold` is set
710
    to a new value.
711

712
    .. plot::
713
        :include-source:
714

715
        from sequana import sequana_data, Repeats
716
        rr = Repeats(sequana_data("measles.fa"))
717
        rr.threshold = 4
718
        rr.hist_length_repeats()
719

720
    .. note:: Works with shustring package from Bioconda (April 2017)
721
    .. todo:: use a specific sequence (first one by default). Others can be
722
        selected by name
723

724
    """
725

726
    def __init__(self, filename_fasta, merge=False, name=None):
3✔
727
        """.. rubric:: Constructor
728

729
        Input must be a fasta file with valid DNA or RNA characters
730

731
        :param str filename_fasta: a Fasta file, only the first
732
            sequence is used !
733
        :param int threshold: Minimal length of repeat to output
734
        :param str name: if name is provided, scan the Fasta file
735
            and select the corresponding sequence. If you want to
736
            analyse all sequences, you need to use a loop by setting
737
            _header for each sequence with the sequence name found in
738
            sequence header.
739

740

741
        .. note:: known problems. Headers with a > character (e.g. in the
742
            comment) are left-stripped and only the comment is kept. Another issue
743
            is with multi-fasta files, where one sequence is ignored (last or first ?)
744

745
        """
746
        # used to check everything is fine with the header/name
747
        self._fasta = FastA(filename_fasta)
3✔
748

749
        # Define the attributes, and set the header if already provided
750
        self._threshold = None
3✔
751
        self._df_shustring = None
3✔
752
        self._header = None
3✔
753
        self._length = None
3✔
754
        self._longest_shustring = None
3✔
755
        self._begin_end_repeat_position = None
3✔
756
        self._begin_end_repeat_position_merge = None
3✔
757
        self._filename_fasta = filename_fasta
3✔
758
        self._previous_thr = None
3✔
759
        self._list_len_repeats = None
3✔
760
        self._contig_names = None
3✔
761
        if not isinstance(merge, bool):
3✔
762
            raise TypeError("do_merge must be boolean")
×
763
        self._do_merge = merge
3✔
764
        if name is not None:
3✔
765
            self.header = name
×
766
        else:
767
            self.header = self._fasta.names[0]
3✔
768

769
    def _get_header(self):
3✔
770
        """get first line of fasta (needed in input shustring)
771
        and replace spaces by underscores
772
        """
773
        """if self._header is None:
1✔
774
            # -n is to specify the DNA nature of the sequence
775
            first_line = subprocess.check_output(["head", "-n", "1", self._filename_fasta])
776
            first_line = first_line.decode('utf8')
777
            first_line = first_line.replace("\n","").replace(" ","_")
778
            self._header = first_line"""
779
        return self._header
3✔
780

781
    def _set_header(self, name):
3✔
782
        if name not in self._fasta.names:
3✔
783
            raise ValueError("invalid name. Use one of %s" % self._fasta.names)
×
784
        self._header = name
3✔
785
        self._df_shustring = None
3✔
786

787
    header = property(_get_header, _set_header)
3✔
788

789
    def _get_names(self):
3✔
790
        if self._contig_names is None:
3✔
791
            self._contig_names = self._fasta.names
3✔
792
        return self._contig_names
3✔
793

794
    names = property(_get_names)
3✔
795

796
    def _get_shustrings_length(self):
3✔
797
        """Return dataframe with shortest unique substring length at each position
798
        shortest unique substrings are unique in the sequence and its complement
799
        Uses shustring tool"""
800
        if self._df_shustring is None:
3✔
801
            # read fasta
802
            task_read = subprocess.Popen(["cat", self._filename_fasta], stdout=subprocess.PIPE)
3✔
803

804
            # replace spaces of fasta
805
            task_replace_spaces = subprocess.Popen(["sed", "s/ /_/g"], stdin=task_read.stdout, stdout=subprocess.PIPE)
3✔
806

807
            # shustring command
808
            # the -l option uses a regular expression
809
            # if 2 identifiers such as >1 and >11 are present, the regular expression
810
            # >1 matches both. Therefore, we add a $ at the end of the next command line.
811
            # In Repeats, we set the chromosome name, so we should add this $
812
            # character
813
            task_shus = subprocess.Popen(
3✔
814
                ["shustring", "-r", "-q", "-l", ">{}$".format(self.header)],
815
                stdin=task_replace_spaces.stdout,
816
                stdout=subprocess.PIPE,
817
            )
818

819
            # read stdout line by line and append to list
820
            list_df = []
3✔
821
            for line in task_shus.stdout:
3✔
822
                list_df.append(line.decode("utf8").replace("\n", "").split("\t"))
3✔
823
                # df=pd.concat([df, line])
824
            task_shus.wait()
3✔
825

826
            # convert to dataframe
827
            df = pd.DataFrame(list_df[2 : len(list_df)])
3✔
828
            self._df_shustring = df.astype(int)
3✔
829
            self._df_shustring.columns = ["position", "shustring_length"]
3✔
830

831
            # get input sequence length and longest shustring in the first line
832
            self._length = int(list_df[0][1])
3✔
833
            self._longest_shustring = int(list_df[0][3].split("<=")[2])
3✔
834
        return self._df_shustring
3✔
835

836
    df_shustring = property(_get_shustrings_length)
3✔
837

838
    def _get_genome_length(self):
3✔
839
        if self._df_shustring is None:
3✔
840
            self._get_shustrings_length()
×
841
        return self._length
3✔
842

843
    length = property(_get_genome_length)
3✔
844

845
    def _get_longest_shustring(self):
3✔
846
        if self._df_shustring is None:
3✔
847
            self._get_shustrings_length()
×
848
        return self._longest_shustring
3✔
849

850
    longest_shustring = property(_get_longest_shustring)
3✔
851

852
    def _find_begin_end_repeats(self, force=False):
3✔
853
        """Returns position of repeats longer than threshold
854
        as an ordered list
855
        """
856
        if self.df_shustring is None:
3✔
857
            self._get_shustrings_length()
×
858

859
        if self._threshold is None:
3✔
860
            # print("No threshold : please set minimul length of repeats to output")
861
            raise ValueError("threshold : please set threshold (minimum length of repeats to output)")
×
862

863
        # if there is no result yet, or the threshold has changed
864
        if (self._begin_end_repeat_position is None) | (self.threshold != self._previous_thr) | force:
3✔
865
            nb_row = self.df_shustring.shape[0]
3✔
866
            i = 0
3✔
867
            step_repeat_seq = []
3✔
868
            be_repeats = []
3✔
869
            e = 0
3✔
870

871
            # use list because faster
872
            list_len_shus = list(self.df_shustring.loc[:, "shustring_length"])
3✔
873

874
            while i < nb_row:
3✔
875
                # beginning of repeat
876
                if list_len_shus[i] > self.threshold:
3✔
877
                    b = i
3✔
878
                    # compute new end of repeat
879
                    len_repeat = list_len_shus[i]
3✔
880
                    e = b + len_repeat
3✔
881
                    # save (b,e)
882
                    be_repeats.append((b, e))
3✔
883
                    # update i
884
                    i = e - 1
3✔
885
                i += 1
3✔
886

887
            self._begin_end_repeat_position = be_repeats
3✔
888

889
        self._get_merge_repeats()
3✔
890
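To illustrate the loop above: with threshold 3 and shustring lengths [2, 2, 6, 5, 4, 3, 2, 2], position 2 starts a repeat of length 6, so the interval (2, 8) is recorded and scanning resumes at position 8; positions inside a recorded interval are never re-examined.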

891
    def _get_be_repeats(self):
3✔
892
        self._find_begin_end_repeats()
3✔
893
        return self._begin_end_repeat_position
3✔
894

895
    begin_end_repeat_position = property(_get_be_repeats)
3✔
896

897
    def _set_threshold(self, value):
3✔
898
        if value < 0:
3✔
899
            raise ValueError("Threshold must be positive integer")
×
900
        if value != self._threshold:
3✔
901
            self._previous_thr = self._threshold
3✔
902
        self._threshold = value
3✔
903
        self._find_begin_end_repeats()
3✔
904
        self._list_len_repeats = [tup[1] - tup[0] for tup in self._begin_end_repeat_position]
3✔
905

906
    def _get_threshold(self):
3✔
907
        return self._threshold
3✔
908

909
    threshold = property(_get_threshold, _set_threshold)
3✔
910

911
    def _get_list_len_repeats(self):
3✔
912
        if self._list_len_repeats is None:
3✔
913
            raise UserWarning("Please set threshold (minimum length of repeats to output)")
×
914
        return self._list_len_repeats
3✔
915

916
    list_len_repeats = property(_get_list_len_repeats)
3✔
917
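Putting the pieces together, a minimal sketch of the Repeats workflow, mirroring the class docstring (sequana_data resolves the bundled measles.fa test file)::

    from sequana import Repeats, sequana_data

    rr = Repeats(sequana_data("measles.fa"))   # first sequence of the file by default
    rr.threshold = 10                          # triggers _find_begin_end_repeats()
    rr.begin_end_repeat_position               # list of (begin, end) tuples
    rr.list_len_repeats                        # corresponding repeat lengths
    rr.hist_length_repeats()                   # histogram of those lengths (defined below)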

918
    def _get_merge_repeats(self):
3✔
919
        if self._do_merge:
3✔
920
            # if there are repeats, merge repeats that are fragmented
921
            if len(self._begin_end_repeat_position) > 0:
3✔
922
                prev_tup = self._begin_end_repeat_position[0]
3✔
923
                b = prev_tup[0]
3✔
924
                begin_end_repeat_position_merge = []
3✔
925
                for i in range(1, len(self._begin_end_repeat_position)):
3✔
926
                    tup = self._begin_end_repeat_position[i]
3✔
927

928
                    if tup[0] == prev_tup[1]:
3✔
929
                        # concat
930
                        e = tup[1]
3✔
931
                        prev_tup = tup
3✔
932
                        if i == (len(self._begin_end_repeat_position) - 1):
3✔
933
                            # last tup : append to result
934
                            begin_end_repeat_position_merge.append((b, e))
×
935

936
                    else:
937
                        # real end of repeat : append result and update b, e
938
                        e = prev_tup[1]
3✔
939
                        begin_end_repeat_position_merge.append((b, e))
3✔
940
                        prev_tup = tup
3✔
941
                        b = prev_tup[0]
3✔
942
                        if i == (len(self._begin_end_repeat_position) - 1):
3✔
943
                            # last tup : append to result
944
                            begin_end_repeat_position_merge.append(tup)
3✔
945

946
                self._begin_end_repeat_position = begin_end_repeat_position_merge
3✔
947

948
    def _get_do_merge(self):
3✔
949
        return self._do_merge
×
950

951
    def _set_do_merge(self, do_merge):
3✔
952
        if not isinstance(do_merge, bool):
3✔
953
            raise TypeError("do_merge must be boolean")
×
954
        # if different
955
        if do_merge != self._do_merge:
3✔
956
            self._do_merge = do_merge
3✔
957
            if self._do_merge:
3✔
958
                # did not merge before, merge now
959
                if self._begin_end_repeat_position is None:
3✔
960
                    self._find_begin_end_repeats()
×
961
            else:
962
                # data is already merged : need to compute again to un-merge
963
                self._find_begin_end_repeats(force=True)
3✔
964

965
    do_merge = property(_get_do_merge, _set_do_merge)
3✔
966

967
    def hist_length_repeats(
3✔
968
        self,
969
        bins=20,
970
        alpha=0.5,
971
        hold=False,
972
        fontsize=12,
973
        grid=True,
974
        title="Repeat length",
975
        xlabel="Repeat length",
976
        ylabel="#",
977
        logy=True,
978
    ):
979
        """Plots histogram of the repeat lengths"""
980
        # check that user has set a threshold
981
        if hold is False:
3✔
982
            pylab.clf()
3✔
983
        pylab.hist(self.list_len_repeats, alpha=alpha, bins=bins)
3✔
984
        pylab.title(title)
3✔
985
        pylab.xlabel(xlabel, fontsize=fontsize)
3✔
986
        pylab.ylabel(ylabel, fontsize=fontsize)
3✔
987
        if grid is True:
3✔
988
            pylab.grid(True)
3✔
989
        if logy:
3✔
990
            pylab.semilogy()
3✔
991

992
    def plot(self, clf=True):
3✔
993
        if clf:
3✔
994
            pylab.clf()
3✔
995
        M = self.df_shustring.shustring_length.max()
3✔
996
        logger.info("Longest shustring length: %s", M)
3✔
997
        M = int(M / 1000) + 1
3✔
998
        for i in range(M):
3✔
999
            pylab.axhline(i * 1000, ls="--", color="grey")
3✔
1000
        pylab.plot(self.df_shustring.shustring_length)
3✔
1001
        pylab.xlabel("position (bp)")
3✔
1002
        pylab.ylabel("Length of repeats")
3✔
1003
        pylab.ylim(bottom=0)
3✔