• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rmcar17 / cogent3 / 19318630917

10 Nov 2025 08:08PM UTC coverage: 90.631% (+0.008%) from 90.623%
19318630917

push

github

web-flow
Merge pull request #2518 from cogent3/dependabot/pip/ruff-0.14.4

Bump ruff from 0.14.3 to 0.14.4

28277 of 31200 relevant lines covered (90.63%)

5.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.29
/src/cogent3/core/seq_storage.py
1
from __future__ import annotations
6✔
2

3
import hashlib
6✔
4
from abc import ABC, abstractmethod
6✔
5
from typing import TYPE_CHECKING, Any, Self, cast
6✔
6

7
import numba
6✔
8
import numpy
6✔
9
import numpy.typing as npt
6✔
10

11
import cogent3.core.alphabet as c3_alphabet
6✔
12
import cogent3.core.sequence as c3_sequence
6✔
13
from cogent3.core.location import IndelMap
6✔
14
from cogent3.core.seqview import AlignedDataView, AlignedDataViewABC, SeqDataView
6✔
15
from cogent3.util.misc import extend_docstring_from
6✔
16

17
if TYPE_CHECKING:  # pragma: no cover
18
    from collections.abc import Collection, Mapping
19
    from collections.abc import Sequence as PySeq
20

21
    from cogent3.core.seqview import SeqViewABC
22
    from cogent3.core.slice_record import SliceRecord
23

24
NumpyIntArrayType = npt.NDArray[numpy.integer]
6✔
25

26

27
class SeqsDataABC(ABC):
6✔
28
    """Abstract base class for respresenting the storage object for sequences underlying
29
    a SequenceCollection.
30
    """
31

32
    __slots__ = ()
6✔
33

34
    @abstractmethod
35
    def __init__(
36
        self,
37
        *,
38
        data: dict[str, str | bytes | NumpyIntArrayType],
39
        alphabet: c3_alphabet.AlphabetABC[Any],
40
        offset: dict[str, int] | None = None,
41
        check: bool = True,
42
        reversed_seqs: set[str] | None = None,
43
    ) -> None: ...
44
    @classmethod
45
    @abstractmethod
46
    def from_seqs(
47
        cls,
48
        *,
49
        data: Mapping[str, str | bytes | NumpyIntArrayType],
50
        alphabet: c3_alphabet.CharAlphabet[Any],
51
        **kwargs: Any,
52
    ) -> Self: ...
53

54
    @abstractmethod
55
    def __eq__(self, value: object) -> bool: ...
56

57
    def __ne__(self, other: object) -> bool:
6✔
58
        return not self == other
6✔
59

60
    @abstractmethod
61
    def get_seq_length(self, seqid: str) -> int: ...
62

63
    @property
64
    @abstractmethod
65
    def reversed_seqs(self) -> frozenset[str]: ...
66

67
    @property
68
    @abstractmethod
69
    def names(self) -> tuple[str, ...]: ...
70

71
    @property
72
    @abstractmethod
73
    def alphabet(self) -> c3_alphabet.CharAlphabet[Any]: ...
74

75
    @property
76
    @abstractmethod
77
    def offset(self) -> dict[str, int]: ...
78

79
    @abstractmethod
80
    def get_seq_array(
81
        self,
82
        *,
83
        seqid: str,
84
        start: int | None = None,
85
        stop: int | None = None,
86
        step: int | None = None,
87
    ) -> NumpyIntArrayType: ...
88

89
    def get_seq_str(
6✔
90
        self,
91
        *,
92
        seqid: str,
93
        start: int | None = None,
94
        stop: int | None = None,
95
        step: int | None = None,
96
    ) -> str:
97
        """Return ungapped sequence corresponding to seqid as a string.
98
        start/stop are in sequence coordinates. Excludes gaps."""
99
        return self.alphabet.from_indices(
6✔
100
            self.get_seq_array(seqid=seqid, start=start, stop=stop, step=step),
101
        )
102

103
    def get_seq_bytes(
6✔
104
        self,
105
        *,
106
        seqid: str,
107
        start: int | None = None,
108
        stop: int | None = None,
109
        step: int | None = None,
110
    ) -> bytes:
111
        """Return ungapped sequence corresponding to seqid as a bytes string.
112
        start/stop are in sequence coordinates. Excludes gaps."""
113
        return self.get_seq_str(seqid=seqid, start=start, stop=stop, step=step).encode(
6✔
114
            "utf8",
115
        )
116

117
    @abstractmethod
118
    def get_view(self, seqid: str) -> SeqViewABC: ...
119

120
    @abstractmethod
121
    def to_alphabet(
122
        self,
123
        alphabet: c3_alphabet.CharAlphabet[Any],
124
        check_valid: bool = True,
125
    ) -> Self: ...
126

127
    @abstractmethod
128
    def add_seqs(
129
        self,
130
        seqs: Mapping[str, str | bytes | NumpyIntArrayType],
131
        force_unique_keys: bool = True,
132
        offset: dict[str, int] | None = None,
133
    ) -> SeqsDataABC: ...
134

135
    @abstractmethod
136
    def __len__(self) -> int: ...
137

138
    @abstractmethod
139
    def __getitem__(
140
        self,
141
        index: str | int,
142
    ) -> c3_sequence.Sequence | SeqViewABC: ...
143

144
    @abstractmethod
145
    def copy(self, **kwargs: Any) -> Self: ...
146

147
    @abstractmethod
148
    def get_hash(self, seqid: str) -> str | None: ...
149

150

151
class SeqsData(SeqsDataABC):
6✔
152
    """The builtin ``cogent3`` implementation of sequence storage underlying
153
    a ``SequenceCollection``. The sequence data is stored as numpy arrays. Indexing
154
    this object (using an int or seq name) returns a ``SeqDataView``, which can realise
155
    the corresponding slice as a string, bytes, or numpy array via the alphabet.
156

157
    Notes
158
    -----
159
    Methods on this object only accepts plust strand start, stop and step
160
    indices for selecting segments of data. It can return the gap coordinates
161
    for a sequence as used by IndelMap.
162
    """
163

164
    __slots__ = ("_alphabet", "_data", "_offset", "_reversed")
6✔
165

166
    def __init__(
6✔
167
        self,
168
        *,
169
        data: Mapping[str, str | bytes | NumpyIntArrayType],
170
        alphabet: c3_alphabet.CharAlphabet[Any],
171
        offset: dict[str, int] | None = None,
172
        check: bool = True,
173
        reversed_seqs: set[str] | None = None,
174
    ) -> None:
175
        """
176
        Parameters
177
        ----------
178
        data
179
            raw data as {seq name: sequence, ...} where the sequence can be converted
180
            to a numpy array using the provided alphabet.
181
        alphabet
182
            a cogent3 CharAlphabet instance, typically defined as
183
            <moltype>.most_degen_alphabet()
184
        offset
185
            dict indicating annotation offsets for each sequence
186
        check
187
            use the alphabet to check the sequences are valid
188
        reversed_seqs
189
            names of seqs that are reverse complemented
190

191
        Raises
192
        ------
193
        AlphabetError if the check fails
194
        """
195
        self._alphabet = alphabet
6✔
196
        self._offset = offset or {}
6✔
197
        self._reversed = frozenset(reversed_seqs or set())
6✔
198
        if check:
6✔
199
            assert self._offset.keys() <= data.keys(), (
6✔
200
                "sequence name provided in offset not found in data"
201
            )
202
            if any(not alphabet.is_valid(seq) for seq in data.values()):
6✔
203
                msg = f"One or more sequences are invalid for alphabet {alphabet}"
6✔
204
                raise c3_alphabet.AlphabetError(
6✔
205
                    msg,
206
                )
207
        self._data: dict[str, NumpyIntArrayType] = {}
6✔
208
        for name, seq in data.items():
6✔
209
            arr = self._alphabet.to_indices(seq)
6✔
210
            arr.flags.writeable = False
6✔
211
            self._data[str(name)] = arr
6✔
212

213
    def __eq__(self, other: object) -> bool:
6✔
214
        if not isinstance(other, self.__class__):
6✔
215
            return False
×
216
        for attr_name in ("_alphabet", "_offset"):
6✔
217
            self_attr = getattr(self, attr_name)
6✔
218
            other_attr = getattr(other, attr_name)
6✔
219
            if self_attr != other_attr:
6✔
220
                return False
6✔
221

222
        # compare individuals sequences
223
        if self._data.keys() != other._data.keys():
6✔
224
            return False
×
225
        return all(
6✔
226
            numpy.array_equal(self._data[name], other._data[name])
227
            for name in self._data
228
        )
229

230
    def __len__(self) -> int:
6✔
231
        return len(self.names)
×
232

233
    def __getitem__(self, index: str | int) -> SeqViewABC:
6✔
234
        if isinstance(index, int):
6✔
235
            return self[self.names[index]]
6✔
236
        if isinstance(index, str):
6✔
237
            return self.get_view(seqid=index)
6✔
238

239
        msg = f"__getitem__ not implemented for {type(index)}"
6✔
240
        raise NotImplementedError(msg)
6✔
241

242
    @classmethod
6✔
243
    def from_seqs(
6✔
244
        cls,
245
        *,
246
        data: Mapping[str, str | bytes | NumpyIntArrayType],
247
        alphabet: c3_alphabet.CharAlphabet[Any],
248
        **kwargs: Any,
249
    ) -> Self:
250
        return cls(data=data, alphabet=alphabet, **kwargs)
6✔
251

252
    @property
6✔
253
    def names(self) -> tuple[str, ...]:
6✔
254
        """returns the names of the sequences in the storage"""
255
        return tuple(self._data.keys())
6✔
256

257
    @property
6✔
258
    def reversed_seqs(self) -> frozenset[str]:
6✔
259
        """names of sequences that are reverse complemented"""
260
        return self._reversed
6✔
261

262
    @property
6✔
263
    def alphabet(self) -> c3_alphabet.CharAlphabet[Any]:
6✔
264
        """the character alphabet for validating, encoding, decoding sequences"""
265
        return self._alphabet
6✔
266

267
    @property
6✔
268
    def offset(self) -> dict[str, int]:
6✔
269
        """annotation offsets for each sequence"""
270
        return {name: self._offset.get(name, 0) for name in self.names}
6✔
271

272
    def get_seq_length(self, seqid: str) -> int:
6✔
273
        """return length for seqid"""
274
        return self._data[seqid].shape[0]
6✔
275

276
    def get_hash(self, seqid: str) -> str | None:
6✔
277
        """returns hash of seqid"""
278
        if seqid not in self._data:
6✔
279
            return None
6✔
280
        arr = self.get_seq_array(seqid=seqid)
6✔
281
        return array_hash64(arr)
6✔
282

283
    def get_seq_array(
6✔
284
        self,
285
        *,
286
        seqid: str,
287
        start: int | None = None,
288
        stop: int | None = None,
289
        step: int | None = None,
290
    ) -> NumpyIntArrayType:
291
        start = start or 0
6✔
292
        stop = stop if stop is not None else self.get_seq_length(seqid)
6✔
293
        step = step or 1
6✔
294

295
        if start < 0 or stop < 0 or step < 1:
6✔
296
            msg = f"{start=}, {stop=}, {step=} not >= 1"
×
297
            raise ValueError(msg)
×
298

299
        out_len = (stop - start + step - 1) // step
6✔
300

301
        seq = numpy.empty(out_len, dtype=self.alphabet.dtype)
6✔
302
        seq[:] = self._data[seqid][start:stop:step]
6✔
303

304
        return seq
6✔
305

306
    def get_view(self, seqid: str) -> SeqDataView:
6✔
307
        """reurns view of sequence data for seqid"""
308
        return SeqDataView(
6✔
309
            parent=self,
310
            seqid=seqid,
311
            parent_len=self.get_seq_length(seqid),
312
            alphabet=self.alphabet,
313
        )
314

315
    def add_seqs(
6✔
316
        self,
317
        seqs: Mapping[str, str | bytes | NumpyIntArrayType],
318
        force_unique_keys: bool = True,
319
        offset: dict[str, int] | None = None,
320
    ) -> SeqsData:
321
        """Returns a new SeqsData object with added sequences. If force_unique_keys
322
        is True, raises ValueError if any names already exist in the collection."""
323
        if force_unique_keys and any(name in self.names for name in seqs):
6✔
324
            msg = "One or more sequence names already exist in collection"
6✔
325
            raise ValueError(msg)
6✔
326
        new_data = {
6✔
327
            **self._data,
328
            **{name: self.alphabet.to_indices(seq) for name, seq in seqs.items()},
329
        }
330
        return self.copy(
6✔
331
            data=new_data,
332
            alphabet=self.alphabet,
333
            offset={**self._offset, **(offset or {})},
334
        )
335

336
    def copy(self, **kwargs: Any) -> Self:
6✔
337
        """shallow copy of self
338

339
        Notes
340
        -----
341
        kwargs are passed to constructor and will over-ride existing values
342
        """
343
        init_args: dict[str, Any] = {
6✔
344
            "data": self._data,
345
            "alphabet": self._alphabet,
346
            "offset": self._offset,
347
            "reversed_seqs": self._reversed,
348
            **kwargs,
349
        }
350
        return self.__class__(**init_args)
6✔
351

352
    def to_alphabet(
6✔
353
        self,
354
        alphabet: c3_alphabet.CharAlphabet[Any],
355
        check_valid: bool = True,
356
    ) -> Self:
357
        if (
6✔
358
            len(self.alphabet) == len(alphabet)
359
            and len(
360
                {
361
                    (a, b)
362
                    for a, b in zip(self.alphabet, alphabet, strict=False)
363
                    if a != b
364
                },
365
            )
366
            == 1
367
        ):
368
            # rna <-> dna swap just replace alphabet
369
            return self.copy(alphabet=alphabet)
6✔
370

371
        new_data = {}
6✔
372
        for seqid in self.names:
6✔
373
            seq_data = self.get_seq_array(seqid=seqid)
6✔
374
            as_new_alpha = self.alphabet.convert_seq_array_to(
6✔
375
                seq=seq_data,
376
                alphabet=alphabet,
377
                check_valid=check_valid,
378
            )
379
            new_data[seqid] = as_new_alpha
6✔
380

381
        return self.copy(
6✔
382
            data=new_data,
383
            alphabet=alphabet,
384
            check=False,
385
        )
386

387

388
class AlignedSeqsDataABC(SeqsDataABC):
6✔
389
    """Abstract base class for respresenting the storage object for sequences underlying
390
    a Alignment.
391
    """
392

393
    # all methods that are from SeqsDataABC should work in sequence coordinates,
394
    # all methods unique to AlignedSeqsDataABC should work in aligned coordinates.
395
    # all indices provided to AlignedSeqsDataABC should be on the plus strand.
396
    __slots__ = ()
6✔
397

398
    @abstractmethod
399
    def __init__(
400
        self,
401
        *,
402
        gapped_seqs: NumpyIntArrayType,
403
        names: Collection[str],
404
        alphabet: c3_alphabet.CharAlphabet[Any],
405
        ungapped_seqs: dict[str, NumpyIntArrayType] | None = None,
406
        gaps: Mapping[str, NumpyIntArrayType] | None = None,
407
        offset: dict[str, int] | None = None,
408
        align_len: int | None = None,
409
        check: bool = True,
410
        reversed_seqs: set[str] | None = None,
411
    ) -> None: ...
412

413
    @classmethod
414
    @abstractmethod
415
    def from_seqs_and_gaps(
416
        cls,
417
        *,
418
        seqs: Mapping[str, str | bytes | NumpyIntArrayType],
419
        gaps: Mapping[str, NumpyIntArrayType],
420
        alphabet: c3_alphabet.CharAlphabet[Any],
421
    ) -> Self: ...
422

423
    @classmethod
424
    @abstractmethod
425
    def from_names_and_array(
426
        cls,
427
        *,
428
        names: Collection[str],
429
        data: NumpyIntArrayType,
430
        alphabet: c3_alphabet.CharAlphabet[Any],
431
    ) -> Self: ...
432

433
    @property
434
    @abstractmethod
435
    def align_len(self) -> int: ...
436

437
    @abstractmethod
6✔
438
    def get_view(
6✔
439
        self,
440
        seqid: str | int,
441
        slice_record: SliceRecord | None = None,
442
    ) -> AlignedDataViewABC:
443
        # overriding the SeqsDataABC method as we support directly
444
        # providing the slice_record instance
445
        ...
446

447
    @abstractmethod
448
    def get_gapped_seq_array(
449
        self,
450
        *,
451
        seqid: str,
452
        start: int | None = None,
453
        stop: int | None = None,
454
        step: int | None = None,
455
    ) -> NumpyIntArrayType: ...
456

457
    @abstractmethod
458
    def get_gapped_seq_str(
459
        self,
460
        *,
461
        seqid: str,
462
        start: int | None = None,
463
        stop: int | None = None,
464
        step: int | None = None,
465
    ) -> str: ...
466

467
    @abstractmethod
468
    def get_gapped_seq_bytes(
469
        self,
470
        *,
471
        seqid: str,
472
        start: int | None = None,
473
        stop: int | None = None,
474
        step: int | None = None,
475
    ) -> bytes: ...
476

477
    @abstractmethod
6✔
478
    def get_ungapped(
6✔
479
        self,
480
        name_map: Mapping[str, str],
481
        start: int | None = None,
482
        stop: int | None = None,
483
        step: int | None = None,
484
    ) -> tuple[dict[str, NumpyIntArrayType], dict[str, Any]]:
485
        """
486
        Returns a dictionary of sequence data with no gaps or missing characters and
487
        a dictionary with information to construct a new SequenceCollection via
488
        make_unaligned_seqs.
489

490
        Parameters
491
        ----------
492
        name_map
493
            A dict of {aln_name: data_name, ...} indicating the mapping between
494
            names in the encompassing Alignment (aln_name) and the names in self
495
            (data_name).
496
        start
497
            The alignment starting position.
498
        stop
499
            The alignment stopping position.
500
        step
501
            The step size.
502

503
        Returns
504
        -------
505
        tuple
506
            A tuple containing the following:
507
            - seqs (dict): A dictionary of {name: seq, ...} where the sequences have no gaps
508
              or missing characters.
509
            - kwargs (dict): A dictionary of keyword arguments for make_unaligned_seqs, e.g.,
510
              {"offset": self.offset, "name_map": name_map}.
511
        """
512
        ...
513

514
    @abstractmethod
515
    def get_pos_range(
516
        self,
517
        names: PySeq[str],
518
        start: int | None = None,
519
        stop: int | None = None,
520
        step: int | None = None,
521
    ) -> NumpyIntArrayType: ...
522

523
    @abstractmethod
524
    def get_positions(
525
        self,
526
        names: PySeq[str],
527
        positions: PySeq[int] | NumpyIntArrayType,
528
    ) -> NumpyIntArrayType: ...
529

530
    @abstractmethod
531
    def variable_positions(
532
        self,
533
        names: PySeq[str],
534
        start: int | None = None,
535
        stop: int | None = None,
536
        step: int | None = None,
537
    ) -> NumpyIntArrayType: ...
538

539
    @abstractmethod
540
    def get_gaps(self, seqid: str) -> NumpyIntArrayType: ...
541

542

543
class AlignedSeqsData(AlignedSeqsDataABC):
6✔
544
    """The builtin ``cogent3`` implementation of aligned sequences storage
545
    underlying an ``Alignment``. Indexing this object returns an ``AlignedDataView``
546
    which can realise the corresponding slice as a string, bytes, or numpy array,
547
    gapped or ungapped.
548

549
    Notes
550
    -----
551
    Methods on this object only accepts plust strand start, stop and step
552
    indices for selecting segments of data. It can return the gap coordinates
553
    for a sequence as used by ``IndelMap``.
554
    """
555

556
    __slots__ = (
6✔
557
        "_align_len",
558
        "_alphabet",
559
        "_gapped",
560
        "_gaps",
561
        "_name_to_index",
562
        "_names",
563
        "_offset",
564
        "_reversed",
565
        "_ungapped",
566
    )
567

568
    def __init__(
6✔
569
        self,
570
        *,
571
        gapped_seqs: NumpyIntArrayType,
572
        names: Collection[str],
573
        alphabet: c3_alphabet.CharAlphabet[Any],
574
        ungapped_seqs: dict[str, NumpyIntArrayType] | None = None,
575
        gaps: Mapping[str, NumpyIntArrayType] | None = None,
576
        offset: dict[str, int] | None = None,
577
        align_len: int | None = None,
578
        check: bool = True,
579
        reversed_seqs: set[str] | None = None,
580
    ) -> None:
581
        """
582
        Parameters
583
        ----------
584
        gapped_seqs
585
            2D numpy.uint8 array of aligned sequences. axis 0 are sequences,
586
            axis 1 are alignment positions
587
        names
588
            sequence names in order matching the axis 0 of gapped_seqs
589
        alphabet
590
            caharacter alphabet for the sequences
591
        ungapped_seqs
592
            a dictionary mapping names to 1D numpy.uint8 arrays of individual
593
            sequences without gaps. If not provided, computed on demand.
594
        gaps
595
            a dictionary mapping names to 1D numpy.int32 arrays of gap data,
596
            axis 0 is a gap axis 1 is [gap position in sequence coordinates,
597
            cumulative gap length].  If not provided, computed on demand.
598
        offset
599
            a dictionary of annotation offsets
600
        align_len
601
            length of the alignment, which must equal the gapped_seqs.shape[1]
602
        check
603
            validate any keys in offset, ungapped_seqs, gaps are a subset of names
604
        reversed_seqs
605
            names of seqs that are reverse complemented
606
        """
607
        self._alphabet = alphabet
6✔
608
        self._names = tuple(names)
6✔
609
        self._name_to_index = {name: i for i, name in enumerate(names)}
6✔
610
        self._gapped = gapped_seqs
6✔
611
        self._ungapped = ungapped_seqs or {}
6✔
612
        self._gaps = dict(gaps) if gaps else {}
6✔
613
        align_len = align_len or gapped_seqs.shape[1]
6✔
614
        self._reversed = frozenset(reversed_seqs or set())
6✔
615
        if align_len:
6✔
616
            assert align_len == gapped_seqs.shape[1], "mismatch in alignment length"
6✔
617

618
        self._align_len = align_len
6✔
619
        self._offset = offset or {}
6✔
620

621
        if check:
6✔
622
            if not set(names) >= set(self._gaps.keys()) or not set(names) >= set(
6✔
623
                self._ungapped.keys(),
624
            ):
625
                msg = "Keys in ungapped seqs and gaps must be subsets of names."
6✔
626
                raise ValueError(
6✔
627
                    msg,
628
                )
629
            if not set(names) >= set(self._offset):
6✔
630
                msg = "Keys in offset must be a subset of names."
6✔
631
                raise ValueError(msg)
6✔
632

633
            if len(names) != gapped_seqs.shape[0]:
6✔
634
                msg = f"{len(names)=} != {gapped_seqs.shape[0]=}"
×
635
                raise ValueError(msg)
×
636

637
    def __eq__(self, other: object) -> bool:
6✔
638
        if not isinstance(other, self.__class__):
6✔
639
            return False
×
640
        attrs = (
6✔
641
            "_names",
642
            "_name_to_index",
643
            "_alphabet",
644
            "_align_len",
645
            "_offset",
646
        )
647
        for attr_name in attrs:
6✔
648
            self_attr = getattr(self, attr_name)
6✔
649
            other_attr = getattr(other, attr_name)
6✔
650
            if self_attr != other_attr:
6✔
651
                return False
6✔
652

653
        return bool(numpy.all(self._gapped == other._gapped))
6✔
654

655
    def __len__(self) -> int:
6✔
656
        return self.align_len
6✔
657

658
    def __getitem__(self, index: str | int) -> AlignedDataViewABC:
6✔
659
        return self.get_view(index)
6✔
660

661
    @property
6✔
662
    def names(self) -> tuple[str, ...]:
6✔
663
        """returns the names of the sequences in the storage"""
664
        return self._names
6✔
665

666
    @property
6✔
667
    def reversed_seqs(self) -> frozenset[str]:
6✔
668
        """names of sequences that are reverse complemented"""
669
        return self._reversed
6✔
670

671
    @property
6✔
672
    def alphabet(self) -> c3_alphabet.CharAlphabet[Any]:
6✔
673
        """the character alphabet for validating, encoding, decoding sequences"""
674
        return self._alphabet
6✔
675

676
    @property
6✔
677
    def align_len(self) -> int:
6✔
678
        """Return the length of the alignment."""
679
        return self._align_len
6✔
680

681
    @property
6✔
682
    def offset(self) -> dict[str, int]:
6✔
683
        """returns the offset of each sequence in the Alignment"""
684
        return {name: self._offset.get(name, 0) for name in self.names}
6✔
685

686
    @classmethod
6✔
687
    def from_seqs(
6✔
688
        cls,
689
        *,
690
        data: Mapping[str, str | bytes | NumpyIntArrayType],
691
        alphabet: c3_alphabet.CharAlphabet[Any],
692
        **kwargs: Any,
693
    ) -> Self:
694
        """Construct an AlignedSeqsData object from a dict of aligned sequences
695

696
        Parameters
697
        ----------
698
        data
699
            dict of gapped sequences {name: seq, ...}. sequences must all be
700
            the same length
701
        alphabet
702
            alphabet object for the sequences
703
        """
704
        seq_lengths = {len(v) for v in data.values()}
6✔
705
        if len(seq_lengths) != 1:
6✔
706
            msg = "All sequence lengths must be the same."
6✔
707
            raise ValueError(msg)
6✔
708

709
        align_len = seq_lengths.pop()
6✔
710
        names = tuple(data.keys())
6✔
711
        array_seqs = numpy.empty((len(names), align_len), dtype=alphabet.dtype)
6✔
712
        for i, name in enumerate(names):
6✔
713
            array_seqs[i] = alphabet.to_indices(data[name], validate=True)
6✔
714

715
        array_seqs.flags.writeable = False
6✔
716
        return cls(
6✔
717
            gapped_seqs=array_seqs,
718
            alphabet=alphabet,
719
            align_len=align_len,
720
            check=False,
721
            names=names,
722
            **kwargs,
723
        )
724

725
    @classmethod
6✔
726
    def from_seqs_and_gaps(
6✔
727
        cls,
728
        *,
729
        seqs: Mapping[str, str | bytes | NumpyIntArrayType],
730
        gaps: Mapping[str, NumpyIntArrayType],
731
        alphabet: c3_alphabet.CharAlphabet[Any],
732
        **kwargs: Any,
733
    ) -> Self:
734
        """Construct an AlignedSeqsData object from a dict of ungapped sequences
735
        and a corresponding dict of gap data.
736

737
        Parameters
738
        ----------
739
        seqs
740
            dict of ungapped sequences {name: seq, ...}
741
        gaps
742
            gap data {name: [[seq gap position, cumulative gap length], ...], ...}
743
        alphabet
744
            alphabet object for the sequences
745
        """
746
        names: tuple[str, ...] = tuple(kwargs.pop("names", seqs.keys()))
6✔
747
        if not names:
6✔
748
            msg = "seqs cannot be empty"
6✔
749
            raise ValueError(msg)
6✔
750

751
        align_len: int | None = kwargs.pop("align_len", None)
6✔
752
        if align_len is None:
6✔
753
            align_len = _gapped_seq_len(seqs[names[0]], gaps[names[0]])
6✔
754

755
        gapped_seqs = numpy.empty((len(names), align_len), dtype=alphabet.dtype)
6✔
756
        seq_arrays: dict[str, NumpyIntArrayType] = {}
6✔
757
        for i, name in enumerate(names):
6✔
758
            seq = alphabet.to_indices(seqs[name])
6✔
759
            seq_arrays[name] = seq
6✔
760
            if name not in gaps:
6✔
761
                msg = f"Missing gap data for sequence {name!r}"
6✔
762
                raise ValueError(msg)
6✔
763
            gapped_seqs[i] = compose_gapped_seq(
6✔
764
                seq, gaps[name], cast("int", alphabet.gap_index)
765
            )
766
            assert len(gapped_seqs[i]) == align_len, "aligned lengths do not match"
6✔
767

768
        gapped_seqs.flags.writeable = False
6✔
769
        return cls(
6✔
770
            ungapped_seqs=seq_arrays,
771
            gaps=gaps,
772
            gapped_seqs=gapped_seqs,
773
            alphabet=alphabet,
774
            names=names,
775
            align_len=align_len,
776
            **kwargs,
777
        )
778

779
    @classmethod
6✔
780
    def from_names_and_array(
6✔
781
        cls,
782
        *,
783
        names: Collection[str],
784
        data: NumpyIntArrayType,
785
        alphabet: c3_alphabet.CharAlphabet[Any],
786
    ) -> Self:
787
        """Construct an AlignedSeqsData object from a list of names and a numpy
788
        array of aligned sequence data.
789

790
        Parameters
791
        ----------
792
        names
793
            list of sequence names
794
        data
795
            numpy array of aligned sequence data
796
        alphabet
797
            alphabet object for the sequences
798
        """
799
        if len(names) != data.shape[0] or not len(names):
6✔
800
            msg = "Number of names must match number of rows in data."
6✔
801
            raise ValueError(msg)
6✔
802

803
        gapped_seqs = data.astype(alphabet.dtype)
6✔
804
        gapped_seqs.flags.writeable = False
6✔
805
        return cls(
6✔
806
            gapped_seqs=gapped_seqs,
807
            names=names,
808
            alphabet=alphabet,
809
        )
810

811
    def get_hash(self, seqid: str) -> str | None:
6✔
812
        """returns hash of seqid"""
813
        if seqid not in self._name_to_index:
6✔
814
            return None
6✔
815
        arr = self.get_gapped_seq_array(seqid=seqid)
6✔
816
        return array_hash64(arr)
6✔
817

818
    def _make_gaps_and_ungapped(self, seqid: str) -> None:
6✔
819
        if seqid in self._gaps and seqid in self._ungapped:
6✔
820
            # job already done
821
            return
×
822

823
        index = self._name_to_index[seqid]
6✔
824
        ungapped, gaps = decompose_gapped_seq(
6✔
825
            self._gapped[index],
826
            alphabet=self.alphabet,
827
        )
828
        self._gaps[seqid] = gaps
6✔
829
        self._ungapped[seqid] = ungapped
6✔
830

831
    def _get_ungapped(self, seqid: str) -> NumpyIntArrayType:
6✔
832
        if seqid not in self._ungapped:
6✔
833
            self._make_gaps_and_ungapped(seqid)
6✔
834
        return self._ungapped[seqid]
6✔
835

836
    def get_seq_length(self, seqid: str) -> int:
6✔
837
        """return length of the unaligned seq for seqid"""
838
        return len(self._get_ungapped(seqid))
6✔
839

840
    def get_view(
6✔
841
        self,
842
        seqid: str | int,
843
        slice_record: SliceRecord | None = None,
844
    ) -> AlignedDataView:
845
        """reurns view of aligned sequence data for seqid
846

847
        Parameters
848
        ----------
849
        seqid
850
            sequence name
851
        slice_record
852
            slice record to use for slicing the data. If None, uses the
853
            default slice record for the entire sequence.
854
        """
855
        if isinstance(seqid, int):
6✔
856
            seqid = self.names[seqid]
6✔
857

858
        return AlignedDataView(
6✔
859
            parent=self,
860
            seqid=seqid,
861
            alphabet=self.alphabet,
862
            slice_record=slice_record,
863
        )
864

865
    def _get_gaps(self, seqid: str) -> NumpyIntArrayType:
6✔
866
        if seqid not in self._gaps:
6✔
867
            self._make_gaps_and_ungapped(seqid)
6✔
868
        return self._gaps[seqid]
6✔
869

870
    def get_gaps(self, seqid: str) -> NumpyIntArrayType:
6✔
871
        """returns the gap data for seqid"""
872
        return self._get_gaps(seqid)
6✔
873

874
    def get_seq_array(
6✔
875
        self,
876
        *,
877
        seqid: str,
878
        start: int | None = None,
879
        stop: int | None = None,
880
        step: int | None = None,
881
    ) -> NumpyIntArrayType:
882
        """Return ungapped sequence corresponding to seqid as an array of indices.
883

884
        Notes
885
        -----
886
        Assumes start/stop are in sequence coordinates. If seqid is in
887
        reversed_seqs, that sequence will be in plus strand orientation.
888
        It is client codes responsibility to ensure the coordinates are
889
        consistent with that.
890
        """
891
        start = start or 0
6✔
892
        stop = stop if stop is not None else self.get_seq_length(seqid)
6✔
893
        step = step or 1
6✔
894

895
        if start < 0 or stop < 0 or step < 1:
6✔
896
            msg = f"{start=}, {stop=}, {step=} not >= 1"
×
897
            raise ValueError(msg)
×
898

899
        out_len = (stop - start + step - 1) // step
6✔
900

901
        seq = numpy.empty(out_len, dtype=self.alphabet.dtype)
6✔
902
        seq[:] = self._get_ungapped(seqid)[start:stop:step]
6✔
903

904
        return seq
6✔
905

906
    def get_gapped_seq_array(
6✔
907
        self,
908
        *,
909
        seqid: str,
910
        start: int | None = None,
911
        stop: int | None = None,
912
        step: int | None = None,
913
    ) -> NumpyIntArrayType:
914
        """Return sequence data corresponding to seqid as an array of indices.
915
        start/stop are in alignment coordinates. Includes gaps.
916
        """
917
        start = start or 0
6✔
918
        stop = stop if stop is not None else self.align_len
6✔
919
        step = step or 1
6✔
920
        if start < 0 or stop < 0 or step < 1:
6✔
921
            msg = f"{start=}, {stop=}, {step=} not >= 1"
×
922
            raise ValueError(msg)
×
923

924
        index = self._name_to_index[seqid]
6✔
925
        out_len = (stop - start + step - 1) // step
6✔
926
        gapped = numpy.empty(out_len, dtype=self.alphabet.dtype)
6✔
927
        gapped[:] = self._gapped[index][start:stop:step]
6✔
928

929
        return gapped
6✔
930

931
    def get_gapped_seq_str(
6✔
932
        self,
933
        *,
934
        seqid: str,
935
        start: int | None = None,
936
        stop: int | None = None,
937
        step: int | None = None,
938
    ) -> str:
939
        """Return sequence corresponding to seqid as a string.
940
        start/stop are in alignment coordinates. Includes gaps."""
941
        return self.alphabet.from_indices(
6✔
942
            self.get_gapped_seq_array(seqid=seqid, start=start, stop=stop, step=step),
943
        )
944

945
    def get_gapped_seq_bytes(
6✔
946
        self,
947
        *,
948
        seqid: str,
949
        start: int | None = None,
950
        stop: int | None = None,
951
        step: int | None = None,
952
    ) -> bytes:
953
        """Return sequence corresponding to seqid as a bytes string.
954
        start/stop are in alignment coordinates. Includes gaps."""
955
        return self.get_gapped_seq_str(
6✔
956
            seqid=seqid,
957
            start=start,
958
            stop=stop,
959
            step=step,
960
        ).encode("utf8")
961

962
    @extend_docstring_from(AlignedSeqsDataABC.get_ungapped)
6✔
963
    def get_ungapped(
6✔
964
        self,
965
        name_map: Mapping[str, str],
966
        start: int | None = None,
967
        stop: int | None = None,
968
        step: int | None = None,
969
    ) -> tuple[dict[str, NumpyIntArrayType], dict[str, Any]]:
970
        # redesign
971
        # if gaps exist, don't go via gapped seq
972
        # convert alignment coords into sequence coords using the location.align_to_seq_index function
973
        # this means we will need to convert coordinates to a plus strand slice
974
        if (start or 0) < 0 or (stop or 0) < 0 or (step or 1) <= 0:
6✔
975
            msg = f"{start=}, {stop=}, {step=} not >= 0"
×
976
            raise ValueError(msg)
×
977

978
        seq_array = numpy.empty(
6✔
979
            (len(name_map), self.align_len),
980
            dtype=self.alphabet.dtype,
981
        )
982
        names = tuple(name_map.values())
6✔
983
        for i, name in enumerate(names):
6✔
984
            index = self._name_to_index[name]
6✔
985
            seq_array[i] = self._gapped[index]
6✔
986
        seq_array = seq_array[:, start:stop:step]
6✔
987
        # now exclude gaps and missing
988
        seqs: dict[str, NumpyIntArrayType] = {}
6✔
989
        for i, name in enumerate(names):
6✔
990
            seq: NumpyIntArrayType = seq_array[i]
6✔
991
            indices = seq != self.alphabet.gap_index
6✔
992
            if self.alphabet.missing_index is not None:
6✔
993
                indices &= seq != self.alphabet.missing_index
6✔
994
            seqs[name] = seq[indices]
6✔
995

996
        offset = {n: v for n, v in self._offset.items() if n in names}
6✔
997
        return seqs, {
6✔
998
            "offset": offset,
999
            "name_map": name_map,
1000
            "reversed_seqs": self._reversed,
1001
        }
1002

1003
    def get_pos_range(
6✔
1004
        self,
1005
        names: PySeq[str],
1006
        start: int | None = None,
1007
        stop: int | None = None,
1008
        step: int | None = None,
1009
    ) -> NumpyIntArrayType:
1010
        """returns an array of the selected positions for names."""
1011
        start = start or 0
6✔
1012
        stop = stop or self.align_len
6✔
1013
        step = step or 1
6✔
1014
        if start < 0 or stop < 0 or step < 1:
6✔
1015
            msg = f"{start=}, {stop=}, {step=} not >= 1"
×
1016
            raise ValueError(msg)
×
1017

1018
        indices = tuple(self._name_to_index[name] for name in names)
6✔
1019
        if abs((start - stop) // step) == self.align_len:
6✔
1020
            array_seqs = self._gapped[indices, :]
6✔
1021
        else:
1022
            array_seqs = self._gapped[indices, start:stop:step]
6✔
1023

1024
        return array_seqs
6✔
1025

1026
    def get_positions(
6✔
1027
        self,
1028
        names: PySeq[str],
1029
        positions: PySeq[int] | NumpyIntArrayType,
1030
    ) -> NumpyIntArrayType:
1031
        """returns alignment positions for names
1032

1033
        Parameters
1034
        ----------
1035
        names
1036
            series of sequence names
1037
        positions
1038
            indices lying within self
1039

1040
        Returns
1041
        -------
1042
            2D numpy.array, oriented by sequence
1043

1044
        Raises
1045
        ------
1046
        IndexError if a provided position is negative or
1047
        greater then alignment length.
1048
        """
1049
        if diff := set(names) - set(self.names):
6✔
1050
            msg = f"these names not present {diff}"
×
1051
            raise ValueError(msg)
×
1052

1053
        min_index, max_index = numpy.min(positions), numpy.max(positions)
6✔
1054
        if min_index < 0 or max_index > self.align_len:
6✔
1055
            msg = f"Out of range: {min_index=} and / or {max_index=}"
6✔
1056
            raise IndexError(msg)
6✔
1057

1058
        seq_indices = numpy.array([self._name_to_index[n] for n in names])
6✔
1059
        return self._gapped[numpy.ix_(seq_indices, numpy.asarray(positions))]
6✔
1060

1061
    def copy(self, **kwargs: Any) -> Self:
6✔
1062
        """shallow copy of self
1063

1064
        Notes
1065
        -----
1066
        kwargs are passed to constructor and will over-ride existing values
1067
        """
1068
        init_args: dict[str, Any] = {
×
1069
            "gapped_seqs": self._gapped,
1070
            "names": self._names,
1071
            "alphabet": self._alphabet,
1072
            "ungapped_seqs": self._ungapped,
1073
            "gaps": self._gaps,
1074
            "offset": self._offset,
1075
            "align_len": self._align_len,
1076
            "check": False,
1077
            "reversed_seqs": self._reversed,
1078
            **kwargs,
1079
        }
1080

1081
        return self.__class__(**init_args)
×
1082

1083
    def add_seqs(
6✔
1084
        self,
1085
        seqs: Mapping[str, str | bytes | NumpyIntArrayType],
1086
        force_unique_keys: bool = True,
1087
        offset: dict[str, int] | None = None,
1088
    ) -> AlignedSeqsData:
1089
        """Returns a new AlignedSeqsData object with added sequences.
1090

1091
        Parameters
1092
        ----------
1093
        seqs
1094
            dict of sequences to add {name: seq, ...}
1095
        force_unique_keys
1096
            if True, raises ValueError if any sequence names already exist in the collection
1097
        offset
1098
            dict of offsets relative to for the new sequences.
1099
        """
1100
        if force_unique_keys and any(name in self.names for name in seqs):
6✔
1101
            msg = "One or more sequence names already exist in collection"
6✔
1102
            raise ValueError(msg)
6✔
1103

1104
        new_seq_lens = {len(seq) for seq in seqs.values()}
6✔
1105
        if len(new_seq_lens) != 1 or new_seq_lens.pop() != self.align_len:
6✔
1106
            msg = "All sequences must be the same length as existing sequences"
6✔
1107
            raise ValueError(
6✔
1108
                msg,
1109
            )
1110

1111
        new_seqs = dict(zip(self.names, self._gapped, strict=False))
6✔
1112
        for name, seq in seqs.items():
6✔
1113
            seq = self.alphabet.to_indices(seq, validate=True)
6✔
1114
            seq.flags.writeable = False
6✔
1115
            new_seqs[name] = seq
6✔
1116

1117
        names = tuple(new_seqs.keys())
6✔
1118
        gapped = numpy.empty((len(names), self.align_len), dtype=self.alphabet.dtype)
6✔
1119
        for i, name in enumerate(names):
6✔
1120
            gapped[i] = new_seqs[name]
6✔
1121

1122
        return self.__class__(
6✔
1123
            gapped_seqs=gapped,
1124
            names=names,
1125
            alphabet=self.alphabet,
1126
            offset={**self._offset, **(offset or {})},
1127
            align_len=self.align_len,
1128
        )
1129

1130
    def to_alphabet(
6✔
1131
        self,
1132
        alphabet: c3_alphabet.CharAlphabet[Any],
1133
        check_valid: bool = True,
1134
    ) -> Self:
1135
        """Returns a new AlignedSeqsData object with the same underlying data
1136
        with a new alphabet."""
1137
        if (
6✔
1138
            len(alphabet) == len(self.alphabet)
1139
            and len(
1140
                {
1141
                    (a, b)
1142
                    for a, b in zip(self.alphabet, alphabet, strict=False)
1143
                    if a != b
1144
                },
1145
            )
1146
            == 1
1147
        ):
1148
            # special case where mapping between dna and rna
1149
            return self.__class__(
6✔
1150
                gapped_seqs=self._gapped,
1151
                alphabet=alphabet,
1152
                offset=self._offset,
1153
                align_len=self.align_len,
1154
                names=self.names,
1155
            )
1156

1157
        gapped = numpy.empty(
6✔
1158
            (len(self.names), self.align_len),
1159
            dtype=self.alphabet.dtype,
1160
        )
1161

1162
        for i in range(len(self.names)):
6✔
1163
            seq_data = self._gapped[i]
6✔
1164
            as_new_alpha = self.alphabet.convert_seq_array_to(
6✔
1165
                seq=seq_data,
1166
                alphabet=alphabet,
1167
                check_valid=check_valid,
1168
            )
1169
            gapped[i] = as_new_alpha
6✔
1170

1171
        return self.__class__(
6✔
1172
            gapped_seqs=gapped,
1173
            alphabet=alphabet,
1174
            offset=self._offset,
1175
            names=self.names,
1176
        )
1177

1178
    def variable_positions(
6✔
1179
        self,
1180
        names: PySeq[str],
1181
        start: int | None = None,
1182
        stop: int | None = None,
1183
        step: int | None = None,
1184
    ) -> NumpyIntArrayType:
1185
        """returns absolute indices of positions that have more than one state
1186

1187
        Parameters
1188
        ----------
1189
        names
1190
            selected seqids
1191
        start
1192
            absolute start
1193
        stop
1194
            absolute stop
1195
        step
1196
            step
1197

1198
        Returns
1199
        -------
1200
        Absolute indices (as distinct from an index relative to start) of
1201
        variable positions.
1202
        """
1203
        start = start or 0
6✔
1204
        if len(names) < 2:
6✔
1205
            return numpy.array([])
×
1206

1207
        array_seqs = self.get_pos_range(names, start=start, stop=stop, step=step)
6✔
1208
        if array_seqs.size == 0:
6✔
1209
            return numpy.array([])
×
1210

1211
        indices = (array_seqs != array_seqs[0]).any(axis=0)
6✔
1212
        return numpy.where(indices)[0] + start
6✔
1213

1214

1215
def _gapped_seq_len(
6✔
1216
    seq: str | bytes | NumpyIntArrayType, gap_map: IndelMap | NumpyIntArrayType
1217
) -> int:
1218
    """calculate the gapped sequence length from a ungapped sequence and gap map
1219

1220
    Parameters
1221
    ----------
1222
    seq
1223
        numpy array of sequence indices
1224
    gap_map
1225
        numpy array of [gap index, cumulative gap length] pairs
1226
    """
1227
    if isinstance(gap_map, IndelMap):
6✔
1228
        gap_map = gap_map.array
×
1229
    try:
6✔
1230
        gap_len = gap_map[-1][1]
6✔
1231
    except IndexError:  # no gaps
6✔
1232
        return len(seq)
6✔
1233

1234
    return len(seq) + gap_len
6✔
1235

1236

1237
@numba.jit(cache=True)
1238
def compose_gapped_seq(
1239
    ungapped_seq: NumpyIntArrayType,
1240
    gaps: NumpyIntArrayType,
1241
    gap_index: int,
1242
) -> NumpyIntArrayType:  # pragma: no cover
1243
    """reconstruct a gapped sequence from an ungapped sequence and gap data"""
1244
    if not len(gaps):
1245
        return ungapped_seq
1246

1247
    gapped_len = len(ungapped_seq) + gaps[-1, 1]
1248

1249
    gapped_seq = numpy.empty(gapped_len, dtype=ungapped_seq.dtype)
1250

1251
    pos = 0
1252
    ungapped_pos = 0
1253
    prev_gap_len = 0
1254
    for gap_pos, cum_gap_len in gaps:
1255
        gap_len = cum_gap_len - prev_gap_len
1256
        prev_gap_len = cum_gap_len
1257

1258
        gapped_seq[pos : pos + gap_pos - ungapped_pos] = ungapped_seq[
1259
            ungapped_pos:gap_pos
1260
        ]
1261
        pos += gap_pos - ungapped_pos
1262
        ungapped_pos = gap_pos
1263

1264
        gapped_seq[pos : pos + gap_len] = gap_index
1265
        pos += gap_len
1266

1267
    gapped_seq[pos:] = ungapped_seq[ungapped_pos:]
1268

1269
    return gapped_seq
1270

1271

1272
def decompose_gapped_seq(
6✔
1273
    seq: str | bytes | NumpyIntArrayType | c3_sequence.Sequence,
1274
    *,
1275
    alphabet: c3_alphabet.AlphabetABC[Any],
1276
    missing_as_gap: bool = True,
1277
) -> tuple[NumpyIntArrayType, NumpyIntArrayType]:
1278
    """
1279
    Takes a sequence with (or without) gaps and returns an ungapped sequence
1280
    and a map of the position and length of gaps in the original parent sequence
1281
    """
1282

1283
    if isinstance(seq, bytes):
6✔
1284
        seq = seq.decode("utf-8")
6✔
1285

1286
    if isinstance(seq, str):
6✔
1287
        if not alphabet.is_valid(seq):
6✔
1288
            msg = f"Sequence is invalid for alphabet {alphabet}"
×
1289
            raise c3_alphabet.AlphabetError(msg)
×
1290
        seq = alphabet.to_indices(seq)
6✔
1291

1292
    if isinstance(seq, c3_sequence.Sequence):
6✔
1293
        seq = numpy.array(seq)
6✔
1294

1295
    if isinstance(seq, numpy.ndarray):
6✔
1296
        if missing_as_gap and alphabet.missing_index:
6✔
1297
            missing_index = int(numpy.uint8(alphabet.missing_index))
6✔
1298
        else:
1299
            missing_index = -1
×
1300
        return decompose_gapped_seq_array(
6✔
1301
            seq.astype(alphabet.dtype),
1302
            cast("int", alphabet.gap_index),
1303
            missing_index=missing_index,
1304
        )
1305

1306
    msg = f"decompose_gapped_seq not implemented for type {type(seq)}"
×
1307
    raise NotImplementedError(
×
1308
        msg,
1309
    )
1310

1311

1312
@numba.jit(cache=True)
1313
def decompose_gapped_seq_array(
1314
    seq: NumpyIntArrayType,
1315
    gap_index: int,
1316
    missing_index: int = -1,
1317
) -> tuple[NumpyIntArrayType, NumpyIntArrayType]:  # pragma: no cover
1318
    """
1319
    extracts the ungapped sequence and gap data from a gapped sequence
1320

1321
    Parameters
1322
    ----------
1323
    seq
1324
        numpy array representing a gapped sequence
1325
    gap_index
1326
        from an alphabet
1327
    missing_index
1328
        from an alphabet, represents index for missing character
1329

1330
    Returns
1331
    -------
1332
    ungapped, [[gap_pos in sequence coords, cumulative gap length]]
1333

1334
    Notes
1335
    -----
1336
    being called by decompose_gapped_seq
1337
    A missing_index is an ambiguity code that includes the gap character.
1338
    Be careful in providing this value when dealing with sequences that
1339
    may have had a feature masking applied.
1340
    """
1341
    seqlen = len(seq)
1342
    working = numpy.empty((seqlen, numpy.int64(2)), dtype=numpy.int64)
1343

1344
    in_gap = False
1345
    num_gaps = 0
1346
    start = 0
1347
    for i, base in enumerate(seq):
1348
        gapped = base in (gap_index, missing_index)
1349
        if gapped and not in_gap:
1350
            start = i
1351
            in_gap = True
1352
        elif not gapped and in_gap:
1353
            working[num_gaps][:] = start, i - start
1354
            num_gaps += 1
1355
            in_gap = False
1356

1357
        if gapped and i == seqlen - 1:
1358
            # end of sequence
1359
            working[num_gaps][:] = start, i - start + 1
1360
            num_gaps += 1
1361

1362
    if num_gaps == 0:
1363
        return seq, numpy.empty((0, 2), dtype=numpy.int64)
1364

1365
    gap_coords = working[:num_gaps]
1366
    gap_coords.T[1] = gap_coords.T[1].cumsum()
1367
    # get gap start positions in sequence coords
1368
    for index, cum_length in enumerate(numpy.append(0, gap_coords.T[1][:-1])):
1369
        gap_coords[index][0] -= cum_length
1370

1371
    ungapped = numpy.empty(seqlen - gap_coords.T[1][-1], dtype=seq.dtype)
1372
    seqpos = 0
1373
    for base in seq:
1374
        gapped = base in (gap_index, missing_index)
1375
        if not gapped:
1376
            ungapped[seqpos] = base
1377
            seqpos += 1
1378

1379
    return ungapped, gap_coords
1380

1381

1382
def array_hash64(data: npt.NDArray[numpy.number]) -> str:
6✔
1383
    """returns 64-bit hash of numpy array.
1384

1385
    Notes
1386
    -----
1387
    This function does not introduce randomisation and so
1388
    is reproducible between processes.
1389
    """
1390
    h = hashlib.md5(data.tobytes(), usedforsecurity=False)
6✔
1391
    return h.hexdigest()[:16]
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc