• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rmcar17 / cogent3 / 19318630917

10 Nov 2025 08:08PM UTC coverage: 90.631% (+0.008%) from 90.623%
19318630917

push

github

web-flow
Merge pull request #2518 from cogent3/dependabot/pip/ruff-0.14.4

Bump ruff from 0.14.3 to 0.14.4

28277 of 31200 relevant lines covered (90.63%)

5.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.52
/src/cogent3/core/alignment.py
1
from __future__ import annotations
6✔
2

3
import contextlib
6✔
4
import copy
6✔
5
import dataclasses
6✔
6
import json
6✔
7
import pathlib
6✔
8
import re
6✔
9
import types
6✔
10
import warnings
6✔
11
from abc import ABC, abstractmethod
6✔
12
from collections import Counter, defaultdict
6✔
13
from collections.abc import Mapping
6✔
14
from typing import TYPE_CHECKING, Any, Generic, Literal, Self, TypeVar, cast, overload
6✔
15

16
import numba
6✔
17
import numpy
6✔
18
import numpy.typing as npt
6✔
19
from typing_extensions import override
6✔
20

21
import cogent3
6✔
22
import cogent3._plugin as c3_plugin
6✔
23
import cogent3.core.alphabet as c3_alphabet
6✔
24
import cogent3.core.genetic_code as c3_genetic_code
6✔
25
import cogent3.core.moltype as c3_moltype
6✔
26
import cogent3.core.sequence as c3_sequence
6✔
27
from cogent3._version import __version__
6✔
28
from cogent3.core.annotation import Feature
6✔
29
from cogent3.core.annotation_db import (
6✔
30
    AnnotatableMixin,
31
    FeatureDataType,
32
    SqliteAnnotationDbMixin,
33
    SupportsFeatures,
34
)
35
from cogent3.core.info import Info as InfoClass
6✔
36
from cogent3.core.location import FeatureMap, IndelMap, Strand
6✔
37
from cogent3.core.profile import PSSM, MotifCountsArray, MotifFreqsArray, load_pssm
6✔
38
from cogent3.core.seq_storage import (
6✔
39
    AlignedSeqsData,
40
    AlignedSeqsDataABC,
41
    SeqsDataABC,
42
)
43
from cogent3.core.slice_record import SliceRecord
6✔
44
from cogent3.maths.stats.number import CategoryCounter
6✔
45
from cogent3.util import progress_display as UI
6✔
46
from cogent3.util import warning as c3warn
6✔
47
from cogent3.util.deserialise import register_deserialiser
6✔
48
from cogent3.util.dict_array import DictArray, DictArrayTemplate
6✔
49
from cogent3.util.io import atomic_write, get_format_suffixes
6✔
50
from cogent3.util.misc import (
6✔
51
    extend_docstring_from,
52
    get_object_provenance,
53
    get_setting_from_environ,
54
    negate_condition,
55
)
56
from cogent3.util.union_dict import UnionDict
6✔
57

58
if TYPE_CHECKING:  # pragma: no cover
59
    from collections.abc import Callable, Iterable, Iterator
60
    from collections.abc import Sequence as PySeq
61

62
    from cogent3.core.seqview import (
63
        AlignedDataViewABC,
64
        SeqViewABC,
65
    )
66
    from cogent3.core.tree import PhyloNode
67
    from cogent3.draw.dotplot import Dotplot
68
    from cogent3.draw.drawable import AnnotatedDrawable, Drawable, Shape
69
    from cogent3.evolve.fast_distance import DistanceMatrix
70
    from cogent3.maths.stats.contingency import TestResult
71
    from cogent3.util.io import PathType
72

73
MolTypes = c3_moltype.MolTypeLiteral | c3_moltype.MolType[Any]
6✔
74

75
NumpyIntArrayType = npt.NDArray[numpy.integer]
6✔
76
NumpyFloatArrayType = npt.NDArray[numpy.floating]
6✔
77

78

79
class Aligned(AnnotatableMixin):
6✔
80
    """A single sequence in an alignment.
81

82
    Notes
83
    -----
84
    This is a wrapper around a ``AlignedDataView``. This class performs any
85
    complementing needed. It can be cast directly to a string or numpy array,
86
    e.g. ``numpy.array(<aligned instance>)`` returns a numpy unsigned 8-bit
87
    integer array.
88
    """
89

90
    __slots__ = ("_annotation_db", "_data", "_moltype", "_name")
6✔
91

92
    def __init__(
6✔
93
        self,
94
        data: AlignedDataViewABC,
95
        moltype: c3_moltype.MolType[str],
96
        name: str | None = None,
97
        annotation_db: SupportsFeatures | list[SupportsFeatures] | None = None,
98
    ) -> None:
99
        self._data = data
6✔
100
        self._moltype = moltype
6✔
101
        self._name = name or data.seqid
6✔
102
        self._annotation_db: list[SupportsFeatures] = self._init_annot_db_value(
6✔
103
            annotation_db
104
        )
105

106
    @classmethod
6✔
107
    def from_map_and_seq(
6✔
108
        cls, indel_map: IndelMap, seq: c3_sequence.Sequence
109
    ) -> Aligned:
110
        """Creates an Aligned instance from an indel map and a Sequence."""
111
        moltype = seq.moltype
6✔
112
        # refactor: design
113
        # this is a temporary approach during migration to new_types
114
        # to support the sequence alignment algorithms
115
        # a better solution is to create a AlignedDataView instance the
116
        # map and seq directly without requiring a parent AlignedSeqsData
117
        name = cast("str", seq.name)
6✔
118
        asd = AlignedSeqsData.from_seqs_and_gaps(
6✔
119
            seqs={name: numpy.array(seq)},
120
            gaps={name: indel_map.array},
121
            alphabet=moltype.most_degen_alphabet(),
122
        )
123

124
        return cls(asd.get_view(name), moltype)
6✔
125

126
    @classmethod
6✔
127
    def from_map_and_aligned_data_view(
6✔
128
        cls,
129
        indel_map: IndelMap,
130
        seq: AlignedDataViewABC,
131
    ) -> Aligned:
132
        """Creates an Aligned instance from an indel map and AlignedDataView."""
133
        moltype = cast("c3_moltype.MolType[Any]", seq.alphabet.moltype)
6✔
134
        seqid = cast("str", seq.seqid)
6✔
135
        seq_arr = seq.array_value
6✔
136
        # refactor: design
137
        # see above comment in from_map_and_seq
138
        asd = AlignedSeqsData.from_seqs_and_gaps(
6✔
139
            seqs={seqid: seq_arr},
140
            gaps={seqid: indel_map.array},
141
            alphabet=moltype.most_degen_alphabet(),
142
        )
143

144
        return cls(asd.get_view(seqid), moltype)
6✔
145

146
    def __len__(self) -> int:
6✔
147
        return len(self.map)
6✔
148

149
    @property
6✔
150
    def data(self) -> AlignedDataViewABC:
6✔
151
        return self._data
6✔
152

153
    @property
6✔
154
    def map(self) -> IndelMap:
6✔
155
        return self.data.map
6✔
156

157
    @property
6✔
158
    def seq(self) -> c3_sequence.Sequence:
6✔
159
        """the ungapped sequence."""
160
        # if the slice record has abs(step) > 1, we cannot retain a connection
161
        # to the underlying aligned seq data container because the gaps are
162
        # not going to be modulo the step.
163
        seq: SeqViewABC | NumpyIntArrayType
164
        if self.data.slice_record.plus_step == 1:
6✔
165
            # to complement or not handled by the view
166
            seq = self.data.get_seq_view()
6✔
167
        elif self.data.slice_record.step > 1:
6✔
168
            # we have a step, but no complementing will be required
169
            seq = self.moltype.degap(self.data.gapped_array_value)
6✔
170
        else:
171
            # gapped_array_value gives the reverse of the plus strand
172
            # so we need to complement it. We do that here because with a
173
            # step != 1, we cannot retain a connection to the underlying
174
            # annotations
175
            seq = self.moltype.degap(self.data.gapped_array_value)
6✔
176
            seq = self.moltype.complement(seq)
6✔
177

178
        mt_seq = self.moltype.make_seq(seq=seq, name=self.data.seqid)
6✔
179
        ann_db = self._annotation_db if self.data.slice_record.plus_step == 1 else None
6✔
180
        mt_seq.replace_annotation_db(ann_db)
6✔
181
        return mt_seq
6✔
182

183
    @property
6✔
184
    def gapped_seq(self) -> c3_sequence.Sequence:
6✔
185
        """Returns Sequence object, including gaps."""
186
        seq = self.data.gapped_array_value
6✔
187
        if self.data.slice_record.step < 0:
6✔
188
            seq = self.moltype.complement(seq)
6✔
189
        return self.moltype.make_seq(seq=seq, name=self.data.seqid)
6✔
190

191
    @property
6✔
192
    def moltype(self) -> c3_moltype.MolType[str]:
6✔
193
        return self._moltype
6✔
194

195
    @property
6✔
196
    def name(self) -> str:
6✔
197
        return cast("str", self._name)
6✔
198

199
    def gap_vector(self) -> list[bool]:
6✔
200
        """Returns gap_vector of positions."""
201
        return self.gapped_seq.gap_vector()
6✔
202

203
    def make_feature(
6✔
204
        self, feature: FeatureDataType, alignment: Alignment
205
    ) -> Feature[Alignment]:
206
        """returns a feature, not written into annotation_db"""
207
        annot = self.seq.make_feature(feature)
6✔
208
        inverted = self.map.to_feature_map().inverse()
6✔
209
        return annot.remapped_to(alignment, inverted)
6✔
210

211
    def __repr__(self) -> str:
6✔
212
        seq = f"{str(self)[:7]}... {len(self):,}" if len(self) > 10 else str(self)
6✔
213
        return (
6✔
214
            f"Aligned(name={self.name!r}, seq={seq!r}, moltype={self.moltype.name!r})"
215
        )
216

217
    def __str__(self) -> str:
6✔
218
        return str(self.gapped_seq)
6✔
219

220
    def __array__(
6✔
221
        self,
222
        dtype: numpy.dtype[numpy.integer] | None = None,
223
        copy: bool | None = None,
224
    ) -> NumpyIntArrayType:
225
        return numpy.array(self.gapped_seq, dtype=dtype)
6✔
226

227
    def __bytes__(self) -> bytes:
6✔
228
        return bytes(self.gapped_seq)
6✔
229

230
    def __iter__(self) -> Iterator[str]:
6✔
231
        """Iterates over sequence one motif (e.g. char) at a time, incl. gaps"""
232
        yield from self.gapped_seq
6✔
233

234
    def __getitem__(self, span: int | slice | FeatureMap) -> Aligned:
6✔
235
        if isinstance(span, int):
6✔
236
            span = slice(span, span + 1)
6✔
237
        if isinstance(span, slice):
6✔
238
            return self.__class__(data=self.data[span], moltype=self.moltype)
6✔
239
        if isinstance(span, FeatureMap):
6✔
240
            # we assume the feature map is in align coordinates
241
            data, gaps = self.slice_with_map(span)
6✔
242
            seqid = cast("str", self.data.seqid)
6✔
243
            seqs_data = self.data.parent.from_seqs_and_gaps(
6✔
244
                seqs={seqid: data},
245
                gaps={seqid: gaps},
246
                alphabet=self.moltype.most_degen_alphabet(),
247
            )
248
            view = seqs_data.get_view(seqid)
6✔
249

250
            return Aligned(view, self.moltype)
6✔
251

252
        msg = f"__getitem__ not implemented for {type(span)}"
6✔
253
        raise NotImplementedError(msg)
6✔
254

255
    def slice_with_map(
6✔
256
        self, span: FeatureMap
257
    ) -> tuple[NumpyIntArrayType, NumpyIntArrayType]:
258
        start, end = span.start, span.end
6✔
259
        if span.useful and len(list(span.iter_spans())) == 1:
6✔
260
            im = self.map[start:end]
6✔
261
            seq_start = self.map.get_seq_index(start)
6✔
262
            seq_end = self.map.get_seq_index(end)
6✔
263
            data = self.data.array_value[seq_start:seq_end]
6✔
264
            # .array_value will return the data in the correct orientation
265
            # which means we need to complement it if the data is reversed
266
            data = (
6✔
267
                self.moltype.complement(data)
268
                if self.data.slice_record.is_reversed
269
                else data
270
            )
271
        elif not span.useful:
6✔
272
            im = self.map[start:end]
6✔
273
            data = self.data.array_value[:0]
6✔
274
        else:
275
            # multiple spans
276
            align_coords = span.get_coordinates()
6✔
277
            im = self.map.joined_segments(align_coords)
6✔
278
            seq_map = self.map.make_seq_feature_map(span)
6✔
279
            # self.seq will return the data in the correct orientation
280
            # and will complement it if the data is reversed
281
            data = numpy.array(self.seq.gapped_by_map(seq_map))
6✔
282

283
        gaps = numpy.array([im.gap_pos, im.cum_gap_lengths]).T
6✔
284
        return data, gaps
6✔
285

286
    def parent_coordinates(
6✔
287
        self, seq_coords: bool = False, apply_offset: bool = False
288
    ) -> tuple[str, int, int, int]:
289
        """returns seqid, start, stop, strand on the parent sequence
290

291
        Parameters
292
        ----------
293
        seq_coords
294
            if True, the coordinates for the unaligned sequence
295
        apply_offset
296
            if True and seq_coords, adds annotation offset from parent
297
        """
298
        return self.data.parent_coords(seq_coords=seq_coords, apply_offset=apply_offset)
6✔
299

300
    @extend_docstring_from(c3_sequence.Sequence.annotate_matches_to)
6✔
301
    def annotate_matches_to(
6✔
302
        self,
303
        pattern: str,
304
        biotype: str,
305
        name: str,
306
        allow_multiple: bool = False,
307
    ) -> list[Feature[c3_sequence.Sequence]]:
308
        return self.seq.annotate_matches_to(
6✔
309
            pattern=pattern,
310
            biotype=biotype,
311
            name=name,
312
            allow_multiple=allow_multiple,
313
        )
314

315

316
TSequenceOrAligned = TypeVar("TSequenceOrAligned", c3_sequence.Sequence, Aligned)
6✔
317

318

319
class CollectionBase(AnnotatableMixin, ABC, Generic[TSequenceOrAligned]):
6✔
320
    def __init__(
6✔
321
        self,
322
        *,
323
        seqs_data: SeqsDataABC | AlignedSeqsDataABC,
324
        moltype: c3_moltype.MolType[Any],
325
        info: dict[str, Any] | InfoClass | None = None,
326
        source: PathType | None = None,
327
        annotation_db: SupportsFeatures | list[SupportsFeatures] | None = None,
328
        name_map: Mapping[str, str] | None = None,
329
        is_reversed: bool = False,
330
    ) -> None:
331
        """
332
        Parameters
333
        ----------
334
        seqs_data
335
            a SeqsDataABC or AlignedSeqsDataABC instance containg the sequence data
336
        moltype
337
            the molecular type of the sequences
338
        info
339
            additional information about the collection
340
        source
341
            the source of the sequence data
342
        annotation_db
343
            a database of annotations for the sequences
344
        name_map
345
            map between the names specified in the collection and names used in
346
            the underlying seqs_data. Used for when the names have been changed,
347
            but we want to query for annotations using the original names.
348
        is_reversed
349
            entire collection is reverse complemented
350
        """
351
        self._seqs_data = seqs_data
6✔
352
        self.moltype = moltype
6✔
353
        name_map = name_map or {name: name for name in seqs_data.names}
6✔
354
        self._name_map = types.MappingProxyType(name_map)
6✔
355
        self._is_reversed = is_reversed
6✔
356
        if not isinstance(info, InfoClass):
6✔
357
            info = InfoClass(info) if info else InfoClass()
6✔
358
        self.info = info
6✔
359
        self.source = source
6✔
360
        self._repr_policy: dict[str, Any] = {
6✔
361
            "num_seqs": 10,
362
            "num_pos": 60,
363
            "ref_name": "longest",
364
            "wrap": 60,
365
        }
366
        self._annotation_db: list[SupportsFeatures] = self._init_annot_db_value(
6✔
367
            annotation_db
368
        )
369
        self._seqs: _IndexableSeqs[TSequenceOrAligned]
6✔
370
        self._post_init()
6✔
371

372
    @abstractmethod
373
    def _post_init(self) -> None: ...
374

375
    @abstractmethod
376
    def _get_init_kwargs(self) -> dict[str, Any]: ...
377

378
    @abstractmethod
379
    def __repr__(self) -> str: ...
380

381
    def __getstate__(self) -> dict[str, Any]:
6✔
382
        return self._get_init_kwargs()
6✔
383

384
    def __setstate__(self, state: dict[str, Any]) -> None:
6✔
385
        state["name_map"] = types.MappingProxyType(state.pop("name_map"))
6✔
386
        obj = self.__class__(**state)
6✔
387

388
        self.__dict__.update(obj.__dict__)
6✔
389

390
    def __str__(self) -> str:
6✔
391
        """Returns self in FASTA-format, respecting name order."""
392
        from cogent3.format.sequence import FORMATTERS
6✔
393

394
        return FORMATTERS["fasta"](self.to_dict())
6✔
395

396
    def __eq__(self, other: object) -> bool:
6✔
397
        if not isinstance(other, self.__class__):
6✔
398
            return False
6✔
399
        self_init = self._get_init_kwargs()
6✔
400
        other_init = other._get_init_kwargs()
6✔
401
        for key, self_val in self_init.items():
6✔
402
            if key in ("annotation_db", "slice_record"):
6✔
403
                continue
6✔
404
            other_val = other_init.get(key)
6✔
405
            if self_val != other_val:
6✔
406
                return False
6✔
407
        return True
6✔
408

409
    def __ne__(self, other: object) -> bool:
6✔
410
        return not self == other
6✔
411

412
    @property
413
    @abstractmethod
414
    def modified(self) -> bool: ...
415

416
    @property
6✔
417
    def storage(self) -> SeqsDataABC:
6✔
418
        """the unaligned sequence storage instance of the collection"""
419
        return self._seqs_data
6✔
420

421
    @storage.setter
6✔
422
    def storage(self, value: object) -> None:
6✔
423
        # storage cannot be set after initialisation
424
        msg = "storage cannot be set after initialisation"
6✔
425
        raise TypeError(msg)
6✔
426

427
    @property
6✔
428
    def name_map(self) -> types.MappingProxyType[str, str]:
6✔
429
        """returns mapping of seq names to parent seq names
430

431
        Notes
432
        -----
433
        The underlying SeqsData may have different names for the same
434
        sequences. This object maps the names of sequences in self to
435
        the names of sequences in SeqsData.
436
        MappingProxyType is an immutable mapping, so it cannot be
437
        changed. Use self.renamed_seqs() to do that.
438
        """
439
        return self._name_map
6✔
440

441
    @name_map.setter
6✔
442
    def name_map(self, value: dict[str, str] | None) -> None:  # noqa: ARG002
6✔
443
        """name_map can only be set at initialisation"""
444
        msg = "name_map cannot be set after initialisation"
6✔
445
        raise TypeError(msg)
6✔
446

447
    @property
6✔
448
    def names(self) -> tuple[str, ...]:
6✔
449
        """returns the names of the sequences in the collection"""
450
        return tuple(self._name_map.keys())
6✔
451

452
    @property
6✔
453
    def num_seqs(self) -> int:
6✔
454
        """the number of sequences in the collection"""
455
        return len(self.names)
6✔
456

457
    @property
6✔
458
    def seqs(self) -> _IndexableSeqs[TSequenceOrAligned]:
6✔
459
        """iterable of sequences in the collection
460

461
        Notes
462
        -----
463
        Can be indexed by a sequence name or integer index.
464
        Cannot be sliced.
465

466
        Returns
467
        -------
468
        Instance of ``MolType`` sequence or ``Aligned`` sequence if
469
        self is an ``Alignment``.
470
        """
471
        return self._seqs
6✔
472

473
    @overload
474
    @abstractmethod
475
    def to_dict(self) -> dict[str, str]: ...
476
    @overload
477
    @abstractmethod
478
    def to_dict(self, as_array: Literal[False]) -> dict[str, str]: ...
479
    @overload
480
    @abstractmethod
481
    def to_dict(self, as_array: Literal[True]) -> dict[str, NumpyIntArrayType]: ...
482

483
    @abstractmethod
484
    def to_dict(
485
        self, as_array: bool = False
486
    ) -> dict[str, str] | dict[str, NumpyIntArrayType]: ...
487

488
    @abstractmethod
489
    def to_rich_dict(self) -> dict[str, Any]: ...
490

491
    @classmethod
492
    @abstractmethod
493
    def from_rich_dict(cls, data: dict[str, Any]) -> Self: ...
494

495
    @abstractmethod
496
    def to_html(
497
        self,
498
        name_order: PySeq[str] | None = None,
499
        wrap: int = 60,
500
        limit: int | None = None,
501
        colors: Mapping[str, str] | None = None,
502
        font_size: int = 12,
503
        font_family: str = "Lucida Console",
504
        **kwargs: str,
505
    ) -> str: ...
506

507
    @abstractmethod
508
    def degap(
509
        self, storage_backend: str | None = None, **kwargs: Any
510
    ) -> SequenceCollection: ...
511

512
    @abstractmethod
513
    def get_translation(
514
        self,
515
        gc: c3_genetic_code.GeneticCode | int = 1,
516
        incomplete_ok: bool = False,
517
        include_stop: bool = False,
518
        trim_stop: bool = True,
519
        **kwargs: Any,
520
    ) -> Self: ...
521

522
    @abstractmethod
523
    def trim_stop_codons(
524
        self,
525
        gc: c3_genetic_code.GeneticCode | int = 1,
526
        strict: bool = False,
527
        **kwargs: Any,
528
    ) -> Self: ...
529

530
    @abstractmethod
531
    def rc(self) -> Self: ...
532

533
    @abstractmethod
534
    def distance_matrix(
535
        self, calc: str = "pdist", **kwargs: bool
536
    ) -> DistanceMatrix: ...
537

538
    @abstractmethod
539
    def make_feature(
540
        self,
541
        *,
542
        feature: FeatureDataType,
543
        **kwargs: bool | None,
544
    ) -> Feature[Any]: ...
545

546
    @abstractmethod
547
    def add_feature(
548
        self,
549
        *,
550
        seqid: str | None = None,
551
        biotype: str,
552
        name: str,
553
        spans: list[tuple[int, int]],
554
        parent_id: str | None = None,
555
        strand: str | int = "+",
556
        **kwargs: bool | None,
557
    ) -> Feature[Any]: ...
558

559
    @abstractmethod
560
    def get_features(
561
        self,
562
        *,
563
        seqid: str | Iterator[str] | None = None,
564
        biotype: str | tuple[str, ...] | list[str] | set[str] | None = None,
565
        name: str | None = None,
566
        allow_partial: bool = False,
567
        **kwargs: Any,
568
    ) -> Iterator[Feature[Any]]: ...
569

570
    @abstractmethod
571
    def is_ragged(self) -> bool: ...
572

573
    @abstractmethod
574
    def counts_per_seq(
575
        self,
576
        motif_length: int = 1,
577
        include_ambiguity: bool = False,
578
        allow_gap: bool = False,
579
        exclude_unobserved: bool = False,
580
        warn: bool = False,
581
    ) -> MotifCountsArray | None: ...
582

583
    @abstractmethod
584
    def count_ambiguous_per_seq(self) -> DictArray: ...
585

586
    @abstractmethod
587
    def strand_symmetry(self, motif_length: int = 1) -> dict[str, TestResult]: ...
588

589
    def duplicated_seqs(self) -> list[list[str]]:
6✔
590
        """returns the names of duplicated sequences"""
591
        seq_hashes: defaultdict[str, list[str]] = defaultdict(list)
6✔
592
        for n, n2 in self.name_map.items():
6✔
593
            h = cast("str | None", self.storage.get_hash(n2))
6✔
594
            if h is None:
6✔
595
                cls = self.__class__.__name__
6✔
596
                msg = (
6✔
597
                    f"{cls} has an inconsistent state, no sequence found for name={n!r}"
598
                )
599
                raise RuntimeError(msg)
6✔
600
            seq_hashes[h].append(n)
6✔
601
        return [v for v in seq_hashes.values() if len(v) > 1]
6✔
602

603
    def to_json(self) -> str:
6✔
604
        """returns json formatted string"""
605
        return json.dumps(self.to_rich_dict())
6✔
606

607
    def to_fasta(self, block_size: int = 60) -> str:
6✔
608
        """Return collection in Fasta format.
609

610
        Parameters
611
        ----------
612
        block_size
613
            the sequence length to write to each line,
614
            by default 60
615

616
        Returns
617
        -------
618
        The collection in Fasta format.
619
        """
620
        fasta = c3_plugin.get_seq_format_writer_plugin(format_name="fasta")
6✔
621
        return fasta.formatted(self, block_size=block_size)
6✔
622

623
    def write(
6✔
624
        self, filename: str, format_name: str | None = None, **kwargs: Any
625
    ) -> None:
626
        """Write the sequences to a file, preserving order of sequences.
627

628
        Parameters
629
        ----------
630
        filename
631
            name of the sequence file
632
        format_name
633
            format of the sequence file
634

635
        Notes
636
        -----
637

638
        If format_name is None, will attempt to infer format from the filename
639
        suffix.
640
        """
641

642
        suffix, _ = get_format_suffixes(filename)
6✔
643
        if format_name is None and suffix:
6✔
644
            format_name = suffix
6✔
645

646
        if format_name == "json":
6✔
647
            with atomic_write(filename, mode="wt") as f:
6✔
648
                f.write(self.to_json())
6✔
649
            return
6✔
650

651
        if "order" not in kwargs:
6✔
652
            kwargs["order"] = self.names
6✔
653

654
        writer = c3_plugin.get_seq_format_writer_plugin(
6✔
655
            format_name=format_name,
656
            file_suffix=suffix,
657
            unaligned_seqs=type(self) is SequenceCollection,
658
        )
659
        _ = writer.write(seqcoll=self, path=filename, **kwargs)
6✔
660

661
    def dotplot(
6✔
662
        self,
663
        name1: str | None = None,
664
        name2: str | None = None,
665
        window: int = 20,
666
        threshold: int | None = None,
667
        k: int | None = None,
668
        min_gap: int = 0,
669
        width: int = 500,
670
        title: str | None = None,
671
        rc: bool = False,
672
        biotype: str | tuple[str] | None = None,
673
        show_progress: bool = False,
674
    ) -> Dotplot | AnnotatedDrawable:
675
        """make a dotplot between two sequences.
676

677
        Parameters
678
        ----------
679
        name1, name2
680
            names of sequences -- if not provided, a random choice is made
681
        window
682
            segment size for comparison between sequences
683
        threshold
684
            windows where the sequences are identical >= threshold are a match
685
        k
686
            size of k-mer to break sequences into. Larger values increase
687
            speed but reduce resolution. If not specified, and
688
            window == threshold, then k is set to window. Otherwise, it is
689
            computed as the maximum of {threshold // (window - threshold), 5}.
690
        min_gap
691
            permitted gap for joining adjacent line segments, default is no gap
692
            joining
693
        width
694
            figure width. Figure height is computed based on the ratio of
695
            len(seq1) / len(seq2)
696
        title
697
            title for the plot
698
        rc
699
            include dotplot of reverse compliment also. Only applies to Nucleic
700
            acids moltypes
701
        biotype
702
            if selected sequences are annotated, display only these biotypes
703

704
        Returns
705
        -------
706
        a Drawable or AnnotatedDrawable
707
        """
708
        from cogent3.draw.dotplot import Dotplot
6✔
709
        from cogent3.draw.drawable import AnnotatedDrawable
6✔
710

711
        randgen = numpy.random.default_rng()
6✔
712
        if k is not None and not (0 < k < window):
6✔
713
            msg = f"{k=} must be positive, < {window=} and < {threshold=}"
6✔
714
            raise ValueError(msg)
6✔
715

716
        if len(self.names) == 1:
6✔
717
            name1 = name2 = self.names[0]
6✔
718
        elif name1 is None and name2 is None:
6✔
719
            name1, name2 = cast(
6✔
720
                "tuple[str, str]",
721
                randgen.choice(self.names, size=2, replace=False).tolist(),
722
            )
723
        elif not (name1 and name2):
6✔
724
            names: list[str] = cast(
6✔
725
                "list[str]", list(set[str | None]({*self.names, None}) ^ {name1, name2})
726
            )
727
            name = cast("str", next(iter(randgen.choice(names, size=1))).item())
6✔
728
            name1 = name1 or name
6✔
729
            name2 = name2 or name
6✔
730

731
        if not {name1, name2} <= set(self.names):
6✔
732
            msg = f"{name1}, {name2} missing"
6✔
733
            raise ValueError(msg)
6✔
734

735
        seq1: c3_sequence.Sequence | Aligned = self.seqs[name1]
6✔
736
        seq2: c3_sequence.Sequence | Aligned = self.seqs[name2]
6✔
737
        if not self._annotation_db:
6✔
738
            annotated = False
6✔
739
        else:
740
            annotated = any(
6✔
741
                self.annotation_db.num_matches(seqid=self._name_map[n], biotype=biotype)
742
                for n in [name1, name2]
743
            )
744
        dotplot = Dotplot(
6✔
745
            seq1,
746
            seq2,
747
            isinstance(self, Alignment),
748
            window=window,
749
            threshold=threshold,
750
            k=k,
751
            min_gap=min_gap,
752
            xtitle=None if annotated else seq1.name,
753
            ytitle=None if annotated else seq2.name,
754
            title=title,
755
            moltype=self.moltype,
756
            rc=rc,
757
            show_progress=show_progress,
758
            width=width,
759
        )
760
        if not annotated:
6✔
761
            return dotplot
6✔
762

763
        if isinstance(seq1, Aligned):
6✔
764
            seq1 = seq1.seq
6✔
765
        bottom = seq1.get_drawable(biotype=biotype)
6✔
766
        if isinstance(seq2, Aligned):
6✔
767
            seq2 = seq2.seq
6✔
768
        left = seq2.get_drawable(biotype=biotype, vertical=True)
6✔
769
        return AnnotatedDrawable(
6✔
770
            dotplot,
771
            left_track=left,
772
            bottom_track=bottom,
773
            xtitle=seq1.name,
774
            ytitle=seq2.name,
775
            title=title,
776
            xrange=[0, len(seq1)],
777
            yrange=[0, len(seq2)],
778
        )
779

780
    def iter_seqs(
6✔
781
        self,
782
        seq_order: list[str | int] | None = None,
783
    ) -> Iterator[TSequenceOrAligned]:
784
        """Iterates over sequences in the collection, in order.
785

786
        Parameters
787
        ----------
788
        seq_order:
789
            list of seqids giving the order in which seqs will be returned.
790
            Defaults to self.names
791
        """
792
        if seq_order is None:
6✔
793
            yield from self.seqs
6✔
794
        else:
795
            for name in seq_order:
6✔
796
                yield self.seqs[name]
6✔
797

798
    def take_seqs(
6✔
799
        self,
800
        names: str | PySeq[str],
801
        negate: bool = False,
802
        copy_annotations: bool = False,
803
        **kwargs: Any,
804
    ) -> Self:
805
        """Returns new collection containing only specified seqs.
806

807
        Parameters
808
        ----------
809
        names
810
            sequences to select (or exclude if negate=True)
811
        negate
812
            select all sequences EXCEPT names
813
        kwargs
814
            keyword arguments to be passed to the constructor of the new collection
815
        copy_annotations
816
            if True, only annotations from selected seqs are copied to the annotation_db
817
            of the new collection
818
        """
819

820
        # to return a new collection with a subset of the sequences we dont
821
        # want to modify the underlying data, instead we create a new collection
822
        # with a subset of the names, recorded in the name_map dict.
823

824
        # refactor: design, reimplement on Alignment. on which, if self.array_seqs
825
        # defined, assign result of self._array_seqs.take(subset_name_indices) to
826
        # resulting alignments _array_seqs attribute
827

828
        if isinstance(names, str):
6✔
829
            names = [names]
6✔
830

831
        if negate:
6✔
832
            names = [name for name in self.names if name not in names]
6✔
833

834
        if not names:
6✔
835
            msg = f"{names=} and {negate=} resulted in no names"
6✔
836
            raise ValueError(msg)
6✔
837

838
        if diff := set(names) - set(self.names):
6✔
839
            msg = f"The following provided names not found in collection: {diff}"
6✔
840
            raise ValueError(msg)
6✔
841

842
        selected_name_map = {name: self._name_map[name] for name in names}
6✔
843

844
        init_kwargs = self._get_init_kwargs()
6✔
845
        init_kwargs["name_map"] = selected_name_map
6✔
846
        if self._annotation_db:
6✔
847
            if copy_annotations:
6✔
848
                ann_db = type(self.annotation_db)()
6✔
849
                ann_db.update(
6✔
850
                    annot_db=cast("SqliteAnnotationDbMixin", self.annotation_db),
851
                    seqids=list(selected_name_map),
852
                )
853
            else:
854
                ann_db = self.annotation_db
6✔
855
            init_kwargs["annotation_db"] = ann_db
6✔
856

857
        return self.__class__(**init_kwargs)
6✔
858

859
    def get_seq_names_if(
6✔
860
        self,
861
        f: Callable[[TSequenceOrAligned], bool],
862
        negate: bool = False,
863
    ) -> list[str]:
864
        """Returns list of names of seqs where f(seq) is True.
865

866
        Parameters
867
        ----------
868
        f
869
            function that takes a sequence object and returns True or False
870
        negate
871
            select all sequences EXCEPT those where f(seq) is True
872

873
        Notes
874
        -----
875
        Sequence objects can be converted into strings or numpy arrays using
876
        str() and numpy.array() respectively.
877
        """
878
        get = self.seqs
6✔
879

880
        new_f = negate_condition(f) if negate else f
6✔
881

882
        return [name for name in self.names if new_f(get[name])]
6✔
883

884
    def take_seqs_if(
6✔
885
        self,
886
        f: Callable[[TSequenceOrAligned], bool],
887
        negate: bool = False,
888
    ) -> Self:
889
        """Returns new collection containing seqs where f(seq) is True.
890

891
        Parameters
892
        ----------
893
        f
894
            function that takes a sequence object and returns True or False
895
        negate
896
            select all sequences EXCEPT those where f(seq) is True
897

898
        Notes
899
        -----
900
        Sequence objects can be converted into strings or numpy arrays using
901
        str() and numpy.array() respectively.
902
        """
903
        return self.take_seqs(self.get_seq_names_if(f, negate))
6✔
904

905
    def get_seq(
6✔
906
        self,
907
        seqname: str,
908
        copy_annotations: bool = False,
909
    ) -> c3_sequence.Sequence:
910
        """Return a Sequence object for the specified seqname.
911

912
        Parameters
913
        ----------
914
        seqname
915
            name of the sequence to return
916
        copy_annotations
917
            if True, only the annotations for the specified sequence are copied
918
            to the annotation database of the Sequence object which is decoupled
919
            from this collection. If False, the connection to this collections db
920
            is retained.
921
        """
922
        seq: c3_sequence.Sequence | Aligned = self.seqs[seqname]
6✔
923
        if isinstance(seq, Aligned):
6✔
924
            seq = seq.seq
6✔
925
        if copy_annotations and self._annotation_db:
6✔
926
            # we need to copy the sequence too to break the link to self.annotation_db
927
            seq = seq.copy(exclude_annotations=True)
×
928
            seq.annotation_db = type(self.annotation_db)()
×
929
            seq.annotation_db.update(
×
930
                annot_db=cast("SqliteAnnotationDbMixin", self.annotation_db),
931
                seqids=seqname,
932
            )
933
            return seq
×
934

935
        seq.replace_annotation_db(self._annotation_db, check=False)
6✔
936

937
        return seq
6✔
938

939
    def add_seqs(
6✔
940
        self,
941
        seqs: dict[str, str | bytes | NumpyIntArrayType],
942
        **kwargs: Any,
943
    ) -> Self:
944
        """Returns new collection with additional sequences.
945

946
        Parameters
947
        ----------
948
        seqs
949
            sequences to add
950
        """
951
        assign_names = _SeqNamer()
6✔
952
        named_seqs = cast(
6✔
953
            "dict[str, str | bytes | NumpyIntArrayType]",
954
            _make_name_seq_mapping(seqs, assign_names),
955
        )
956
        name_map = make_name_map(named_seqs)
6✔
957
        data, offsets, _ = prep_for_seqs_data(
6✔
958
            named_seqs,
959
            self.moltype,
960
            assign_names,
961
        )
962

963
        if not name_map:
6✔
964
            name_map = dict(zip(data, data, strict=False))
6✔
965

966
        kwargs["offset"] = offsets
6✔
967
        seqs_data = self._seqs_data.add_seqs(data, **kwargs)
6✔
968
        return self.__class__(
6✔
969
            seqs_data=seqs_data,
970
            moltype=self.moltype,
971
            name_map={**self._name_map, **name_map},
972
            info=self.info,
973
            source=self.source,
974
            annotation_db=self._annotation_db,
975
        )
976

977
    @c3warn.deprecated_callable(
6✔
978
        version="2026.3",
979
        reason="incorrectly implies modifies instance",
980
        new="renamed_seqs",
981
    )
982
    def rename_seqs(self, renamer: Callable[[str], str]) -> Self:
6✔
983
        """.. deprecated:: 2026.3
984
        Use ``renamed_seqs`` instead.
985
        """
986
        return self.renamed_seqs(renamer)
×
987

988
    def renamed_seqs(self, renamer: Callable[[str], str]) -> Self:
6✔
989
        """Returns new collection with renamed sequences.
990

991
        Parameters
992
        ----------
993
        renamer
994
            callable that takes a name string and returns a string
995

996
        Raises
997
        ------
998
        ValueError if renamer produces duplicate names.
999

1000
        Notes
1001
        -----
1002
        The resulting object stores the mapping of new to old names in
1003
        self.name_map.
1004
        """
1005
        new_name_map: dict[str, str] = {}
6✔
1006
        for name, parent_name in self._name_map.items():
6✔
1007
            new_name = renamer(name)
6✔
1008
            # we retain the parent_name when it differs from the name,
1009
            # this can happen after multiple renames on the same collection
1010
            parent_name = parent_name if name != parent_name else name  # noqa: PLW2901
6✔
1011
            new_name_map[new_name] = parent_name
6✔
1012

1013
        if len(new_name_map) != len(self._name_map):
6✔
1014
            msg = f"non-unique names produced by {renamer=}"
6✔
1015
            raise ValueError(msg)
6✔
1016

1017
        init_args = self._get_init_kwargs()
6✔
1018
        init_args["name_map"] = new_name_map
6✔
1019

1020
        return self.__class__(**init_args)
6✔
1021

1022
    def to_moltype(self, moltype: MolTypes) -> Self:
6✔
1023
        """returns copy of self with changed moltype
1024

1025
        Parameters
1026
        ----------
1027
        moltype
1028
            name of the new moltype, e.g, 'dna', 'rna'.
1029

1030
        Notes
1031
        -----
1032
        Cannot convert from nucleic acids to proteins. Use get_translation() for that.
1033

1034
        """
1035
        mtype = c3_moltype.get_moltype(moltype)
6✔
1036
        if mtype is self.moltype:
6✔
1037
            return self  # nothing to be done
6✔
1038

1039
        alpha = mtype.most_degen_alphabet()
6✔
1040
        try:
6✔
1041
            new_seqs_data = self._seqs_data.to_alphabet(alpha)
6✔
1042
        except c3_moltype.MolTypeError as e:
6✔
1043
            msg = f"Failed to convert moltype from {self.moltype.label} to {moltype}"
×
1044
            raise c3_moltype.MolTypeError(
×
1045
                msg,
1046
            ) from e
1047

1048
        init_kwargs = self._get_init_kwargs()
6✔
1049
        init_kwargs["seqs_data"] = new_seqs_data
6✔
1050
        init_kwargs["moltype"] = mtype
6✔
1051

1052
        return self.__class__(**init_kwargs)
6✔
1053

1054
    def to_dna(self) -> Self:
6✔
1055
        """returns copy of self as a collection of DNA moltype seqs"""
1056
        return self.to_moltype("dna")
6✔
1057

1058
    def to_rna(self) -> Self:
6✔
1059
        """returns copy of self as a collection of RNA moltype seqs"""
1060
        return self.to_moltype("rna")
6✔
1061

1062
    def has_terminal_stop(
6✔
1063
        self, gc: c3_genetic_code.GeneticCode | int = 1, strict: bool = False
1064
    ) -> bool:
1065
        """Returns True if any sequence has a terminal stop codon.
1066

1067
        Parameters
1068
        ----------
1069
        gc
1070
            valid input to cogent3.get_code(), a genetic code object, number
1071
            or name
1072
        strict
1073
            If True, raises an exception if a seq length not divisible by 3
1074
        """
1075
        for seq_name in self.names:
6✔
1076
            seq: c3_sequence.Sequence | Aligned = self.seqs[seq_name]
6✔
1077
            if isinstance(seq, Aligned):
6✔
1078
                seq = seq.seq
6✔
1079
            seq = cast("c3_sequence.NucleicAcidSequenceBase", seq)
6✔
1080
            if seq.has_terminal_stop(gc=gc, strict=strict):
6✔
1081
                return True
6✔
1082
        return False
6✔
1083

1084
    def reverse_complement(self) -> Self:
6✔
1085
        """Returns the reverse complement of all sequences in the collection.
1086
        A synonym for rc.
1087
        """
1088
        return self.rc()
6✔
1089

1090
    def copy_annotations(self, seq_db: SupportsFeatures) -> None:
6✔
1091
        """copy annotations into attached annotation db
1092

1093
        Parameters
1094
        ----------
1095
        seq_db
1096
            compatible annotation db
1097

1098
        Notes
1099
        -----
1100
        Only copies annotations for records with seqid in self.names
1101
        """
1102
        if not isinstance(seq_db, SupportsFeatures):
6✔
1103
            msg = f"type {type(seq_db)} does not match SupportsFeatures interface"
6✔
1104
            raise TypeError(
6✔
1105
                msg,
1106
            )
1107

1108
        num = 0
6✔
1109
        for seqid in self.names:
6✔
1110
            num += seq_db.num_matches(seqid=seqid)
6✔
1111
            if num > 0:
6✔
1112
                break
6✔
1113
        else:
1114
            # no matching ID's, nothing to do
1115
            return
6✔
1116

1117
        if not self._annotation_db:
6✔
1118
            self.replace_annotation_db(type(seq_db)())
6✔
1119

1120
        if self.annotation_db.compatible(
6✔
1121
            cast("SqliteAnnotationDbMixin", seq_db), symmetric=False
1122
        ):
1123
            # our db contains the tables in other, so we update in place
1124
            self.annotation_db.update(
6✔
1125
                annot_db=cast("SqliteAnnotationDbMixin", seq_db), seqids=self.names
1126
            )
1127
        else:
1128
            # we use the union method to define a new one
1129
            # the setter handles propagation of the new instance to bound
1130
            # sequences
1131
            self.replace_annotation_db(
6✔
1132
                cast(
1133
                    "SupportsFeatures",
1134
                    self.annotation_db.union(cast("SqliteAnnotationDbMixin", seq_db)),
1135
                ),
1136
                check=False,
1137
            )
1138

1139
    def get_ambiguous_positions(self) -> dict[str, dict[int, str | bytes]]:
6✔
1140
        """Returns dict of seq:{position:char} for ambiguous chars.
1141

1142
        Used in likelihood calculations.
1143
        """
1144
        result: dict[str, dict[int, str | bytes]] = {}
6✔
1145
        alpha = self.moltype.most_degen_alphabet()
6✔
1146
        for name in self.names:
6✔
1147
            ambig: dict[int, str | bytes] = {}
6✔
1148
            result[name] = ambig
6✔
1149
            array = numpy.array(self.seqs[name])
6✔
1150
            for i in numpy.where(array > cast("int", alpha.gap_index))[0]:
6✔
1151
                ambig[i] = alpha[array[i]]
6✔
1152
        return result
6✔
1153

1154
    def counts(
6✔
1155
        self,
1156
        motif_length: int = 1,
1157
        include_ambiguity: bool = False,
1158
        allow_gap: bool = False,
1159
        exclude_unobserved: bool = False,
1160
    ) -> MotifCountsArray | None:
1161
        """counts of motifs
1162

1163
        Parameters
1164
        ----------
1165
        motif_length
1166
            number of elements per character.
1167
        include_ambiguity
1168
            if True, motifs containing ambiguous characters from the seq moltype
1169
            are included. No expansion of those is attempted.
1170
        allow_gap
1171
            if True, motifs containing a gap character are included.
1172
        exclude_unobserved
1173
            if True, unobserved motif combinations are excluded.
1174

1175
        Notes
1176
        -----
1177
        only non-overlapping motifs are counted
1178
        """
1179
        per_seq = self.counts_per_seq(
6✔
1180
            motif_length=motif_length,
1181
            include_ambiguity=include_ambiguity,
1182
            allow_gap=allow_gap,
1183
            exclude_unobserved=exclude_unobserved,
1184
        )
1185
        if per_seq is None:
6✔
1186
            return None
×
1187
        return per_seq.motif_totals()
6✔
1188

1189
    def get_motif_probs(
6✔
1190
        self,
1191
        alphabet: c3_alphabet.CharAlphabet[Any] | None = None,
1192
        include_ambiguity: bool = False,
1193
        exclude_unobserved: bool = False,
1194
        allow_gap: bool = False,
1195
        pseudocount: int = 0,
1196
    ) -> dict[str, float]:  # refactor: using array
1197
        """Return a dictionary of motif probs, calculated as the averaged
1198
        frequency across sequences.
1199

1200
        Parameters
1201
        ----------
1202
        alphabet
1203
            alphabet to use for motifs
1204
        include_ambiguity
1205
            if True resolved ambiguous codes are included in estimation of
1206
            frequencies.
1207
        exclude_unobserved
1208
            if True, motifs that are not present in the alignment are excluded
1209
            from the returned dictionary.
1210
        allow_gap
1211
            allow gap motif
1212
        pseudocount
1213
            value to add to each count
1214

1215
        Notes
1216
        -----
1217

1218
        only non-overlapping motifs are counted
1219
        """
1220
        moltype = self.moltype
6✔
1221
        if alphabet is None:
6✔
1222
            alphabet = moltype.alphabet
6✔
1223
            if allow_gap:
6✔
1224
                alphabet = cast(
6✔
1225
                    "c3_alphabet.CharAlphabet[Any]", moltype.gapped_alphabet
1226
                )
1227

1228
        motif_len = alphabet.motif_len
6✔
1229
        counts: Counter[str] = Counter()
6✔
1230
        for seq_name in self.names:
6✔
1231
            sequence: TSequenceOrAligned | list[str] = self.seqs[seq_name]
6✔
1232
            if motif_len > 1:
6✔
1233
                sequence = [
6✔
1234
                    str(sequence[i : i + motif_len])
1235
                    for i in range(0, len(sequence) + 1 - motif_len, motif_len)
1236
                ]
1237
            for motif in sequence:
6✔
1238
                if not allow_gap and cast("str", self.moltype.gap) in motif:
6✔
1239
                    continue
6✔
1240

1241
                counts[motif] += 1
6✔
1242

1243
        probs: dict[str, float] = {}
6✔
1244
        if not exclude_unobserved:
6✔
1245
            for motif in alphabet:
6✔
1246
                probs[motif] = pseudocount
6✔
1247

1248
        count: float
1249
        for motif, count in counts.items():
6✔
1250
            motif_set = moltype.resolve_ambiguity(motif, alphabet=alphabet)
6✔
1251
            if len(motif_set) > 1:
6✔
1252
                if include_ambiguity:
6✔
1253
                    count = float(count) / len(motif_set)
6✔
1254
                else:
1255
                    continue
6✔
1256
            for motif in motif_set:
6✔
1257
                probs[motif] = probs.get(motif, pseudocount) + count
6✔
1258

1259
        total = float(sum(probs.values()))
6✔
1260
        for motif in probs:
6✔
1261
            probs[motif] /= total
6✔
1262

1263
        return probs
6✔
1264

1265
    def probs_per_seq(
6✔
1266
        self,
1267
        motif_length: int = 1,
1268
        include_ambiguity: bool = False,
1269
        allow_gap: bool = False,
1270
        exclude_unobserved: bool = False,
1271
        warn: bool = False,
1272
    ) -> MotifFreqsArray | None:
1273
        """return frequency array of motifs per sequence
1274

1275
        Parameters
1276
        ----------
1277
        motif_length
1278
            number of characters per motif
1279
        include_ambiguity
1280
            if True, include motifs containing ambiguous characters
1281
            from the seq moltype are included. No expansion of those is attempted.
1282
        allow_gap
1283
            if True, include motifs containing a gap character
1284
        exclude_unobserved
1285
            if True, exclude motifs not present in the sequences in
1286
            the resulting array
1287
        warn
1288
            warns if motif_length > 1 and collection trimmed to produce motif
1289
            columns.
1290
        """
1291

1292
        counts = self.counts_per_seq(
6✔
1293
            motif_length=motif_length,
1294
            include_ambiguity=include_ambiguity,
1295
            allow_gap=allow_gap,
1296
            exclude_unobserved=exclude_unobserved,
1297
            warn=warn,
1298
        )
1299
        return None if counts is None else counts.to_freq_array()
6✔
1300

1301
    def entropy_per_seq(
6✔
1302
        self,
1303
        motif_length: int = 1,
1304
        include_ambiguity: bool = False,
1305
        allow_gap: bool = False,
1306
        exclude_unobserved: bool = True,
1307
        warn: bool = False,
1308
    ) -> NumpyFloatArrayType | None:
1309
        """Returns the Shannon entropy per sequence.
1310

1311
        Parameters
1312
        ----------
1313
        motif_length: int
1314
            number of characters per tuple.
1315
        include_ambiguity: bool
1316
            if True, motifs containing ambiguous characters
1317
            from the seq moltype are included. No expansion of those is attempted.
1318
        allow_gap: bool
1319
            if True, motifs containing a gap character are included.
1320
        exclude_unobserved: bool
1321
            if True, unobserved motif combinations are excluded.
1322
        warn
1323
            warns if motif_length > 1 and alignment trimmed to produce
1324
            motif columns
1325

1326
        Notes
1327
        -----
1328
        For motif_length > 1, it's advisable to specify exclude_unobserved=True,
1329
        this avoids unnecessary calculations.
1330
        """
1331
        probs = self.probs_per_seq(
6✔
1332
            motif_length=motif_length,
1333
            include_ambiguity=include_ambiguity,
1334
            allow_gap=allow_gap,
1335
            exclude_unobserved=exclude_unobserved,
1336
            warn=warn,
1337
        )
1338

1339
        return None if probs is None else probs.entropy()
6✔
1340

1341
    def get_lengths(
6✔
1342
        self,
1343
        include_ambiguity: bool = False,
1344
        allow_gap: bool = False,
1345
    ) -> DictArray:
1346
        """returns sequence lengths as a dict of {seqid: length}
1347

1348
        Parameters
1349
        ----------
1350
        include_ambiguity
1351
            if True, motifs containing ambiguous characters
1352
            from the seq moltype are included. No expansion of those is attempted.
1353
        allow_gap
1354
            if True, motifs containing a gap character are included.
1355

1356
        """
1357
        counts = self.counts_per_seq(
6✔
1358
            motif_length=1,
1359
            include_ambiguity=include_ambiguity,
1360
            allow_gap=allow_gap,
1361
        )
1362
        return cast("MotifCountsArray", counts).row_sum()
6✔
1363

1364
    def pad_seqs(self, pad_length: int | None = None) -> Self:
6✔
1365
        """Returns copy in which sequences are padded with the gap character to same length.
1366

1367
        Parameters
1368
        ----------
1369
        pad_length
1370
            Length all sequences are to be padded to. Will pad to max sequence
1371
            length if pad_length is None or less than max length.
1372
        """
1373

1374
        max_len = max(
6✔
1375
            self._seqs_data.get_seq_length(n) for n in self._name_map.values()
1376
        )
1377

1378
        if pad_length is None or pad_length < max_len:
6✔
1379
            pad_length = max_len
6✔
1380

1381
        padded_seqs: dict[str, NumpyIntArrayType] = {}
6✔
1382
        for seq in self.seqs:
6✔
1383
            padded_seq = numpy.full(
6✔
1384
                shape=pad_length,
1385
                fill_value=cast(
1386
                    "int",
1387
                    cast(
1388
                        "c3_alphabet.CharAlphabet[Any]", self.moltype.gapped_alphabet
1389
                    ).gap_index,
1390
                ),
1391
                dtype=self.moltype.most_degen_alphabet().dtype,
1392
            )
1393
            padded_seq[: len(seq)] = numpy.array(seq)
6✔
1394
            # the padded_seqs dict will be used to create the seqs_data, so the
1395
            # keys should be the seqids from the original seqs_data, if this differs
1396
            # from the seq name, this will be recorded in the name_map
1397
            padded_seqs[self.name_map[cast("str", seq.name)]] = padded_seq
6✔
1398

1399
        init_kwargs = self._get_init_kwargs()
6✔
1400
        # when we access .seqs, if the collection has been reversed, this will have
1401
        # been applied to the returned Sequence, so we reset it to False for the
1402
        # returned collection
1403
        init_kwargs["seqs_data"] = self._seqs_data.from_seqs(
6✔
1404
            data=padded_seqs,
1405
            alphabet=self._seqs_data.alphabet,
1406
            offset=self._seqs_data.offset,
1407
        )
1408
        init_kwargs["is_reversed"] = False
6✔
1409
        return self.__class__(**init_kwargs)
6✔
1410

1411
    def get_identical_sets(
6✔
1412
        self,
1413
        mask_degen: bool = False,
1414
    ) -> list[set[str]]:  # refactor: array/simplify
1415
        """returns sets of names for sequences that are identical
1416

1417
        Parameters
1418
        ----------
1419
        mask_degen
1420
            if True, degenerate characters are ignored
1421

1422
        """
1423

1424
        if self.is_ragged():
6✔
1425
            msg = "not all seqs same length, cannot get identical sets"
6✔
1426
            raise ValueError(msg)
6✔
1427

1428
        if mask_degen and not self.moltype.degen_alphabet:
6✔
1429
            warnings.warn(
6✔
1430
                "in get_identical_sets, mask_degen has no effect as moltype "
1431
                f"{self.moltype.label!r} has no degenerate characters",
1432
                UserWarning,
1433
                stacklevel=2,
1434
            )
1435
            mask_degen = False
6✔
1436

1437
        def reduced(seq: str, indices: list[int]) -> str:
6✔
1438
            return "".join(seq[i] for i in range(len(seq)) if i not in indices)
6✔
1439

1440
        identical_sets: list[set[str]] = []
6✔
1441
        seen: list[str] = []
6✔
1442

1443
        # if strict, we do a sort and one pass through the list
1444
        seqs = self.to_dict()
6✔
1445
        if not mask_degen:
6✔
1446
            seqs_names = [(s, n) for n, s in seqs.items()]
6✔
1447
            seqs_names.sort()
6✔
1448
            matched = None
6✔
1449
            dupes: defaultdict[str, set[str]] = defaultdict(set)
6✔
1450
            for i in range(len(seqs_names) - 1):
6✔
1451
                if seqs_names[i][0] == seqs_names[i + 1][0]:
6✔
1452
                    matched = seqs_names[i][1] if matched is None else matched
6✔
1453
                    dupes[matched].update([seqs_names[i + 1][1], matched])
6✔
1454
                else:
1455
                    matched = None
6✔
1456
            return list(dupes.values())
6✔
1457

1458
        mask_posns = {
6✔
1459
            name: self.moltype.get_degenerate_positions(seq, include_gap=True)
1460
            for name, seq in seqs.items()
1461
        }
1462

1463
        for i in range(len(self.names) - 1):
6✔
1464
            n1 = self.names[i]
6✔
1465
            if n1 in seen:
6✔
1466
                continue
6✔
1467

1468
            seq1 = seqs[n1]
6✔
1469
            group: set[str] = set()
6✔
1470
            for j in range(i + 1, len(self.names)):
6✔
1471
                n2 = self.names[j]
6✔
1472
                if n2 in seen:
6✔
1473
                    continue
×
1474

1475
                seq2 = seqs[n2]
6✔
1476
                pos = mask_posns[n1] + mask_posns[n2]
6✔
1477

1478
                if pos:
6✔
1479
                    seq1 = reduced(seq1, pos)
6✔
1480
                    seq2 = reduced(seq2, pos)
6✔
1481

1482
                if seq1 == seq2:
6✔
1483
                    seen.append(n2)
6✔
1484
                    group.update([n1, n2])
6✔
1485

1486
            if group:
6✔
1487
                identical_sets.append(group)
6✔
1488

1489
        return identical_sets
6✔
1490

1491
    def get_similar(
6✔
1492
        self,
1493
        target: c3_sequence.Sequence | Aligned,
1494
        min_similarity: float = 0.0,
1495
        max_similarity: float = 1.0,
1496
        metric: Callable[
1497
            [c3_sequence.Sequence | Aligned, c3_sequence.Sequence | Aligned],
1498
            float,
1499
        ] = cast(  # noqa: B008
1500
            "Callable[[Any, Any], float]",
1501
            c3_sequence.frac_same,
1502
        ),
1503
        transform: Callable[
1504
            [c3_sequence.Sequence | Aligned], c3_sequence.Sequence | Aligned
1505
        ]
1506
        | None = None,
1507
    ) -> Self:
1508
        """Returns new SequenceCollection containing sequences similar to target.
1509

1510
        Parameters
1511
        ----------
1512
        target
1513
            sequence object to compare to. Can be in the collection.
1514
        min_similarity
1515
            minimum similarity that will be kept. Default 0.0.
1516
        max_similarity
1517
            maximum similarity that will be kept. Default 1.0.
1518
        metric
1519
            a similarity function to use. Must be f(first_seq, second_seq).
1520
            The default metric is fraction similarity, ranging from 0.0 (0%
1521
            identical) to 1.0 (100% identical). The Sequence class have lots
1522
            of methods that can be passed in as unbound methods to act as the
1523
            metric, e.g. frac_same_gaps.
1524
        transform
1525
            transformation function to use on the sequences before the metric
1526
            is calculated. If None, uses the whole sequences in each case. A
1527
            frequent transformation is a function that returns a specified range
1528
            of a sequence, e.g. eliminating the ends. Note that the transform
1529
            applies to both the real sequence and the target sequence.
1530

1531
        Notes
1532
        -----
1533
        both min_similarity and max_similarity are inclusive.
1534

1535
        Warnings
1536
        --------
1537
        if the transformation changes the type of the sequence (e.g. extracting
1538
        a string from an RnaSequence object), distance metrics that depend on
1539
        instance data of the original class may fail.
1540
        """
1541
        if transform:
6✔
1542
            target = transform(target)
6✔
1543

1544
        def m(x: c3_sequence.Sequence | Aligned) -> float:
6✔
1545
            return metric(target, x)
6✔
1546

1547
        if transform:
6✔
1548

1549
            def f(x: c3_sequence.Sequence | Aligned) -> bool:
6✔
1550
                result = m(transform(x))
6✔
1551
                return min_similarity <= result <= max_similarity
6✔
1552

1553
        else:
1554

1555
            def f(x: c3_sequence.Sequence | Aligned) -> bool:  # type: ignore[misc]
6✔
1556
                result = m(x)
6✔
1557
                return min_similarity <= result <= max_similarity
6✔
1558

1559
        return self.take_seqs_if(f)
6✔
1560

1561
    def drop_duplicated_seqs(self) -> Self:
6✔
1562
        """returns self without duplicated sequences
1563

1564
        Notes
1565
        -----
1566
        Retains the first sequence of each duplicte group.
1567
        """
1568
        dupes = self.duplicated_seqs()
6✔
1569
        if not dupes:
6✔
1570
            return self
6✔
1571

1572
        omit: list[str] = []
6✔
1573
        for group in dupes:
6✔
1574
            omit.extend(group[1:])
6✔
1575
        return self.take_seqs(omit, negate=True)
6✔
1576

1577

1578
class SequenceCollection(CollectionBase[c3_sequence.Sequence]):
6✔
1579
    """A container of unaligned sequences.
1580

1581
    Notes
1582
    -----
1583
    Should be constructed using ``make_unaligned_seqs()``.
1584
    """
1585

1586
    def __init__(
6✔
1587
        self,
1588
        *,
1589
        seqs_data: SeqsDataABC,
1590
        moltype: c3_moltype.MolType[Any],
1591
        info: dict[str, Any] | InfoClass | None = None,
1592
        source: PathType | None = None,
1593
        annotation_db: SupportsFeatures | list[SupportsFeatures] | None = None,
1594
        name_map: Mapping[str, str] | None = None,
1595
        is_reversed: bool = False,
1596
    ) -> None:
1597
        """
1598
        Parameters
1599
        ----------
1600
        seqs_data
1601
            a SeqsDataABC or AlignedSeqsDataABC instance containg the sequence data
1602
        moltype
1603
            the molecular type of the sequences
1604
        info
1605
            additional information about the collection
1606
        source
1607
            the source of the sequence data
1608
        annotation_db
1609
            a database of annotations for the sequences
1610
        name_map
1611
            map between the names specified in the collection and names used in
1612
            the underlying seqs_data. Used for when the names have been changed,
1613
            but we want to query for annotations using the original names.
1614
        is_reversed
1615
            entire collection is reverse complemented
1616
        """
1617
        self._seqs_data: SeqsDataABC
6✔
1618
        super().__init__(
6✔
1619
            seqs_data=seqs_data,
1620
            moltype=moltype,
1621
            info=info,
1622
            source=source,
1623
            annotation_db=annotation_db,
1624
            name_map=name_map,
1625
            is_reversed=is_reversed,
1626
        )
1627

1628
    def _post_init(self) -> None:
6✔
1629
        self._seqs: _IndexableSeqs[c3_sequence.Sequence] = _IndexableSeqs(
6✔
1630
            self, make_seq=self._make_seq
1631
        )
1632

1633
    def _get_init_kwargs(self) -> dict[str, Any]:
6✔
1634
        """dict of all the arguments needed to initialise a new instance"""
1635
        # both SequenceCollection and Alignment implement _get_init_kwargs,
1636
        # ensuring methods in SequenceCollection that are inherited by Alignment
1637
        # capture initialisation arguments unique to the subclass.
1638
        # mutable arguments are copied
1639
        return {
6✔
1640
            "seqs_data": self._seqs_data,
1641
            "moltype": self.moltype,
1642
            "name_map": dict(self._name_map),
1643
            "info": self.info.copy(),
1644
            "annotation_db": self._annotation_db,
1645
            "source": self.source,
1646
            "is_reversed": self._is_reversed,
1647
        }
1648

1649
    def _make_seq(self, name: str) -> c3_sequence.Sequence:
6✔
1650
        # seqview is given the name of the parent (if different from the current name)
1651
        # the sequence is given the current name
1652
        seqid = self._name_map.get(name, name)
6✔
1653
        sv = self._seqs_data.get_view(seqid)
6✔
1654
        if self._is_reversed:
6✔
1655
            sv = sv[::-1]
6✔
1656
        return self.moltype.make_seq(
6✔
1657
            seq=sv,
1658
            name=name,
1659
            annotation_db=self._annotation_db,
1660
        )
1661

1662
    def __repr__(self) -> str:
6✔
1663
        seqs: list[str] = []
6✔
1664
        limit = 10
6✔
1665
        delimiter = ""
6✔
1666

1667
        repr_seq_names = [
6✔
1668
            min(
1669
                self.names,
1670
                key=lambda name: self._seqs_data.get_seq_length(self.name_map[name]),
1671
            )
1672
        ]
1673
        if len(self.names) > 1:
6✔
1674
            # In case of a tie, min and max return first.
1675
            # reversed ensures if all seqs are of same length, different seqs are returned
1676
            repr_seq_names.append(
6✔
1677
                max(
1678
                    reversed(self.names),
1679
                    key=lambda name: self._seqs_data.get_seq_length(
1680
                        self.name_map[name]
1681
                    ),
1682
                ),
1683
            )
1684

1685
        for name in repr_seq_names:
6✔
1686
            elts = list(str(self.seqs[name])[: limit + 1])
6✔
1687
            if len(elts) > limit:
6✔
1688
                elts[-1] = "..."
6✔
1689
            seqs.append(f"{name}[{delimiter.join(elts)}]")
6✔
1690

1691
        if len(self.names) > 2:
6✔
1692
            seqs.insert(1, "...")
2✔
1693

1694
        seqs_str = ", ".join(seqs)
6✔
1695

1696
        return f"{len(self.names)}x {self.moltype.label} seqcollection: ({seqs_str})"
6✔
1697

1698
    def _repr_html_(self) -> str:
6✔
1699
        settings = self._repr_policy.copy()
6✔
1700
        env_vals = get_setting_from_environ(
6✔
1701
            "COGENT3_ALIGNMENT_REPR_POLICY",
1702
            {"num_seqs": int, "num_pos": int, "wrap": int},
1703
        )
1704
        settings.update(env_vals)
6✔
1705
        return self.to_html(
6✔
1706
            name_order=self.names[: settings["num_seqs"]],
1707
            limit=settings["num_pos"],
1708
            wrap=settings["wrap"],
1709
        )
1710

1711
    def set_repr_policy(
6✔
1712
        self,
1713
        num_seqs: int | None = None,
1714
        num_pos: int | None = None,
1715
        ref_name: int | None = None,
1716
        wrap: int | None = None,
1717
    ) -> None:
1718
        """specify policy for repr(self)
1719

1720
        Parameters
1721
        ----------
1722
        num_seqs
1723
            number of sequences to include in represented display.
1724
        num_pos
1725
            length of sequences to include in represented display.
1726
        ref_name
1727
            name of sequence to be placed first, or "longest" (default).
1728
            If latter, indicates longest sequence will be chosen.
1729
        wrap
1730
            number of printed bases per row
1731
        """
1732
        if num_seqs:
6✔
1733
            if not isinstance(num_seqs, int):
6✔
1734
                msg = "num_seqs is not an integer"
6✔
1735
                raise TypeError(msg)
6✔
1736
            self._repr_policy["num_seqs"] = num_seqs
6✔
1737

1738
        if num_pos:
6✔
1739
            if not isinstance(num_pos, int):
6✔
1740
                msg = "num_pos is not an integer"
6✔
1741
                raise TypeError(msg)
6✔
1742
            self._repr_policy["num_pos"] = num_pos
6✔
1743

1744
        if ref_name:
6✔
1745
            if not isinstance(ref_name, str):
6✔
1746
                msg = "ref_name is not a string"
×
1747
                raise TypeError(msg)
×
1748

1749
            if ref_name != "longest" and ref_name not in self.names:
6✔
1750
                msg = f"no sequence name matching {ref_name}"
6✔
1751
                raise ValueError(msg)
6✔
1752

1753
            self._repr_policy["ref_name"] = ref_name
6✔
1754

1755
        if wrap:
6✔
1756
            if not isinstance(wrap, int):
6✔
1757
                msg = "wrap is not an integer"
6✔
1758
                raise TypeError(msg)
6✔
1759
            self._repr_policy["wrap"] = wrap
6✔
1760

1761
    @property
6✔
1762
    def modified(self) -> bool:
6✔
1763
        """collection is a modification of underlying storage"""
1764
        return any(
6✔
1765
            [
1766
                self._is_reversed,
1767
                set(self.name_map.values()) != set(self.storage.names),
1768
                set(self.name_map.keys()) != set(self.name_map.values()),
1769
            ]
1770
        )
1771

1772
    @overload
1773
    def to_dict(self) -> dict[str, str]: ...
1774
    @overload
1775
    def to_dict(self, as_array: Literal[False]) -> dict[str, str]: ...
1776
    @overload
1777
    def to_dict(self, as_array: Literal[True]) -> dict[str, NumpyIntArrayType]: ...
1778

1779
    def to_dict(
6✔
1780
        self, as_array: bool = False
1781
    ) -> dict[str, str] | dict[str, NumpyIntArrayType]:
1782
        """Return a dictionary of sequences.
1783

1784
        Parameters
1785
        ----------
1786
        as_array
1787
            if True, sequences are returned as numpy arrays, otherwise as strings
1788
        """
1789

1790
        if as_array:
6✔
1791
            return {cast("str", s.name): numpy.array(s) for s in self.seqs}
×
1792
        return {cast("str", s.name): str(s) for s in self.seqs}
6✔
1793

1794
    def to_rich_dict(self) -> dict[str, Any]:
6✔
1795
        """returns a json serialisable dict
1796

1797
        Notes
1798
        -----
1799
        Deserialisation the object produced by this method will not include the
1800
        annotation_db if present.
1801
        """
1802
        kwargs = self._get_init_kwargs()
6✔
1803
        kwargs.pop("is_reversed", None)  # reversal is realised
6✔
1804
        kwargs["moltype"] = self.moltype.label
6✔
1805
        kwargs.pop("annotation_db", None)
6✔
1806
        kwargs.pop(
6✔
1807
            "offset",
1808
            None,
1809
        )  # no need for offset if we dont have an annotation_db
1810
        kwargs.pop("seqs_data", None)  # we serialise the seqs_data directly
6✔
1811

1812
        data: dict[str, Any] = {
6✔
1813
            "init_args": kwargs,
1814
            "type": get_object_provenance(self),
1815
            "version": __version__,
1816
        }
1817
        data["seqs"] = {self._name_map[cast("str", s.name)]: str(s) for s in self.seqs}
6✔
1818

1819
        return data
6✔
1820

1821
    @classmethod
6✔
1822
    def from_rich_dict(
6✔
1823
        cls,
1824
        data: dict[str, Any],
1825
    ) -> SequenceCollection:
1826
        """returns a new instance from a rich dict"""
1827
        return make_unaligned_seqs(data["seqs"], **data["init_args"])
6✔
1828

1829
    def degap(self, storage_backend: str | None = None, **kwargs: Any) -> Self:
6✔
1830
        """returns collection sequences without gaps or missing characters.
1831

1832
        Parameters
1833
        ----------
1834
        storage_backend
1835
            name of the storage backend to use for the SeqsData object, defaults to
1836
            cogent3 builtin.
1837
        kwargs
1838
            keyword arguments for the storage driver
1839

1840
        Notes
1841
        -----
1842
        The returned collection will not retain an annotation_db if present.
1843
        """
1844
        if storage_backend:
6✔
1845
            make_storage = c3_plugin.get_unaligned_storage_driver(
6✔
1846
                storage_backend,
1847
            ).from_seqs
1848
        else:
1849
            make_storage = self._seqs_data.from_seqs
6✔
1850
        data: dict[str, NumpyIntArrayType] = {}
6✔
1851
        for name in self.names:
6✔
1852
            # because we are in a SequenceCollection, which cannot be sliced, so
1853
            # we can just interrogate the bound _seqs_data directly.
1854
            seq = self._seqs_data.get_seq_array(seqid=self._name_map.get(name, name))
6✔
1855
            data[self.name_map[name]] = self.moltype.degap(seq)
6✔
1856

1857
        init_kwargs = self._get_init_kwargs()
6✔
1858
        init_kwargs["seqs_data"] = make_storage(
6✔
1859
            data=data,
1860
            alphabet=self._seqs_data.alphabet,
1861
            check=False,
1862
            **kwargs,
1863
        )
1864

1865
        return self.__class__(**init_kwargs)
6✔
1866

1867
    def to_html(
6✔
1868
        self,
1869
        name_order: PySeq[str] | None = None,
1870
        wrap: int = 60,
1871
        limit: int | None = None,
1872
        colors: Mapping[str, str] | None = None,
1873
        font_size: int = 12,
1874
        font_family: str = "Lucida Console",
1875
        **kwargs: str,
1876
    ) -> str:
1877
        """returns html with embedded styles for sequence colouring
1878

1879
        Parameters
1880
        ----------
1881
        name_order
1882
            order of names for display.
1883
        wrap
1884
            number of columns per row
1885
        limit
1886
            truncate view of collection to this length
1887
        colors
1888
            {character
1889
            moltype.
1890
        font_size
1891
            in points. Affects labels and sequence and line spacing
1892
            (proportional to value)
1893
        font_family
1894
            string denoting font family
1895

1896
        Examples
1897
        --------
1898

1899
        In a jupyter notebook, this code is used to provide the representation.
1900

1901
        .. code-block:: python
1902

1903
            seq_col  # is rendered by jupyter
1904

1905
        You can directly use the result for display in a notebook as
1906

1907
        .. code-block:: python
1908

1909
            from IPython.core.display import HTML
1910

1911
            HTML(seq_col.to_html())
1912
        """
1913
        css, styles = self.moltype.get_css_style(
6✔
1914
            colors=colors,
1915
            font_size=font_size,
1916
            font_family=font_family,
1917
        )
1918

1919
        seq_lengths = numpy.array(
6✔
1920
            [self._seqs_data.get_seq_length(n) for n in self._name_map.values()],
1921
        )
1922
        min_val = seq_lengths.min()
6✔
1923
        max_val = seq_lengths.max()
6✔
1924
        med_val = numpy.median(seq_lengths)
6✔
1925

1926
        if name_order:
6✔
1927
            selected = self.take_seqs(name_order)
6✔
1928
        else:
1929
            name_order = self.names
6✔
1930
            selected = self
6✔
1931

1932
        # Stylise each character in each sequence
1933
        gaps = "".join(
6✔
1934
            frozenset(
1935
                [
1936
                    cast("str", selected.moltype.gap),
1937
                    cast("str", selected.moltype.missing),
1938
                ]
1939
            )
1940
        )
1941
        template = '<span class="%s">%%s</span>'
6✔
1942
        styled_seqs: defaultdict[str, list[str]] = defaultdict(list)
6✔
1943
        max_truncated_len = 0
6✔
1944
        for name in name_order:
6✔
1945
            sequence = str(self.seqs[name])[:limit]
6✔
1946
            seq_len = len(sequence)
6✔
1947
            max_truncated_len = max(seq_len, max_truncated_len)
6✔
1948
            start_gap = re.search(f"^[{gaps}]+", sequence)
6✔
1949
            end_gap = re.search(f"[{gaps}]+$", sequence)
6✔
1950
            start = 0 if start_gap is None else start_gap.end()
6✔
1951
            end = seq_len if end_gap is None else end_gap.start()
6✔
1952

1953
            seq: list[str] = []
6✔
1954
            for i, char in enumerate(sequence):
6✔
1955
                if i < start or i >= end:
6✔
1956
                    style = f"terminal_ambig_{self.moltype.label}"
6✔
1957
                else:
1958
                    style = styles[char]
6✔
1959
                s = template % style
6✔
1960
                s = s % char
6✔
1961
                seq.append(s)
6✔
1962

1963
            styled_seqs[name] = seq
6✔
1964

1965
        # Ensure all sublists are of same length
1966
        for name in styled_seqs:
6✔
1967
            if len(styled_seqs[name]) < max_truncated_len:
6✔
1968
                styled_seqs[name].extend(
6✔
1969
                    [""] * (max_truncated_len - len(styled_seqs[name])),
1970
                )
1971

1972
        # Make html table
1973
        seqs = numpy.array([styled_seqs[n] for n in name_order], dtype="O")
6✔
1974
        table = ["<table>"]
6✔
1975
        seq_ = "<td>%s</td>"
6✔
1976
        label_ = '<td class="label">%s</td>'
6✔
1977
        num_row_ = '<tr class="num_row"><td></td><td><b>{:,d}</b></td></tr>'
6✔
1978
        for i in range(0, max_truncated_len, wrap):
6✔
1979
            table.append(num_row_.format(i))
6✔
1980
            seqblock = seqs[:, i : i + wrap].tolist()
6✔
1981
            for n, s in zip(name_order, seqblock, strict=False):
6✔
1982
                s = "".join(s)
6✔
1983
                # Filter out rows that are empty (due to combination of shorter sequences + wrapping)
1984
                if s != "":
6✔
1985
                    row = "".join([label_ % n, seq_ % s])
6✔
1986
                    table.append(f"<tr>{row}</tr>")
6✔
1987
        table.append("</table>")
6✔
1988
        if (limit and limit < len(selected.names)) or (
6✔
1989
            name_order and len(name_order) < len(selected.names)
1990
        ):
1991
            summary = f"{self.num_seqs} x {{min={min_val}, median={med_val}, max={max_val}}} (truncated to {len(name_order) if name_order else selected.num_seqs} x {limit}) {selected.moltype.label} sequence collection"  # type: ignore[arg-type]
×
1992
        else:
1993
            summary = f"{self.num_seqs} x {{min={min_val}, median={med_val}, max={max_val}}} {selected.moltype.label} sequence collection"
6✔
1994

1995
        text = [
6✔
1996
            "<style>",
1997
            ".c3align table {margin: 10px 0;}",
1998
            ".c3align td { border: none !important; text-align: left !important; }",
1999
            ".c3align tr:not(.num_row) td span {margin: 0 2px;}",
2000
            ".c3align tr:nth-child(even) {background: #f7f7f7;}",
2001
            ".c3align .num_row {background-color:rgba(161, 195, 209, 0.5) !important; border-top: solid 1px black; }",
2002
            ".c3align .label { font-size: %dpt ; text-align: right !important; "
2003
            "color: black !important; padding: 0 4px; display: table-cell !important; "
2004
            "font-weight: normal !important; }" % font_size,
2005
            "\n".join([".c3align " + style for style in css]),
2006
            "</style>",
2007
            '<div class="c3align">',
2008
            "\n".join(table),
2009
            f"<p><i>{summary}</i></p>",
2010
            "</div>",
2011
        ]
2012
        return "\n".join(text)
6✔
2013

2014
    def get_translation(
6✔
2015
        self,
2016
        gc: c3_genetic_code.GeneticCode | int = 1,
2017
        incomplete_ok: bool = False,
2018
        include_stop: bool = False,
2019
        trim_stop: bool = True,
2020
        **kwargs: Any,
2021
    ) -> Self:
2022
        """translate sequences from nucleic acid to protein
2023

2024
        Parameters
2025
        ----------
2026
        gc
2027
            genetic code, either the number or name
2028
            (use cogent3.core.genetic_code.available_codes)
2029
        incomplete_ok
2030
            codons that are mixes of nucleotide and gaps converted to '?'.
2031
            raises a ValueError if False
2032
        include_stop
2033
            whether to allow a stops in the translated sequence
2034
        trim_stop
2035
            exclude terminal stop codons if they exist
2036
        kwargs
2037
            related to construction of the resulting object
2038

2039
        Returns
2040
        -------
2041
        A new instance of self translated into protein
2042

2043
        Notes
2044
        -----
2045
        Translating will break the relationship to an annotation_db if present.
2046
        """
2047
        if not self.moltype.is_nucleic:
6✔
2048
            msg = f"moltype must be a DNA/RNA, not {self.moltype.name!r}"
6✔
2049
            raise c3_moltype.MolTypeError(
6✔
2050
                msg,
2051
            )
2052

2053
        translated: dict[str, NumpyIntArrayType] = {}
6✔
2054
        for seq in self.seqs:
6✔
2055
            seq = cast("c3_sequence.NucleicAcidSequenceBase", seq)
6✔
2056
            pep = seq.get_translation(
6✔
2057
                gc,
2058
                incomplete_ok=incomplete_ok,
2059
                include_stop=include_stop,
2060
                trim_stop=trim_stop,
2061
            )
2062
            translated[self.name_map[cast("str", seq.name)]] = numpy.array(pep)
6✔
2063

2064
        pep_moltype = c3_moltype.get_moltype(
6✔
2065
            "protein_with_stop" if include_stop else "protein",
2066
        )
2067
        seqs_data = self._seqs_data.from_seqs(
6✔
2068
            data=translated,
2069
            alphabet=pep_moltype.most_degen_alphabet(),
2070
        )
2071
        return self.__class__(
6✔
2072
            seqs_data=seqs_data,
2073
            moltype=pep_moltype,
2074
            name_map=self._name_map,
2075
            info=self.info,
2076
            source=self.source,
2077
            **kwargs,
2078
        )
2079

2080
    def trim_stop_codons(
6✔
2081
        self,
2082
        gc: c3_genetic_code.GeneticCode | int = 1,
2083
        strict: bool = False,
2084
        **kwargs: Any,
2085
    ) -> Self:
2086
        """Removes any terminal stop codons from the sequences
2087

2088
        Parameters
2089
        ----------
2090
        gc
2091
            valid input to cogent3.get_code(), a genetic code object, number
2092
            or name, defaults to standard code
2093
        strict
2094
            If True, raises an exception if a seq length not divisible by 3
2095
        """
2096
        if not self.has_terminal_stop(gc=gc, strict=strict):
6✔
2097
            return self
6✔
2098

2099
        new_seqs = {
6✔
2100
            self.name_map[cast("str", s.name)]: s.trim_stop_codon(
2101
                gc=gc, strict=strict
2102
            ).to_array(
2103
                apply_transforms=False,
2104
            )
2105
            for s in cast("Iterable[c3_sequence.NucleicAcidSequenceBase]", self.seqs)
2106
        }
2107

2108
        init_kwargs = self._get_init_kwargs()
6✔
2109
        init_kwargs["seqs_data"] = self._seqs_data.from_seqs(
6✔
2110
            data=new_seqs,
2111
            alphabet=self._seqs_data.alphabet,
2112
            offset=self._seqs_data.offset,
2113
            check=False,
2114
        )
2115
        init_kwargs["annotation_db"] = self._annotation_db
6✔
2116
        return self.__class__(**init_kwargs)
6✔
2117

2118
    def rc(self) -> Self:
6✔
2119
        """Returns the reverse complement of all sequences in the collection.
2120
        A synonym for reverse_complement.
2121
        """
2122
        init_kwargs = self._get_init_kwargs()
6✔
2123
        init_kwargs["is_reversed"] = not self._is_reversed
6✔
2124
        return self.__class__(**init_kwargs)
6✔
2125

2126
    @UI.display_wrap
6✔
2127
    def apply_pssm(
6✔
2128
        self,
2129
        pssm: PSSM | None = None,
2130
        path: str | None = None,
2131
        background: NumpyFloatArrayType | None = None,
2132
        pseudocount: int = 0,
2133
        names: list[str] | str | None = None,
2134
        ui: UI.ProgressContext | None = None,
2135
    ) -> NumpyFloatArrayType:  # refactor: design: move to rich for progress bars?
2136
        """scores sequences using the specified pssm
2137

2138
        Parameters
2139
        ----------
2140
        pssm :
2141
            A profile.PSSM instance, if not provided, will be loaded from path
2142
        path
2143
            path to either a jaspar or cisbp matrix (path must end have a suffix
2144
            matching the format).
2145
        background
2146
            background frequencies distribution
2147
        pseudocount
2148
            adjustment for zero in matrix
2149
        names
2150
            returns only scores for these sequences and in the name order
2151

2152
        Returns
2153
        -------
2154
        numpy array of log2 based scores at every position
2155
        """
2156
        assert not self.is_ragged(), "all sequences must have same length"
6✔
2157
        assert pssm or path, "Must specify a PSSM or a path"
6✔
2158
        assert not (pssm and path), "Can only specify one of pssm, path"
6✔
2159

2160
        if isinstance(names, str):
6✔
2161
            names = [names]
6✔
2162

2163
        if path:
6✔
2164
            pssm = load_pssm(path, background=background, pseudocount=pseudocount)
6✔
2165

2166
        pssm = cast("PSSM", pssm)
6✔
2167
        assert set(pssm.motifs) == set(self.moltype)
6✔
2168

2169
        seqs = [self.seqs[n] for n in names] if names else self.seqs
6✔
2170
        result = [
6✔
2171
            pssm.score_seq(seq) for seq in cast("UI.ProgressContext", ui).series(seqs)
2172
        ]
2173

2174
        return numpy.array(result)
6✔
2175

2176
    def distance_matrix(self, calc: str = "pdist", **kwargs: bool) -> DistanceMatrix:
6✔
2177
        """Estimated pairwise distance between sequences
2178

2179
        Parameters
2180
        ----------
2181
        calc
2182
            The distance calculation method to use, either "pdist" or "jc69".
2183
            - "pdist" is an approximation of the proportion sites different.
2184
            - "jc69" is an approximation of the Jukes Cantor distance.
2185

2186
        Returns
2187
        -------
2188
        DistanceMatrix
2189
            Estimated pairwise distances between sequences in the collection
2190

2191
        Notes
2192
        -----
2193
        pdist approximates the proportion sites different from the Jaccard
2194
        distance. Coefficients for the approximation were derived from a
2195
        polynomial fit between Jaccard distance of kmers with k=10 and the
2196
        proportion of sites different using mammalian 106 protein coding
2197
        gene DNA sequence alignments.
2198

2199
        jc69 approximates the Jukes Cantor distance using the approximated
2200
        proportion sites different, i.e., a transformation of the above.
2201
        """
2202
        from cogent3.app.dist import get_approx_dist_calc
6✔
2203

2204
        # check moltype
2205
        if len(self.moltype.alphabet) != 4:
6✔
2206
            msg = "only defined for DNA/RNA molecular types"
6✔
2207
            raise NotImplementedError(msg)
6✔
2208

2209
        # assert we have more than one sequence in the SequenceCollection
2210
        if self.num_seqs == 1:
6✔
2211
            msg = (
6✔
2212
                "Pairwise distance cannot be computed for a single sequence. "
2213
                "Please provide at least two sequences."
2214
            )
2215
            raise ValueError(
6✔
2216
                msg,
2217
            )
2218

2219
        dist_calc_app = get_approx_dist_calc(
6✔
2220
            dist=calc,
2221
            num_states=len(self.moltype.alphabet),
2222
        )
2223

2224
        return dist_calc_app(self)
6✔
2225

2226
    def make_feature(
6✔
2227
        self,
2228
        *,
2229
        feature: FeatureDataType,
2230
        **kwargs: bool | None,
2231
    ) -> Feature[c3_sequence.Sequence]:
2232
        """
2233
        create a feature on named sequence, or on the collection itself
2234

2235
        Parameters
2236
        ----------
2237
        feature
2238
            a dict with all the necessary data to construct a feature
2239

2240
        Returns
2241
        -------
2242
        Feature
2243

2244
        Notes
2245
        -----
2246
        To get a feature AND add it to annotation_db, use add_feature().
2247
        """
2248
        return self.seqs[feature["seqid"]].make_feature(feature)
6✔
2249

2250
    def add_feature(
6✔
2251
        self,
2252
        *,
2253
        seqid: str | None = None,
2254
        biotype: str,
2255
        name: str,
2256
        spans: list[tuple[int, int]],
2257
        parent_id: str | None = None,
2258
        strand: str | int = "+",
2259
        **kwargs: bool | None,
2260
    ) -> Feature[c3_sequence.Sequence]:
2261
        """
2262
        add feature on named sequence
2263

2264
        Parameters
2265
        ----------
2266
        seqid
2267
            seq name to associate with
2268
        parent_id
2269
            name of the parent feature
2270
        biotype
2271
            biological type
2272
        name
2273
            feature name
2274
        spans
2275
            plus strand coordinates
2276
        strand
2277
            either '+' or '-'
2278

2279
        Returns
2280
        -------
2281
        Feature
2282
        """
2283
        if seqid and seqid not in self.names:
6✔
2284
            msg = f"unknown {seqid=}"
6✔
2285
            raise ValueError(msg)
6✔
2286
        del kwargs
6✔
2287

2288
        feature = {k: v for k, v in locals().items() if k != "self"}
6✔
2289
        feature["strand"] = Strand.from_value(strand).value
6✔
2290
        # property ensures db is created
2291
        self.annotation_db.add_feature(**feature)
6✔
2292
        feature.pop("parent_id", None)
6✔
2293
        return self.make_feature(feature=cast("FeatureDataType", feature))
6✔
2294

2295
    def get_features(
6✔
2296
        self,
2297
        *,
2298
        seqid: str | Iterator[str] | None = None,
2299
        biotype: str | tuple[str, ...] | list[str] | set[str] | None = None,
2300
        name: str | None = None,
2301
        allow_partial: bool = False,
2302
        start: int | None = None,
2303
        stop: int | None = None,
2304
        **kwargs: Any,
2305
    ) -> Iterator[Feature[c3_sequence.Sequence]]:
2306
        """yields Feature instances
2307

2308
        Parameters
2309
        ----------
2310
        seqid
2311
            limit search to features on this named sequence, defaults to search all
2312
        biotype
2313
            biotype of the feature, e.g. CDS, gene
2314
        name
2315
            name of the feature
2316
        start
2317
            start position of the feature (not inclusive)
2318
        stop
2319
            stop position of the feature (inclusive)
2320
        allow_partial
2321
            allow features partially overlaping self
2322
        kwargs
2323
            additional keyword arguments to query the annotation db
2324

2325
        Notes
2326
        -----
2327
        - When dealing with a nucleic acid moltype, the returned features will
2328
        yield a sequence segment that is consistently oriented irrespective
2329
        of strand of the current instance.
2330
        - start is non-inclusive, so if allow_partial is False, only features
2331
        strictly starting after start will be returned.
2332
        """
2333

2334
        if not self._annotation_db:
6✔
2335
            return None
6✔
2336

2337
        if seqid and (seqid not in self.names):
6✔
2338
            msg = f"unknown {seqid=}"
6✔
2339
            raise ValueError(msg)
6✔
2340

2341
        if seqid is None:
6✔
2342
            # if no seqids provided, we do direct search to find the seqids
2343
            # that match the other parameters. This matters for collections
2344
            # with large numbers of sequences due to the overhead of creating
2345
            # Sequence instances
2346
            matched = {
6✔
2347
                record["seqid"]
2348
                for record in self.annotation_db.get_features_matching(
2349
                    biotype=biotype,
2350
                    name=name,
2351
                    start=start,
2352
                    stop=stop,
2353
                    allow_partial=allow_partial,
2354
                    **kwargs,
2355
                )
2356
            }
2357
            seqids = sorted(matched)
6✔
2358
        else:
2359
            seqids = cast(
6✔
2360
                "list[str]", [seqid] if isinstance(seqid, str) else self.names
2361
            )
2362

2363
        for seqid in seqids:
6✔
2364
            seq = self.seqs[seqid]
6✔
2365
            yield from seq.get_features(
6✔
2366
                biotype=biotype,
2367
                name=name,
2368
                start=start,
2369
                stop=stop,
2370
                allow_partial=allow_partial,
2371
                **kwargs,
2372
            )
2373

2374
    def is_ragged(self) -> bool:
6✔
2375
        """rerturns True if sequences are of different lengths"""
2376
        return (
6✔
2377
            len({self._seqs_data.get_seq_length(n) for n in self._name_map.values()})
2378
            > 1
2379
        )
2380

2381
    def count_kmers(
6✔
2382
        self,
2383
        k: int = 1,
2384
        use_hook: str | None = None,
2385
        **kwargs,  # noqa: ANN003
2386
    ) -> NumpyIntArrayType:
2387
        """return kmer counts for each sequence
2388

2389
        Parameters
2390
        ----------
2391
        k
2392
            length of kmers to count
2393
        use_hook
2394
            name of a third-party package that implements the quick_tree
2395
            hook. If not specified, defaults to the first available hook or
2396
            the cogent3 quick_tree() app. To force default, set
2397
            use_hook="cogent3".
2398
        **kwargs
2399
            additional arguments to pass to the hook app constructor
2400

2401
        Notes
2402
        -----
2403
        Only states in moltype.alphabet are allowed in a kmer (the canonical
2404
        states). If using cogent3, to get the order of kmers as strings, use
2405
        self.moltype.alphabet.get_kmer_alphabet(k). See the documentation
2406
        for details of any third-party hooks.
2407

2408
        Returns
2409
        -------
2410
        numpy array of shape (num_seqs, num_kmers)
2411
        0th axis is in the order of self.names, 1st axis is in the order
2412
        of the kmer alphabet.
2413
        """
2414
        from cogent3._plugin import get_count_kmers_hook
6✔
2415

2416
        kwargs = {"k": k, **kwargs}
6✔
2417

2418
        app = get_count_kmers_hook(name=use_hook, **kwargs)
6✔
2419
        if app is not None:
6✔
2420
            # we call main method directly so errors are raised here
2421
            return app.main(list(self.seqs))  # type: ignore[attr-defined]
×
2422

2423
        shape = (self.num_seqs, len(self.moltype.alphabet) ** k)
6✔
2424
        result = numpy.empty(shape, dtype=int)
6✔
2425
        for i, name in enumerate(self.names):
6✔
2426
            # following may be made more efficient by directly reading from storage
2427
            # but this is simpler and handles strand transformations etc
2428
            seq = numpy.array(self.seqs[name])
6✔
2429
            result[i, :] = c3_sequence.count_kmers(seq, len(self.moltype.alphabet), k)
6✔
2430
        return result
6✔
2431

2432
    def counts_per_seq(
6✔
2433
        self,
2434
        motif_length: int = 1,
2435
        include_ambiguity: bool = False,
2436
        allow_gap: bool = False,
2437
        exclude_unobserved: bool = False,
2438
        warn: bool = False,
2439
    ) -> MotifCountsArray | None:  # refactor: using array
2440
        """counts of motifs per sequence
2441

2442
        Parameters
2443
        ----------
2444
        motif_length
2445
            number of characters per tuple.
2446
        include_ambiguity
2447
            if True, motifs containing ambiguous characters from the seq moltype
2448
            are included. No expansion of those is attempted.
2449
        allow_gap
2450
            if True, motifs containing a gap character are included.
2451
        warn
2452
            warns if motif_length > 1 and collection trimmed to produce motif
2453
            columns.
2454

2455
        Notes
2456
        -----
2457

2458
        only non-overlapping motifs are counted
2459
        """
2460
        cat_counts: list[CategoryCounter[str | bytes]] = []
6✔
2461
        motifs_set: set[str | bytes] = set()
6✔
2462
        for name in self.names:
6✔
2463
            seq = self.get_seq(name)
6✔
2464
            c = seq.counts(
6✔
2465
                motif_length=motif_length,
2466
                include_ambiguity=include_ambiguity,
2467
                allow_gap=allow_gap,
2468
                warn=warn,
2469
            )
2470
            motifs_set.update(c.keys())
6✔
2471
            cat_counts.append(c)
6✔
2472
        # use motifs from moltype if empty sequences
2473
        motifs = sorted(motifs_set) or sorted(self.moltype)
6✔
2474

2475
        if not motifs:
6✔
2476
            return None
×
2477

2478
        counts = [c.tolist(motifs) for c in cat_counts]
6✔
2479
        return MotifCountsArray(counts, motifs, row_indices=self.names)
6✔
2480

2481
    def count_ambiguous_per_seq(self) -> DictArray:
6✔
2482
        """Counts of ambiguous characters per sequence."""
2483

2484
        darr = DictArrayTemplate(self.names)
6✔
2485
        counts = numpy.array(
6✔
2486
            [self.seqs[name].count_ambiguous() for name in self.names],
2487
            dtype=numpy.uint32,
2488
        )
2489

2490
        return darr.wrap(counts)
6✔
2491

2492
    def strand_symmetry(self, motif_length: int = 1) -> dict[str, TestResult]:
6✔
2493
        """returns dict of strand symmetry test results per seq"""
2494
        return {
6✔
2495
            cast("str", s.name): s.strand_symmetry(motif_length=motif_length)
2496
            for s in cast("Iterable[c3_sequence.NucleicAcidSequenceBase]", self.seqs)
2497
        }
2498

2499

2500
class Alignment(CollectionBase[Aligned]):
6✔
2501
    """A collection of aligned sequences.
2502

2503
    Notes
2504
    -----
2505
    Should be constructed using ``make_aligned_seqs()``.
2506
    """
2507

2508
    def __init__(
6✔
2509
        self,
2510
        seqs_data: AlignedSeqsDataABC,
2511
        slice_record: SliceRecord | None = None,
2512
        **kwargs: Any,
2513
    ) -> None:
2514
        self._seqs_data: AlignedSeqsDataABC
6✔
2515
        super().__init__(seqs_data=seqs_data, **kwargs)
6✔
2516
        self._slice_record = (
6✔
2517
            slice_record
2518
            if slice_record is not None
2519
            else SliceRecord(parent_len=self._seqs_data.align_len)
2520
        )
2521
        self._array_seqs: NumpyIntArrayType | None = None
6✔
2522

2523
    def _post_init(self) -> None:
6✔
2524
        self._seqs: _IndexableSeqs[Aligned] = _IndexableSeqs(
6✔
2525
            self, make_seq=self._make_aligned
2526
        )
2527

2528
    def _get_init_kwargs(self) -> dict[str, Any]:
6✔
2529
        """returns the kwargs needed to re-instantiate the object"""
2530
        return {
6✔
2531
            "seqs_data": self._seqs_data,
2532
            "moltype": self.moltype,
2533
            "name_map": dict(self._name_map),
2534
            "info": self.info.copy(),
2535
            "annotation_db": self._annotation_db,
2536
            "slice_record": self._slice_record,
2537
            "source": self.source,
2538
        }
2539

2540
    def _make_aligned(self, seqid: str) -> Aligned:
6✔
2541
        adv = self._seqs_data.get_view(
6✔
2542
            self._name_map.get(seqid, seqid),
2543
            slice_record=self._slice_record,
2544
        )
2545
        aligned = Aligned(data=adv, moltype=self.moltype, name=seqid)
6✔
2546
        aligned.replace_annotation_db(self._annotation_db, check=False)
6✔
2547
        return aligned
6✔
2548

2549
    def _mapped(self, slicemap: FeatureMap) -> Self:
6✔
2550
        seqs: dict[str, NumpyIntArrayType] = {}
6✔
2551
        maps: dict[str, NumpyIntArrayType] = {}
6✔
2552
        for aligned in self.seqs:
6✔
2553
            seq, im = aligned.slice_with_map(slicemap)
6✔
2554
            name = self.name_map[aligned.name]
6✔
2555
            seqs[name] = seq
6✔
2556
            maps[name] = im
6✔
2557

2558
        data = self._seqs_data.from_seqs_and_gaps(
6✔
2559
            seqs=seqs,
2560
            gaps=maps,
2561
            alphabet=self.moltype.most_degen_alphabet(),
2562
        )
2563
        kwargs = self._get_init_kwargs()
6✔
2564
        kwargs["seqs_data"] = data
6✔
2565
        kwargs.pop("slice_record", None)
6✔
2566
        kwargs.pop("annotation_db", None)
6✔
2567
        return self.__class__(**kwargs)
6✔
2568

2569
    def __array__(
6✔
2570
        self,
2571
        dtype: numpy.dtype[numpy.integer] | None = None,
2572
        copy: bool | None = None,
2573
    ) -> NumpyIntArrayType:
2574
        return self.array_seqs
6✔
2575

2576
    def __eq__(self, other: object) -> bool:
6✔
2577
        return (
6✔
2578
            super().__eq__(other)
2579
            and self._slice_record == cast("Alignment", other)._slice_record
2580
        )
2581

2582
    def __len__(self) -> int:
6✔
2583
        return len(self._slice_record)
6✔
2584

2585
    @overload
2586
    def __getitem__(
2587
        self, index: int | slice | FeatureMap | Feature[Alignment]
2588
    ) -> Alignment: ...
2589
    @overload
2590
    def __getitem__(self, index: str) -> Aligned: ...
2591

2592
    def __getitem__(
6✔
2593
        self, index: str | int | slice | FeatureMap | Feature[Alignment]
2594
    ) -> Aligned | Alignment:
2595
        if isinstance(index, str):
6✔
2596
            return self.seqs[index]
6✔
2597
        if isinstance(index, int):
6✔
2598
            new_slice = self._slice_record[index]
6✔
2599
            kwargs = self._get_init_kwargs()
6✔
2600
            kwargs["slice_record"] = new_slice
6✔
2601
            return self.__class__(**kwargs)
6✔
2602

2603
        if isinstance(index, slice):
6✔
2604
            new_slice = self._slice_record[index]
6✔
2605
            kwargs = self._get_init_kwargs()
6✔
2606
            kwargs["slice_record"] = new_slice
6✔
2607
            if new_slice.plus_step > 1:
6✔
2608
                # we retain the annotation database only for "simple" slices
2609
                kwargs.pop("annotation_db", None)
6✔
2610

2611
            return self.__class__(**kwargs)
6✔
2612

2613
        if isinstance(index, FeatureMap):
6✔
2614
            return self._mapped(index)
6✔
2615

2616
        if isinstance(index, Feature):
6✔
2617
            if index.parent is not self:
6✔
2618
                msg = "This feature applied to the wrong sequence / alignment"
×
2619
                raise ValueError(msg)
×
2620
            return index.get_slice()
6✔
2621

2622
        msg = f"__getitem__ not implemented for {type(index)}"
6✔
2623
        raise NotImplementedError(msg)
6✔
2624

2625
    def __repr__(self) -> str:
6✔
2626
        seqs: list[str] = []
6✔
2627
        limit = 10
6✔
2628
        delimiter = ""
6✔
2629
        for count, name in enumerate(self.names):
6✔
2630
            if count == 3:
6✔
2631
                seqs.append("...")
6✔
2632
                break
6✔
2633
            elts = list(str(self.seqs[name])[: limit + 1])
6✔
2634
            if len(elts) > limit:
6✔
2635
                elts[-1] = "..."
6✔
2636
            seqs.append(f"{name}[{delimiter.join(elts)}]")
6✔
2637
        seqs_str = ", ".join(seqs)
6✔
2638

2639
        return f"{len(self.names)} x {len(self)} {self.moltype.label} alignment: {seqs_str}"
6✔
2640

2641
    def _repr_html_(self) -> str:
6✔
2642
        settings = self._repr_policy.copy()
×
2643
        env_vals = get_setting_from_environ(
×
2644
            "COGENT3_ALIGNMENT_REPR_POLICY",
2645
            {"num_seqs": int, "num_pos": int, "wrap": int, "ref_name": str},
2646
        )
2647
        settings.update(env_vals)
×
2648
        return self.to_html(
×
2649
            name_order=self.names[: settings["num_seqs"]],
2650
            ref_name=settings["ref_name"],
2651
            limit=settings["num_pos"],
2652
            wrap=settings["wrap"],
2653
        )
2654

2655
    @property
6✔
2656
    def storage(self) -> AlignedSeqsDataABC:
6✔
2657
        """the aligned sequence storage instance of the collection"""
2658
        return self._seqs_data
6✔
2659

2660
    @storage.setter
6✔
2661
    def storage(self, value: object) -> None:
6✔
2662
        # storage cannot be set after initialisation
2663
        msg = "storage cannot be set after initialisation"
6✔
2664
        raise TypeError(msg)
6✔
2665

2666
    @property
6✔
2667
    def modified(self) -> bool:
6✔
2668
        """collection is a modification of underlying storage"""
2669
        # include changed seq names?
2670
        sr = self._slice_record
6✔
2671
        changed_slice = sr.start != 0 or len(sr) != self.storage.align_len
6✔
2672
        return any(
6✔
2673
            [
2674
                changed_slice,
2675
                set(self.name_map.values()) != set(self.storage.names),
2676
                set(self.name_map.keys()) != set(self.name_map.values()),
2677
            ]
2678
        )
2679

2680
    @property
6✔
2681
    def array_seqs(self) -> NumpyIntArrayType:
6✔
2682
        """Returns a numpy array of sequences, axis 0 is seqs in order
2683
        corresponding to names"""
2684
        if self._array_seqs is None:
6✔
2685
            names = [self._name_map[n] for n in self.names]
6✔
2686
            # create the dest array dim
2687
            arr_seqs = self._seqs_data.get_pos_range(
6✔
2688
                names=names,
2689
                start=self._slice_record.plus_start,
2690
                stop=self._slice_record.plus_stop,
2691
                step=self._slice_record.plus_step,
2692
            )
2693
            if self.moltype.is_nucleic and self._slice_record.is_reversed:
6✔
2694
                rev_complement = self.moltype.rc
6✔
2695
                arr_seqs = arr_seqs.copy()
6✔
2696
                arr_seqs.flags.writeable = True
6✔
2697
                for i in range(arr_seqs.shape[0]):
6✔
2698
                    arr_seqs[i] = rev_complement(arr_seqs[i])
6✔
2699

2700
            arr_seqs.flags.writeable = False  # make sure data is immutable
6✔
2701
            self._array_seqs = arr_seqs
6✔
2702

2703
        return self._array_seqs
6✔
2704

2705
    @property
6✔
2706
    def positions(self) -> list[list[str]]:
6✔
2707
        # refactor: design
2708
        # possibly rename to str_positions since we have array_positions
2709
        from_indices = self.moltype.most_degen_alphabet().from_indices
6✔
2710
        return [list(from_indices(pos)) for pos in self.array_positions]
6✔
2711

2712
    @property
6✔
2713
    def array_positions(self) -> NumpyIntArrayType:
6✔
2714
        """Returns a numpy array of positions, axis 0 is alignment positions
2715
        columns in order corresponding to names."""
2716
        return self.array_seqs.T
6✔
2717

2718
    @c3warn.deprecated_callable(
6✔
2719
        version="2026.3",
2720
        reason="incorrectly implies modifies instance",
2721
        new="renamed_seqs",
2722
    )
2723
    def rename_seqs(self, renamer: Callable[[str], str]) -> Self:
6✔
2724
        """.. deprecated:: 2026.3
2725
        Use ``renamed_seqs`` instead.
2726
        """
2727
        return self.renamed_seqs(renamer)
×
2728

2729
    def renamed_seqs(self, renamer: Callable[[str], str]) -> Self:
6✔
2730
        """Returns new alignment with renamed sequences."""
2731
        new = super().renamed_seqs(renamer)
6✔
2732

2733
        if self._array_seqs is not None:
6✔
2734
            new._array_seqs = self._array_seqs  # noqa: SLF001
6✔
2735

2736
        return new
6✔
2737

2738
    def to_phylip(self) -> str:
6✔
2739
        """
2740
        Return collection in PHYLIP format and mapping to sequence ids
2741

2742
        Notes
2743
        -----
2744
        raises exception if sequences do not all have the same length
2745
        """
2746
        phylip = c3_plugin.get_seq_format_writer_plugin(format_name="phylip")
6✔
2747
        return phylip.formatted(self)
6✔
2748

2749
    @overload
2750
    def to_dict(self) -> dict[str, str]: ...
2751
    @overload
2752
    def to_dict(self, as_array: Literal[False]) -> dict[str, str]: ...
2753
    @overload
2754
    def to_dict(self, as_array: Literal[True]) -> dict[str, NumpyIntArrayType]: ...
2755

2756
    def to_dict(
6✔
2757
        self, as_array: bool = False
2758
    ) -> dict[str, str] | dict[str, NumpyIntArrayType]:
2759
        """Return a dictionary of sequences.
2760

2761
        Parameters
2762
        ----------
2763
        as_array
2764
            if True, sequences are returned as numpy arrays, otherwise as strings
2765
        """
2766
        arrayseqs = self.array_seqs
6✔
2767
        if as_array:
6✔
2768
            return {n: arrayseqs[i] for i, n in enumerate(self.names)}
6✔
2769

2770
        return {
6✔
2771
            n: self.storage.alphabet.from_indices(arrayseqs[i])
2772
            for i, n in enumerate(self.names)
2773
        }
2774

2775
    def to_rich_dict(self) -> dict[str, Any]:
6✔
2776
        """returns a json serialisable dict"""
2777
        kwargs = self._get_init_kwargs()
6✔
2778
        kwargs.pop("slice_record")  # slice is realised
6✔
2779
        kwargs["moltype"] = self.moltype.label
6✔
2780
        kwargs.pop("annotation_db", None)  # we dont serialise the annotation db
6✔
2781
        kwargs.pop(
6✔
2782
            "offset",
2783
            None,
2784
        )  # no need for offset since annotation db is not serialised
2785
        kwargs.pop("seqs_data")
6✔
2786

2787
        seqs = {self._name_map[s.name]: str(s) for s in self.seqs}
6✔
2788
        return {
6✔
2789
            "init_args": kwargs,
2790
            "type": get_object_provenance(self),
2791
            "version": __version__,
2792
            "seqs": seqs,
2793
        }
2794

2795
    @classmethod
6✔
2796
    def from_rich_dict(cls, data: dict[str, Any]) -> Alignment:
6✔
2797
        data["init_args"].pop("annotation_db", None)
6✔
2798
        return make_aligned_seqs(data["seqs"], **data["init_args"])
6✔
2799

2800
    def to_html(
6✔
2801
        self,
2802
        name_order: PySeq[str] | None = None,
2803
        wrap: int = 60,
2804
        limit: int | None = None,
2805
        colors: Mapping[str, str] | None = None,
2806
        font_size: int = 12,
2807
        font_family: str = "Lucida Console",
2808
        *,
2809
        ref_name: str = "longest",
2810
        **kwargs: str,
2811
    ) -> str:
2812
        """returns html with embedded styles for sequence colouring
2813

2814
        Parameters
2815
        ----------
2816
        name_order
2817
            order of names for display.
2818
        wrap
2819
            number of alignment columns per row
2820
        limit
2821
            truncate alignment to this length
2822
        ref_name
2823
            Name of an existing sequence or 'longest'. If the latter, the
2824
            longest sequence (excluding gaps and ambiguities) is selected as the
2825
            reference.
2826
        colors
2827
            {character
2828
            moltype.
2829
        font_size
2830
            in points. Affects labels and sequence and line spacing
2831
            (proportional to value)
2832
        font_family
2833
            string denoting font family
2834

2835
        Examples
2836
        --------
2837

2838
        In a jupyter notebook, this code is used to provide the representation.
2839

2840
        .. code-block:: python
2841

2842
            aln  # is rendered by jupyter
2843

2844
        You can directly use the result for display in a notebook as
2845

2846
        .. code-block:: python
2847

2848
            from IPython.core.display import HTML
2849

2850
            HTML(aln.to_html())
2851
        """
2852
        css, styles = self.moltype.get_css_style(
6✔
2853
            colors=colors,
2854
            font_size=font_size,
2855
            font_family=font_family,
2856
        )
2857
        if name_order:
6✔
2858
            selected = self.take_seqs(name_order)
×
2859
            name_order = list(name_order)
×
2860
        else:
2861
            name_order = list(self.names)
6✔
2862
            ref_name = ref_name or "longest"
6✔
2863
            selected = self
6✔
2864

2865
        if ref_name == "longest":
6✔
2866
            lengths = selected.get_lengths(include_ambiguity=False, allow_gap=False)
6✔
2867

2868
            length_names: defaultdict[int, list[str]] = defaultdict(list)
6✔
2869
            for n, l in lengths.items():
6✔
2870
                length_names[l].append(cast("str", n))
6✔
2871

2872
            longest = max(length_names)
6✔
2873
            ref = sorted(length_names[longest])[0]
6✔
2874

2875
        else:
2876
            if ref_name not in selected.names:
6✔
2877
                msg = f"Unknown sequence name {ref_name}"
×
2878
                raise ValueError(msg)
×
2879
            ref = ref_name
6✔
2880

2881
        name_order.remove(ref)
6✔
2882
        name_order.insert(0, ref)
6✔
2883

2884
        if limit is None:
6✔
2885
            names, output = selected._get_raw_pretty(name_order)
6✔
2886
        else:
2887
            names, output = selected[:limit]._get_raw_pretty(name_order)
×
2888

2889
        refname = names[0]
6✔
2890
        refseq = output[refname]
6✔
2891
        seqlen = len(refseq)
6✔
2892

2893
        if selected.moltype.gaps:
6✔
2894
            gaps = "".join(selected.moltype.gaps)
6✔
2895
            start_gap = re.search(f"^[{gaps}]+", "".join(refseq))
6✔
2896
            end_gap = re.search(f"[{gaps}]+$", "".join(refseq))
6✔
2897
            start = 0 if start_gap is None else start_gap.end()
6✔
2898
            end = seqlen if end_gap is None else end_gap.start()
6✔
2899
        else:
2900
            start = 0
6✔
2901
            end = seqlen
6✔
2902

2903
        seq_style: list[str] = []
6✔
2904
        template = '<span class="%s">%%s</span>'
6✔
2905
        styled_seqs: defaultdict[str, list[str]] = defaultdict(list)
6✔
2906
        for i in range(seqlen):
6✔
2907
            char = refseq[i]
6✔
2908
            if i < start or i >= end:
6✔
2909
                style = f"terminal_ambig_{selected.moltype.label}"
6✔
2910
            else:
2911
                style = styles[char]
6✔
2912

2913
            seq_style.append(template % style)
6✔
2914
            styled_seqs[refname].append(seq_style[-1] % char)
6✔
2915

2916
        for name in names:
6✔
2917
            if name == refname:
6✔
2918
                continue
6✔
2919

2920
            seq: list[str] = []
6✔
2921
            for i, c in enumerate(output[name]):
6✔
2922
                if c == ".":
6✔
2923
                    s = seq_style[i] % c
6✔
2924
                else:
2925
                    s = template % (styles[c])
6✔
2926
                    s = s % c
6✔
2927
                seq.append(s)
6✔
2928

2929
            styled_seqs[name] = seq
6✔
2930

2931
        # make a html table
2932
        seqs = numpy.array([styled_seqs[n] for n in names], dtype="O")
6✔
2933
        table = ["<table>"]
6✔
2934
        seq_ = "<td>%s</td>"
6✔
2935
        label_ = '<td class="label">%s</td>'
6✔
2936
        num_row_ = '<tr class="num_row"><td></td><td><b>{:,d}</b></td></tr>'
6✔
2937
        for i in range(0, seqlen, wrap):
6✔
2938
            table.append(num_row_.format(i))
6✔
2939
            seqblock = seqs[:, i : i + wrap].tolist()
6✔
2940
            for n, s in zip(names, seqblock, strict=False):
6✔
2941
                s = "".join(s)
6✔
2942
                row = "".join([label_ % n, seq_ % s])
6✔
2943
                table.append(f"<tr>{row}</tr>")
6✔
2944
        table.append("</table>")
6✔
2945
        if (limit and limit < len(selected)) or (
6✔
2946
            name_order and len(name_order) < len(selected.names)
2947
        ):
2948
            summary = (
×
2949
                f"{self.num_seqs} x {len(self)} (truncated to "
2950
                f"{len(name_order) if name_order else len(selected.names)} x "
2951
                f"{limit or len(selected)}) {selected.moltype.label} alignment"
2952
            )
2953
        else:
2954
            summary = (
6✔
2955
                f"{self.num_seqs} x {len(self)} {selected.moltype.label} alignment"
2956
            )
2957

2958
        text = [
6✔
2959
            "<style>",
2960
            ".c3align table {margin: 10px 0;}",
2961
            ".c3align td { border: none !important; text-align: left !important; }",
2962
            ".c3align tr:not(.num_row) td span {margin: 0 2px;}",
2963
            ".c3align tr:nth-child(even) {background: #f7f7f7;}",
2964
            ".c3align .num_row {background-color:rgba(161, 195, 209, 0.5) !important; border-top: solid 1px black; }",
2965
            ".c3align .label { font-size: %dpt ; text-align: right !important; "
2966
            "color: black !important; padding: 0 4px; display: table-cell !important; "
2967
            "font-weight: normal !important; }" % font_size,
2968
            "\n".join([f".c3align {style}" for style in css]),
2969
            "</style>",
2970
            '<div class="c3align">',
2971
            "\n".join(table),
2972
            f"<p><i>{summary}</i></p>",
2973
            "</div>",
2974
        ]
2975
        return "\n".join(text)
6✔
2976

2977
    def _get_raw_pretty(
6✔
2978
        self, name_order: PySeq[str] | None
2979
    ) -> tuple[PySeq[str], defaultdict[str, list[str]]]:
2980
        """returns dict {name: seq, ...} for pretty print"""
2981
        if name_order is not None:
6✔
2982
            assert set(name_order) <= set(self.names), "names don't match"
6✔
2983

2984
        output: defaultdict[str, list[str]] = defaultdict(list)
6✔
2985
        names: PySeq[str] = name_order or self.names
6✔
2986
        num_seqs = len(names)
6✔
2987

2988
        seqs = [str(self.seqs[name]) for name in names]
6✔
2989
        positions = list(zip(*seqs, strict=False))
6✔
2990

2991
        for position in positions:
6✔
2992
            ref = position[0]
6✔
2993
            output[names[0]].append(ref)
6✔
2994
            for seq_num in range(1, num_seqs):
6✔
2995
                val = "." if position[seq_num] == ref else position[seq_num]
6✔
2996
                output[names[seq_num]].append(val)
6✔
2997

2998
        return names, output
6✔
2999

3000
    def to_pretty(
6✔
3001
        self,
3002
        name_order: list[str] | None = None,
3003
        wrap: int | None = None,
3004
    ) -> str:
3005
        """returns a string representation of the alignment in pretty print format
3006

3007
        Parameters
3008
        ----------
3009
        name_order
3010
            order of names for display.
3011
        wrap
3012
            maximum number of printed bases
3013
        """
3014
        names, output = self._get_raw_pretty(name_order=name_order)
6✔
3015
        label_width = max(list(map(len, names)))
6✔
3016
        name_template = f"{{:>{label_width}}}"
6✔
3017
        display_names = {n: name_template.format(n) for n in names}
6✔
3018

3019
        def make_line(label: str, seq: str) -> str:
6✔
3020
            return f"{label}    {seq}"
6✔
3021

3022
        result: list[str]
3023

3024
        if wrap is None:
6✔
3025
            result = [make_line(display_names[n], "".join(output[n])) for n in names]
6✔
3026
            return "\n".join(result)
6✔
3027

3028
        align_length = len(self)
6✔
3029
        result = []
6✔
3030
        for start in range(0, align_length, wrap):
6✔
3031
            for n in names:
6✔
3032
                result.append(
6✔
3033
                    make_line(
3034
                        display_names[n],
3035
                        "".join(output[n][start : start + wrap]),
3036
                    ),
3037
                )
3038

3039
            result.append("")
6✔
3040

3041
        if result and not result[-1]:
6✔
3042
            del result[-1]
6✔
3043

3044
        return "\n".join(result)
6✔
3045

3046
    def degap(
6✔
3047
        self, storage_backend: str | None = None, **kwargs: Any
3048
    ) -> SequenceCollection:
3049
        """returns collection sequences without gaps or missing characters.
3050

3051
        Parameters
3052
        ----------
3053
        storage_backend
3054
            name of the storage backend to use for the SeqsData object, defaults to
3055
            cogent3 builtin.
3056
        kwargs
3057
            keyword arguments for the storage driver
3058

3059
        Notes
3060
        -----
3061
        The returned collection will not retain an annotation_db if present.
3062
        """
3063
        # because SequenceCollection does not track slice operations, we need
3064
        # to apply any slice record to the underlying data
3065
        sr = self._slice_record
6✔
3066
        data, kw = self._seqs_data.get_ungapped(
6✔
3067
            name_map=self._name_map,
3068
            start=sr.plus_start,
3069
            stop=sr.plus_stop,
3070
            step=sr.plus_step,
3071
        )
3072
        kwargs = kw | kwargs
6✔
3073
        # the SeqsData classes will return the data corresponding to the slice,
3074
        # however, will not complement the data if the step is negative. We do
3075
        # this here.
3076
        rev_complement = self.moltype.rc
6✔
3077
        data = (
6✔
3078
            {name: rev_complement(seq) for name, seq in data.items()}
3079
            if sr.step < 0
3080
            else data
3081
        )
3082
        kwargs["annotation_db"] = self._annotation_db
6✔
3083
        kwargs["storage_backend"] = storage_backend
6✔
3084
        return make_unaligned_seqs(data, moltype=self.moltype, info=self.info, **kwargs)
6✔
3085

3086
    def get_gapped_seq(
6✔
3087
        self,
3088
        seqname: str,
3089
        recode_gaps: bool = False,
3090
    ) -> c3_sequence.Sequence:
3091
        """Return a gapped Sequence object for the specified seqname.
3092

3093

3094
        Parameters
3095
        ----------
3096
        seqname
3097
            sequence name
3098
        recode_gaps
3099
            if True, gap characters are replaced by the most general
3100
            ambiguity code, e.g. N for DNA and RNA
3101

3102
        Notes
3103
        -----
3104
        This method breaks the connection to the annotation database.
3105
        """
3106
        s: c3_sequence.Sequence | str = self.seqs[seqname].gapped_seq
6✔
3107
        if recode_gaps:
6✔
3108
            s = str(s)
6✔
3109
            non_ambig = list(self.moltype)
6✔
3110
            ambig = self.moltype.degenerate_from_seq(non_ambig)
6✔
3111
            for gapchar in self.moltype.gaps:
6✔
3112
                s = s.replace(gapchar, ambig)
6✔
3113

3114
        return self.moltype.make_seq(seq=s, name=seqname)
6✔
3115

3116
    @extend_docstring_from(SequenceCollection.get_translation)
6✔
3117
    def get_translation(
6✔
3118
        self,
3119
        gc: c3_genetic_code.GeneticCode | int = 1,
3120
        incomplete_ok: bool = False,
3121
        include_stop: bool = False,
3122
        trim_stop: bool = True,
3123
        **kwargs: Any,
3124
    ) -> Self:
3125
        if not self.moltype.is_nucleic:
6✔
3126
            msg = f"moltype must be a DNA/RNA, not {self.moltype.name!r}"
6✔
3127
            raise c3_moltype.MolTypeError(msg)
6✔
3128

3129
        if not trim_stop or include_stop:
6✔
3130
            seqs = self
6✔
3131
        else:
3132
            seqs = self.trim_stop_codons(gc=gc, strict=not incomplete_ok)
6✔
3133

3134
        translated: dict[str, NumpyIntArrayType] = {}
6✔
3135
        for seqname in seqs.names:
6✔
3136
            seq = cast(
6✔
3137
                "c3_sequence.NucleicAcidSequenceBase", seqs.get_gapped_seq(seqname)
3138
            )
3139
            pep = seq.get_translation(
6✔
3140
                gc,
3141
                incomplete_ok=incomplete_ok,
3142
                include_stop=include_stop,
3143
                trim_stop=trim_stop,
3144
            )
3145
            translated[self.name_map[seqname]] = numpy.array(pep)
6✔
3146

3147
        pep_moltype = c3_moltype.get_moltype(
6✔
3148
            "protein_with_stop" if include_stop else "protein",
3149
        )
3150
        seqs_data = self._seqs_data.from_seqs(
6✔
3151
            data=translated,
3152
            alphabet=pep_moltype.most_degen_alphabet(),
3153
            offset=None,
3154
        )
3155
        return self.__class__(
6✔
3156
            seqs_data=seqs_data,
3157
            moltype=pep_moltype,
3158
            name_map=self._name_map,
3159
            info=self.info,
3160
            source=self.source,
3161
            **kwargs,
3162
        )
3163

3164
    def trim_stop_codons(
6✔
3165
        self,
3166
        gc: c3_genetic_code.GeneticCode | int = 1,
3167
        strict: bool = False,
3168
        **kwargs: Any,
3169
    ) -> Self:
3170
        # refactor: array
3171
        if not self.has_terminal_stop(gc=gc, strict=strict):
6✔
3172
            return self
6✔
3173

3174
        # define a regex for finding stop codons followed by terminal gaps
3175
        gc = c3_genetic_code.get_code(gc)
6✔
3176
        gaps = "".join(self.moltype.gaps)
6✔
3177
        pattern = f"({'|'.join(gc['*'])})[{gaps}]*$"
6✔
3178
        terminal_stop = re.compile(pattern)
6✔
3179

3180
        data = self.to_dict()
6✔
3181
        result: dict[str, str] = {}
6✔
3182
        for name, seq in data.items():
6✔
3183
            if match := terminal_stop.search(seq):
6✔
3184
                diff = len(seq) - match.start()
6✔
3185
                seq = terminal_stop.sub("-" * diff, seq)
6✔
3186

3187
            result[self.name_map[name]] = seq
6✔
3188

3189
        seqs_data = self._seqs_data.from_seqs(
6✔
3190
            data=result,
3191
            alphabet=self.moltype.most_degen_alphabet(),
3192
        )
3193
        init_kwargs = self._get_init_kwargs()
6✔
3194
        init_kwargs["seqs_data"] = seqs_data
6✔
3195
        init_kwargs.pop("slice_record", None)
6✔
3196
        init_kwargs |= kwargs
6✔
3197
        return self.__class__(**init_kwargs)
6✔
3198

3199
    def rc(self) -> Self:
6✔
3200
        """Returns the reverse complement of all sequences in the alignment.
3201
        A synonym for reverse_complement.
3202
        """
3203
        init_kwargs = self._get_init_kwargs()
6✔
3204
        init_kwargs["slice_record"] = self._slice_record[::-1]
6✔
3205
        return self.__class__(**init_kwargs)
6✔
3206

3207
    def distance_matrix(
6✔
3208
        self,
3209
        calc: str = "pdist",
3210
        drop_invalid: bool = False,
3211
        parallel: bool = False,
3212
        **kwargs: bool,
3213
    ) -> DistanceMatrix:
3214
        """Returns pairwise distances between sequences.
3215

3216
        Parameters
3217
        ----------
3218
        calc
3219
            a pairwise distance calculator name. Presently only
3220
            'pdist', 'jc69', 'tn93', 'hamming', 'paralinear' are supported.
3221
        drop_invalid
3222
            If True, sequences for which a pairwise distance could not be
3223
            calculated are excluded. If False, an ArithmeticError is raised if
3224
            a distance could not be computed on observed data.
3225
        """
3226
        from cogent3.evolve.pairwise_distance_numba import get_distance_calculator
6✔
3227

3228
        calculator = get_distance_calculator(calc)
6✔
3229
        try:
6✔
3230
            result = calculator(
6✔
3231
                self, invalid_raises=not drop_invalid, parallel=parallel
3232
            )
3233
        except ArithmeticError as e:
6✔
3234
            msg = "not all pairwise distances could be computed, try drop_invalid=True"
6✔
3235
            raise ArithmeticError(msg) from e
6✔
3236

3237
        if drop_invalid:
6✔
3238
            result = result.drop_invalid()
6✔
3239

3240
        return result
6✔
3241

3242
    def make_feature(
6✔
3243
        self,
3244
        *,
3245
        feature: FeatureDataType,
3246
        on_alignment: bool | None = None,
3247
        **kwargs: bool | None,
3248
    ) -> Feature[Alignment]:
3249
        """
3250
        create a feature on named sequence, or on the alignment itself
3251

3252
        Parameters
3253
        ----------
3254
        feature
3255
            a dict with all the necessary data rto construct a feature
3256
        on_alignment
3257
            the feature is in alignment coordinates, incompatible with setting
3258
            'seqid'. Set to True if 'seqid' not provided.
3259

3260
        Returns
3261
        -------
3262
        Feature
3263

3264
        Raises
3265
        ------
3266
        ValueError if define a 'seqid' not on alignment or use 'seqid' and
3267
        on_alignment.
3268

3269
        Notes
3270
        -----
3271
        To get a feature AND add it to annotation_db, use add_feature().
3272
        """
3273
        if on_alignment is None:
6✔
3274
            on_alignment = feature.pop("on_alignment", None)  # type: ignore[misc]
6✔
3275

3276
        if not on_alignment and feature["seqid"]:
6✔
3277
            return self.seqs[feature["seqid"]].make_feature(feature, self)
6✔
3278

3279
        feature["seqid"] = cast("str", cast("str | None", feature.get("seqid", None)))
6✔
3280
        # there's no sequence to bind to, the feature is directly on self
3281
        revd = Strand.from_value(feature.pop("strand", None)) is Strand.MINUS  # type: ignore[misc]
6✔
3282
        feature["strand"] = Strand.MINUS.value if revd else Strand.PLUS.value
6✔
3283
        fmap = FeatureMap.from_locations(
6✔
3284
            locations=feature.pop("spans"),  # type: ignore[misc]
3285
            parent_length=len(self),
3286
        )
3287
        if revd:
6✔
3288
            fmap = fmap.nucleic_reversed()
6✔
3289
        return Feature[Alignment](parent=self, map=fmap, **feature)  # type: ignore[misc]
6✔
3290

3291
    def add_feature(
6✔
3292
        self,
3293
        *,
3294
        seqid: str | None = None,
3295
        biotype: str,
3296
        name: str,
3297
        spans: list[tuple[int, int]],
3298
        parent_id: str | None = None,
3299
        strand: str | int = "+",
3300
        on_alignment: bool | None = None,
3301
        **kwargs: bool | None,
3302
    ) -> Feature[Alignment]:
3303
        """
3304
        add feature on named sequence, or on the alignment itself
3305

3306
        Parameters
3307
        ----------
3308
        seqid
3309
            sequence name, incompatible with on_alignment
3310
        parent_id
3311
            name of the parent feature
3312
        biotype
3313
            biological type, e.g. CDS
3314
        name
3315
            name of the feature
3316
        spans
3317
            plus strand coordinates of feature
3318
        strand
3319
            '+' (default) or '-'
3320
        on_alignment
3321
            the feature is in alignment coordinates, incompatible with setting
3322
            seqid. Set to True if seqid not provided.
3323

3324
        Returns
3325
        -------
3326
        Feature
3327

3328
        Raises
3329
        ------
3330
        ValueError if define a seqid not on alignment or use seqid and
3331
        on_alignment.
3332
        """
3333
        del kwargs
6✔
3334

3335
        if seqid and on_alignment is None:
6✔
3336
            on_alignment = False
6✔
3337
        elif not on_alignment:
6✔
3338
            on_alignment = on_alignment is None
6✔
3339

3340
        if seqid and on_alignment:
6✔
3341
            msg = "seqid and on_alignment are incomatible"
×
3342
            raise ValueError(msg)
×
3343

3344
        if seqid and seqid not in self.names:
6✔
3345
            msg = f"unknown {seqid=}"
6✔
3346
            raise ValueError(msg)
6✔
3347

3348
        feature = {k: v for k, v in locals().items() if k != "self"}
6✔
3349
        feature["strand"] = Strand.from_value(strand).value
6✔
3350
        # property ensures db is created
3351
        self.annotation_db.add_feature(**feature)
6✔
3352
        for discard in ("on_alignment", "parent_id"):
6✔
3353
            feature.pop(discard, None)
6✔
3354
        return self.make_feature(
6✔
3355
            feature=cast("FeatureDataType", feature), on_alignment=on_alignment
3356
        )
3357

3358
    def _get_seq_features(
6✔
3359
        self,
3360
        *,
3361
        seqid: str | None = None,
3362
        biotype: str | None = None,
3363
        name: str | None = None,
3364
        allow_partial: bool = False,
3365
    ) -> Iterator[Feature[Alignment]]:
3366
        """yields Feature instances
3367

3368
        Parameters
3369
        ----------
3370
        seqid
3371
            limit search to features on this named sequence, defaults to search all
3372
        biotype
3373
            biotype of the feature, e.g. CDS, gene
3374
        name
3375
            name of the feature
3376
        allow_partial
3377
            allow features partially overlaping self
3378

3379
        Notes
3380
        -----
3381
        When dealing with a nucleic acid moltype, the returned features will
3382
        yield a sequence segment that is consistently oriented irrespective
3383
        of strand of the current instance.
3384
        """
3385
        if not self._annotation_db:
6✔
3386
            return None
×
3387

3388
        seqid_to_seqname = {v: k for k, v in self._name_map.items()}
6✔
3389

3390
        seqids: PySeq[str] | None = [seqid] if isinstance(seqid, str) else seqid
6✔
3391
        if seqids is None:
6✔
3392
            seqids = tuple(seqid_to_seqname)
6✔
3393
        elif set(seqids) & set(self.names):
6✔
3394
            # we've been given seq names, convert to parent names
3395
            seqids = [self.seqs[seqid].parent_coordinates()[0] for seqid in seqids]
6✔
3396
        elif not (seqids and set(seqids) <= seqid_to_seqname.keys()):
6✔
3397
            msg = f"unknown {seqid=}"
6✔
3398
            raise ValueError(msg)
6✔
3399

3400
        for seqid in seqids:
6✔
3401
            seqname = seqid_to_seqname[seqid]
6✔
3402
            seq = self.seqs[seqname]
6✔
3403
            # we use parent seqid
3404
            parent_id, start, stop, _ = seq.parent_coordinates(apply_offset=False)
6✔
3405
            # we get the annotation offset from storage
3406
            # because we need it to adjust the returned feature spans
3407
            # to the alignment coordinates
3408
            offset = self.storage.offset.get(seqid, 0)
6✔
3409

3410
            for feature in self.annotation_db.get_features_matching(
6✔
3411
                seqid=parent_id,
3412
                biotype=biotype,
3413
                name=name,
3414
                on_alignment=False,
3415
                allow_partial=allow_partial,
3416
                start=start + offset,
3417
                stop=stop + offset,
3418
            ):
3419
                if offset:
6✔
3420
                    feature["spans"] = (numpy.array(feature["spans"]) - offset).tolist()
6✔
3421
                # passing self only used when self is an Alignment
3422
                yield seq.make_feature(feature, self)
6✔
3423

3424
    def get_features(
6✔
3425
        self,
3426
        *,
3427
        seqid: str | Iterator[str] | None = None,
3428
        biotype: str | tuple[str, ...] | list[str] | set[str] | None = None,
3429
        name: str | None = None,
3430
        allow_partial: bool = False,
3431
        on_alignment: bool | None = None,
3432
        **kwargs: Any,
3433
    ) -> Iterator[Feature[Alignment]]:
3434
        """yields Feature instances
3435

3436
        Parameters
3437
        ----------
3438
        seqid
3439
            limit search to features on this named sequence, defaults to search all
3440
        biotype
3441
            biotype of the feature, e.g. CDS, gene
3442
        name
3443
            name of the feature
3444
        on_alignment
3445
            limit query to features on Alignment, ignores sequences. Ignored on
3446
            SequenceCollection instances.
3447
        allow_partial
3448
            allow features partially overlaping self
3449

3450
        Notes
3451
        -----
3452
        When dealing with a nucleic acid moltype, the returned features will
3453
        yield a sequence segment that is consistently oriented irrespective
3454
        of strand of the current instance.
3455
        """
3456
        del kwargs
6✔
3457

3458
        if not self._annotation_db or not len(self._annotation_db):
6✔
3459
            return None
6✔
3460

3461
        # we only do on-alignment in here
3462
        if not on_alignment:
6✔
3463
            local_vars = locals()
6✔
3464
            kwargs = {k: v for k, v in local_vars.items() if k != "self"}
6✔
3465
            kwargs.pop("on_alignment")
6✔
3466
            yield from self._get_seq_features(**kwargs)
6✔
3467

3468
        if on_alignment == False:  # noqa: E712
6✔
3469
            return
×
3470

3471
        seq_map = None
6✔
3472
        for feature in self.annotation_db.get_features_matching(
6✔
3473
            biotype=biotype,
3474
            name=name,
3475
            on_alignment=on_alignment,
3476
            allow_partial=allow_partial,
3477
        ):
3478
            if feature["seqid"]:
6✔
3479
                continue
6✔
3480
            on_al = cast("bool", feature.pop("on_alignment", on_alignment))  # type: ignore[misc]
6✔
3481
            if feature["seqid"]:
6✔
3482
                msg = f"{on_alignment=} {feature=}"
×
3483
                raise RuntimeError(msg)
×
3484

3485
            strand: int | str | None
3486
            if seq_map is None:
6✔
3487
                seq_map = self.seqs[0].map.to_feature_map()
6✔
3488
                *_, strand = self.seqs[0].seq.parent_coordinates()
6✔
3489
            else:
3490
                strand = feature.pop("strand", None)  # type: ignore[misc]
×
3491

3492
            spans = seq_map.relative_position(numpy.array(feature["spans"]))
6✔
3493
            feature["spans"] = spans.tolist()
6✔
3494
            # and if i've been reversed...?
3495
            feature["strand"] = cast("int", Strand.from_value(strand).value)
6✔
3496
            yield self.make_feature(feature=feature, on_alignment=on_al)
6✔
3497

3498
    def is_ragged(self) -> bool:
6✔
3499
        """by definition False for an Alignment"""
3500
        return False
6✔
3501

3502
    def counts_per_seq(
6✔
3503
        self,
3504
        motif_length: int = 1,
3505
        include_ambiguity: bool = False,
3506
        allow_gap: bool = False,
3507
        exclude_unobserved: bool = False,
3508
        warn: bool = False,
3509
    ) -> MotifCountsArray | None:
3510
        """counts of non-overlapping motifs per sequence
3511

3512
        Parameters
3513
        ----------
3514
        motif_length
3515
            number of elements per character.
3516
        include_ambiguity
3517
            if True, motifs containing ambiguous characters
3518
            from the seq moltype are included. No expansion of those is attempted.
3519
        allow_gap
3520
            if True, motifs containing a gap character are included.
3521
        exclude_unobserved
3522
            if False, all canonical states included
3523
        warn
3524
            warns if motif_length > 1 and alignment trimmed to produce
3525
            motif columns
3526
        """
3527
        length = (len(self) // motif_length) * motif_length
6✔
3528
        if not length:
6✔
3529
            l_motifs = list(self.moltype)
6✔
3530
            l_counts: NumpyIntArrayType = numpy.zeros(
6✔
3531
                (len(self.names), len(l_motifs)), dtype=int
3532
            )
3533
            return MotifCountsArray(l_counts, l_motifs, row_indices=self.names)
6✔
3534

3535
        if warn and len(self) != length:
6✔
3536
            warnings.warn(f"trimmed {len(self) - length}", UserWarning, stacklevel=2)
×
3537

3538
        counts: list[CategoryCounter[str | bytes]] = []
6✔
3539
        motifs: set[str | bytes] = set()
6✔
3540
        for name in self.names:
6✔
3541
            seq = self.get_gapped_seq(name)
6✔
3542
            c = seq.counts(
6✔
3543
                motif_length=motif_length,
3544
                include_ambiguity=include_ambiguity,
3545
                allow_gap=allow_gap,
3546
            )
3547
            motifs.update(c.keys())
6✔
3548
            counts.append(c)
6✔
3549

3550
        # if type motifs not same as type element in moltype
3551
        if not exclude_unobserved:
6✔
3552
            motifs.update(self.moltype.alphabet.get_kmer_alphabet(motif_length))
6✔
3553

3554
        motifs_list = sorted(motifs)
6✔
3555
        if not motifs_list:
6✔
3556
            return None
6✔
3557

3558
        final_counts = [c.tolist(motifs_list) for c in counts]
6✔
3559
        return MotifCountsArray(final_counts, motifs_list, row_indices=self.names)
6✔
3560

3561
    def count_ambiguous_per_seq(self) -> DictArray:
6✔
3562
        """Return the counts of ambiguous characters per sequence as a DictArray."""
3563

3564
        gap_index = cast("int", self.moltype.most_degen_alphabet().gap_index)
6✔
3565
        ambigs_pos = self.array_seqs > gap_index
6✔
3566
        ambigs = ambigs_pos.sum(axis=1)
6✔
3567

3568
        return DictArray.from_array_names(ambigs, self.names)
6✔
3569

3570
    def strand_symmetry(self, motif_length: int = 1) -> dict[str, TestResult]:
6✔
3571
        """returns dict of strand symmetry test results per ungapped seq"""
3572
        return {
6✔
3573
            s.name: cast("c3_sequence.NucleicAcidSequenceBase", s.seq).strand_symmetry(
3574
                motif_length=motif_length
3575
            )
3576
            for s in self.seqs
3577
        }
3578

3579
    @override
6✔
3580
    def duplicated_seqs(self) -> list[list[str]]:
6✔
3581
        """returns the names of duplicated sequences
3582

3583
        Notes
3584
        -----
3585
        The gapped sequence is used.
3586
        """
3587
        if not len(self):
6✔
3588
            # all have zero lengths
3589
            return [] if self.num_seqs < 2 else [list(self.names)]
6✔
3590

3591
        return super().duplicated_seqs()
6✔
3592

3593
    def alignment_quality(self, app_name: str = "ic_score", **kwargs: Any) -> float:
6✔
3594
        """
3595
        Computes the alignment quality using the indicated app
3596

3597
        Parameters
3598
        ----------
3599
        app_name
3600
            name of an alignment score calculating app, e.g. 'ic_score',
3601
            'cogent3_score', 'sp_score'
3602

3603
        kwargs
3604
            keyword arguments to be passed to the app. Use
3605
            ``cogent3.app_help(app_name)`` to see the available options.
3606

3607
        Returns
3608
        -------
3609
        float or a NotCompleted instance if the score could not be computed
3610
        """
3611
        app = cogent3.get_app(app_name, **kwargs)
6✔
3612
        return app(self)
6✔
3613

3614
    def iter_positions(
6✔
3615
        self,
3616
        pos_order: list[int | slice | FeatureMap] | range | None = None,
3617
    ) -> Iterator[list[str]]:
3618
        """Iterates over positions in the alignment, in order.
3619

3620
        Parameters
3621
        ----------
3622
        pos_order
3623
            list of indices specifying the column order. If None, the
3624
            positions are iterated in order.
3625

3626
        Returns
3627
        -------
3628
        yields lists of elemenets for each position (column) in the alignment
3629
        """
3630
        # refactor: array
3631
        # this could also iter columns of indices as a numpy array - could be an optional arg
3632
        # refactor: add motif_length argument
3633
        pos_order = pos_order or range(len(self))
6✔
3634
        for pos in pos_order:
6✔
3635
            yield [str(self[seq][pos]) for seq in self.names]
6✔
3636

3637
    def get_position_indices(
6✔
3638
        self,
3639
        f: Callable[[PySeq[str]], bool],
3640
        negate: bool = False,
3641
    ) -> list[int]:
3642
        """Returns list of column indices for which f(col) is True.
3643

3644
        Parameters
3645
        ----------
3646
        f
3647
          function that returns true/false given an alignment position
3648
        negate
3649
          if True, not f() is used
3650
        """
3651
        # refactor:
3652
        # type hint for f
3653
        # implement native
3654
        new_f = negate_condition(f) if negate else f
6✔
3655

3656
        # refactor: design
3657
        # use array_positions here
3658
        return [i for i, col in enumerate(self.positions) if new_f(col)]
6✔
3659

3660
    def take_positions(
6✔
3661
        self,
3662
        cols: list[int] | NumpyIntArrayType,
3663
        negate: bool = False,
3664
    ) -> Self:
3665
        """Returns new Alignment containing only specified positions.
3666

3667
        Parameters
3668
        ----------
3669
        cols
3670
            list of column indices to keep
3671
        negate
3672
            if True, all columns except those in cols are kept
3673
        """
3674
        # refactor: array - use array operations throughout method
3675
        if negate:
6✔
3676
            col_lookup = dict.fromkeys(cols)
6✔
3677
            cols = [i for i in range(len(self)) if i not in col_lookup]
6✔
3678

3679
        new_data = {
6✔
3680
            self.name_map[aligned.name]: numpy.array(aligned).take(cols)
3681
            for aligned in self.seqs
3682
        }
3683
        seqs_data = self._seqs_data.from_seqs(
6✔
3684
            data=new_data,
3685
            alphabet=self.moltype.most_degen_alphabet(),
3686
        )
3687
        kwargs = self._get_init_kwargs()
6✔
3688
        kwargs["seqs_data"] = seqs_data
6✔
3689
        kwargs.pop("annotation_db", None)
6✔
3690
        kwargs.pop("slice_record", None)
6✔
3691
        return self.__class__(**kwargs)
6✔
3692

3693
    def take_positions_if(
6✔
3694
        self, f: Callable[[PySeq[str]], bool], negate: bool = False
3695
    ) -> Self:
3696
        """Returns new Alignment containing cols where f(col) is True."""
3697
        return self.take_positions(self.get_position_indices(f, negate=negate))
6✔
3698

3699
    def get_gap_array(self, include_ambiguity: bool = True) -> npt.NDArray[numpy.bool]:
6✔
3700
        """returns bool array with gap state True, False otherwise
3701

3702
        Parameters
3703
        ----------
3704
        include_ambiguity
3705
            if True, ambiguity characters that include the gap state are
3706
            included
3707
        """
3708
        alpha = self.moltype.most_degen_alphabet()
6✔
3709
        gapped = self.array_seqs == alpha.gap_index
6✔
3710
        if include_ambiguity:
6✔
3711
            gapped = gapped | (self.array_seqs == alpha.missing_index)
6✔
3712
        return gapped
6✔
3713

3714
    def iupac_consensus(self, allow_gap: bool = True) -> str:
6✔
3715
        """Returns string containing IUPAC consensus sequence of the alignment."""
3716
        exclude: set[str] = set() if allow_gap else set(self.moltype.gaps)
6✔
3717
        consensus: list[str] = []
6✔
3718
        degen = self.moltype.degenerate_from_seq
6✔
3719
        for col in self.iter_positions():
6✔
3720
            diff = set(col) - exclude
6✔
3721
            consensus.append(degen("".join(diff)))
6✔
3722
        return "".join(consensus)
6✔
3723

3724
    def majority_consensus(self) -> c3_sequence.Sequence:
6✔
3725
        """Returns consensus sequence containing most frequent item at each
3726
        position."""
3727
        states: list[str] = []
6✔
3728
        data: zip[tuple[str, ...]] = zip(*map(str, self.seqs), strict=False)
6✔
3729
        for pos in data:
6✔
3730
            pos_counts = CategoryCounter(pos)
6✔
3731
            states.append(pos_counts.mode)
6✔
3732

3733
        return self.moltype.make_seq(seq="".join(states))
6✔
3734

3735
    def counts_per_pos(
6✔
3736
        self,
3737
        motif_length: int = 1,
3738
        include_ambiguity: bool = False,
3739
        allow_gap: bool = False,
3740
        warn: bool = False,
3741
    ) -> MotifCountsArray:
3742
        """return MotifCountsArray of counts per position
3743

3744
        Parameters
3745
        ----------
3746
        motif_length
3747
            number of elements per character.
3748
        include_ambiguity
3749
            if True, motifs containing ambiguous characters from the seq moltype
3750
            are included. No expansion of those is attempted.
3751
        allow_gap
3752
            if True, motifs containing a gap character are included.
3753
        warn
3754
            warns if motif_length > 1 and alignment trimmed to produce
3755
            motif columns
3756
        """
3757
        # refactor: performance
3758
        # use self.variable_positions and a numba decorated
3759
        # function for counting k-mers the latter should allow returning the
3760
        # first allowed state for when position is not variable
3761

3762
        align_len = len(self._slice_record)
6✔
3763
        length = (align_len // motif_length) * motif_length
6✔
3764
        if warn and align_len != length:
6✔
3765
            warnings.warn(f"trimmed {align_len - length}", UserWarning, stacklevel=2)
×
3766

3767
        data = list(self.to_dict().values())
6✔
3768
        alpha = cast(
6✔
3769
            "tuple[str, ...]",
3770
            self.moltype.alphabet.get_kmer_alphabet(motif_length),
3771
        )
3772
        all_motifs: set[str] = set()
6✔
3773
        exclude_chars: set[str] = set()
6✔
3774
        if not allow_gap:
6✔
3775
            exclude_chars.update(cast("str", self.moltype.gap))
6✔
3776

3777
        if not include_ambiguity and self.moltype.degen_alphabet:
6✔
3778
            ambigs = [
6✔
3779
                c
3780
                for c, v in cast(
3781
                    "dict[str, frozenset[str]]", self.moltype.ambiguities
3782
                ).items()
3783
                if len(v) > 1
3784
            ]
3785
            exclude_chars.update(ambigs)
6✔
3786

3787
        result: list[CategoryCounter[str]] = []
6✔
3788
        for i in range(0, align_len - motif_length + 1, motif_length):
6✔
3789
            counts = CategoryCounter([s[i : i + motif_length] for s in data])
6✔
3790
            all_motifs.update(list(counts))
6✔
3791
            result.append(counts)
6✔
3792

3793
        if all_motifs:
6✔
3794
            alpha += tuple(sorted(set(alpha) ^ all_motifs))
6✔
3795

3796
        if exclude_chars:
6✔
3797
            # this additional clause is required for the bytes moltype
3798
            # That moltype includes '-' as a character
3799
            alpha = tuple(m for m in alpha if not (set(m) & exclude_chars))
6✔
3800

3801
        final_result = [counts.tolist(alpha) for counts in result]
6✔
3802

3803
        return MotifCountsArray(final_result, alpha)
6✔
3804

3805
    def probs_per_pos(
6✔
3806
        self,
3807
        motif_length: int = 1,
3808
        include_ambiguity: bool = False,
3809
        allow_gap: bool = False,
3810
        warn: bool = False,
3811
    ) -> MotifFreqsArray:
3812
        """returns MotifFreqsArray per position"""
3813
        counts = self.counts_per_pos(
6✔
3814
            motif_length=motif_length,
3815
            include_ambiguity=include_ambiguity,
3816
            allow_gap=allow_gap,
3817
            warn=warn,
3818
        )
3819
        return counts.to_freq_array()
6✔
3820

3821
    def entropy_per_pos(
6✔
3822
        self,
3823
        motif_length: int = 1,
3824
        include_ambiguity: bool = False,
3825
        allow_gap: bool = False,
3826
        warn: bool = False,
3827
    ) -> NumpyFloatArrayType:
3828
        """returns shannon entropy per position"""
3829
        # if the current alignment is very long, we chunk this
3830
        # in case a backend is being used that stores contents on
3831
        # disk by sequence
3832
        probs = self.probs_per_pos(
6✔
3833
            motif_length=motif_length,
3834
            include_ambiguity=include_ambiguity,
3835
            allow_gap=allow_gap,
3836
            warn=warn,
3837
        )
3838
        return probs.entropy()
6✔
3839

3840
    def count_gaps_per_pos(self, include_ambiguity: bool = True) -> DictArray:
6✔
3841
        """return counts of gaps per position as a DictArray
3842

3843
        Parameters
3844
        ----------
3845
        include_ambiguity
3846
            if True, ambiguity characters that include the gap state are
3847
            included
3848
        """
3849
        gap_array = self.get_gap_array(include_ambiguity=include_ambiguity)
6✔
3850
        darr = DictArrayTemplate(range(len(self)))
6✔
3851

3852
        result = gap_array.sum(axis=0)
6✔
3853
        return darr.wrap(result)
6✔
3854

3855
    def count_gaps_per_seq(
6✔
3856
        self,
3857
        induced_by: bool = False,
3858
        unique: bool = False,
3859
        include_ambiguity: bool = True,
3860
        drawable: str | None = None,
3861
    ) -> DictArray:
3862
        """return counts of gaps per sequence as a DictArray
3863

3864
        Parameters
3865
        ----------
3866
        induced_by
3867
            a gapped column is considered to be induced by a seq if the seq
3868
            has a non-gap character in that column.
3869
        unique
3870
            count is limited to gaps uniquely induced by each sequence
3871
        include_ambiguity
3872
            if True, ambiguity characters that include the gap state are
3873
            included
3874
        drawable
3875
            if True, resulting object is capable of plotting data via specified
3876
            plot type 'bar', 'box' or 'violin'
3877
        """
3878
        from cogent3.draw.drawable import Drawable
6✔
3879

3880
        gap_array = self.get_gap_array(include_ambiguity=include_ambiguity)
6✔
3881
        darr = DictArrayTemplate(self.names)
6✔
3882

3883
        if unique:
6✔
3884
            # we identify cols with a single non-gap character
3885
            gap_cols = gap_array.sum(axis=0) == self.num_seqs - 1
6✔
3886
            gap_array = gap_array[:, gap_cols] == False  # noqa
6✔
3887
        elif induced_by:
6✔
3888
            # identify all columns with gap opposite
3889
            gap_cols = gap_array.sum(axis=0) > 0
6✔
3890
            gap_array = gap_array[:, gap_cols] == False  # noqa
6✔
3891
        else:
3892
            gap_cols = gap_array.sum(axis=0) > 0
6✔
3893
            gap_array = gap_array[:, gap_cols]
6✔
3894

3895
        result = gap_array.sum(axis=1)
6✔
3896
        result = darr.wrap(result)
6✔
3897
        if drawable:
6✔
3898
            drawable = drawable.lower()
6✔
3899
            trace_name = pathlib.Path(self.source).name if self.source else None
6✔
3900
            draw = Drawable("Gaps Per Sequence", showlegend=False)
6✔
3901
            draw.layout |= {"yaxis": {"title": "Gap counts"}}
6✔
3902
            if drawable == "bar":
6✔
3903
                trace = UnionDict(type="bar", y=result.array, x=self.names)
6✔
3904
            else:
3905
                trace = UnionDict(
6✔
3906
                    type=drawable,
3907
                    y=result.array,
3908
                    text=self.names,
3909
                    name=trace_name,
3910
                )
3911

3912
            draw.add_trace(trace)
6✔
3913
            result = draw.bound_to(result)
6✔
3914

3915
        return result
6✔
3916

3917
    def variable_positions(
6✔
3918
        self,
3919
        include_gap_motif: bool = True,
3920
        include_ambiguity: bool = False,
3921
        motif_length: int = 1,
3922
    ) -> tuple[int, ...]:
3923
        """Return a list of variable position indexes.
3924

3925
        Parameters
3926
        ----------
3927
        include_gap_motif
3928
            if False, sequences with a gap motif in a column are ignored.
3929
        include_ambiguity
3930
            if True, all states are considered.
3931
        motif_length
3932
            if any position within a motif is variable, the entire motif is
3933
            considered variable.
3934

3935
        Returns
3936
        -------
3937
        tuple of integers, if motif_length > 1, the returned positions are
3938
        motif_length long sequential indices.
3939

3940
        Notes
3941
        -----
3942
        Truncates alignment to be modulo motif_length.
3943
        """
3944
        align_len = len(self) // motif_length * motif_length
6✔
3945
        # columns is 2D array with alignment columns as rows
3946
        pos = self.storage.variable_positions(
6✔
3947
            list(self.name_map.values()),
3948
            start=self._slice_record.plus_start,
3949
            stop=min(self._slice_record.plus_stop, align_len),
3950
            step=self._slice_record.plus_step,
3951
        )
3952
        if not pos.size:
6✔
3953
            return ()
6✔
3954

3955
        alpha = self.storage.alphabet
6✔
3956
        gap_index = alpha.gap_index or len(alpha)
6✔
3957
        missing_index = alpha.missing_index or len(alpha)
6✔
3958
        if include_gap_motif and include_ambiguity:
6✔
3959
            # allow all states
3960
            func = None
6✔
3961
            kwargs = {}
6✔
3962
        elif include_gap_motif and self.moltype.gapped_missing_alphabet:
6✔
3963
            # allow canonical, gap, missing
3964
            func = _var_pos_canonical_or_gap
6✔
3965
            kwargs = {
6✔
3966
                "gap_index": gap_index,
3967
                "missing_index": missing_index,
3968
            }
3969
        elif include_ambiguity and self.moltype.degen_alphabet:
6✔
3970
            # anything but a gap
3971
            func = _var_pos_not_gap
6✔
3972
            kwargs = {
6✔
3973
                "gap_index": gap_index,
3974
            }
3975
        else:
3976
            # canonical only
3977
            func = _var_pos_canonical
6✔
3978
            kwargs = {
6✔
3979
                "gap_index": gap_index,
3980
            }
3981

3982
        indices: npt.NDArray[numpy.bool] = numpy.zeros(align_len, dtype=bool)
6✔
3983
        if func is None:
6✔
3984
            indices[pos] = True
6✔
3985
        else:
3986
            array_seqs = self.storage.get_positions(list(self.name_map.values()), pos)
6✔
3987
            indices[pos] = func(array_seqs, **kwargs)
6✔
3988

3989
        if self._slice_record.is_reversed:
6✔
3990
            # for reverse complement alignments
3991
            # because we have a bool vector entire alignment length
3992
            # just reversing the order is all we need to do since the
3993
            # numpy.where statement will return positions in this new order
3994
            indices = indices[::-1]
6✔
3995

3996
        if motif_length > 1:
6✔
3997
            var_pos = indices.reshape(-1, motif_length).any(axis=1).repeat(motif_length)
6✔
3998
        else:
3999
            var_pos = indices
6✔
4000

4001
        var_pos_where = numpy.where(var_pos)[0]
6✔
4002
        return tuple(var_pos_where.tolist())
6✔
4003

4004
    def omit_bad_seqs(self, quantile: float | None = None) -> Self:
6✔
4005
        """Returns new alignment without sequences with a number of uniquely
4006
        introduced gaps exceeding quantile
4007

4008
        Uses count_gaps_per_seq(unique=True) to obtain the counts of gaps
4009
        uniquely introduced by a sequence. The cutoff is the quantile of
4010
        this distribution.
4011

4012
        Parameters
4013
        ----------
4014
        quantile
4015
            sequences whose unique gap count is in a quantile larger than this
4016
            cutoff are excluded. The default quantile is (num_seqs - 1) / num_seqs
4017
        """
4018
        gap_counts = self.count_gaps_per_seq(unique=True)
6✔
4019
        quantile = quantile or (self.num_seqs - 1) / self.num_seqs
6✔
4020
        cutoff = numpy.quantile(gap_counts.array, quantile)
6✔
4021
        names = [name for name, count in gap_counts.items() if count <= cutoff]
6✔
4022
        return self.take_seqs(cast("list[str]", names))
6✔
4023

4024
    def get_degapped_relative_to(self, name: str) -> Self:
6✔
4025
        """Remove all columns with gaps in sequence with given name.
4026

4027
        Parameters
4028
        ----------
4029
        name
4030
            sequence name
4031

4032
        Notes
4033
        -----
4034
        The returned alignment will not retain an annotation_db if present.
4035
        """
4036

4037
        if name not in self.names:
6✔
4038
            msg = f"Alignment missing sequence named {name!r}"
6✔
4039
            raise ValueError(msg)
6✔
4040

4041
        gapindex = self.moltype.most_degen_alphabet().gap_index
6✔
4042
        seqindex = self.names.index(name)
6✔
4043
        indices = self.array_seqs[seqindex] != gapindex
6✔
4044
        new = self.array_seqs[:, indices]
6✔
4045

4046
        new_seq_data = self._seqs_data.from_names_and_array(
6✔
4047
            names=self._name_map.values(),
4048
            data=new,
4049
            alphabet=self.moltype.most_degen_alphabet(),
4050
        )
4051
        kwargs = self._get_init_kwargs()
6✔
4052
        kwargs["seqs_data"] = new_seq_data
6✔
4053
        kwargs.pop("annotation_db", None)
6✔
4054
        kwargs.pop("slice_record", None)
6✔
4055
        return self.__class__(**kwargs)
6✔
4056

4057
    def matching_ref(self, ref_name: str, gap_fraction: float, gap_run: int) -> Self:
6✔
4058
        """Returns new alignment with seqs well aligned with a reference.
4059

4060
        Parameters
4061
        ----------
4062
        ref_name
4063
            name of the sequence to use as the reference
4064
        gap_fraction
4065
            fraction of positions that either have a gap in the
4066
            template but not in the seq or in the seq but not in the template
4067
        gap_run
4068
            number of consecutive gaps tolerated in query relative to
4069
            sequence or sequence relative to query
4070
        """
4071
        template = self.seqs[ref_name]
6✔
4072
        gap_filter = make_gap_filter(template, gap_fraction, gap_run)
6✔
4073
        return self.take_seqs_if(gap_filter)
6✔
4074

4075
    def sliding_windows(
6✔
4076
        self,
4077
        window: int,
4078
        step: int,
4079
        start: int | None = None,
4080
        end: int | None = None,
4081
    ) -> Iterator[Alignment]:
4082
        """Generator yielding new alignments of given length and interval.
4083

4084
        Parameters
4085
        ----------
4086
        window
4087
            The length of each returned alignment.
4088
        step
4089
            The interval between the start of the successive windows.
4090
        start
4091
            first window start position
4092
        end
4093
            last window start position
4094
        """
4095
        start = start or 0
6✔
4096
        end = len(self) - window + 1 if end is None else end
6✔
4097
        end = min(len(self) - window + 1, end)
6✔
4098
        if start < end and len(self) - end >= window - 1:
6✔
4099
            for pos in range(start, end, step):
6✔
4100
                yield self[pos : pos + window]
6✔
4101

4102
    def gapped_by_map(self, keep: FeatureMap, **kwargs: Any) -> Self:
6✔
4103
        # refactor: docstring
4104
        # TODO: kath, not explicitly tested
4105
        seqs: dict[str, NumpyIntArrayType] = {}
×
4106
        for seq in self.seqs:
×
4107
            selected = seq[keep]
×
4108
            seqs[self.name_map[seq.name]] = numpy.array(selected.gapped_seq)
×
4109

4110
        seqs_data = self._seqs_data.from_seqs(
×
4111
            data=seqs,
4112
            alphabet=self.moltype.most_degen_alphabet(),
4113
        )
4114
        init_kwargs = self._get_init_kwargs()
×
4115
        init_kwargs.pop("annotation_db", None)
×
4116
        init_kwargs |= kwargs
×
4117
        init_kwargs["seqs_data"] = seqs_data
×
4118
        init_kwargs.pop("slice_record", None)
×
4119
        return self.__class__(**init_kwargs)
×
4120

4121
    def filtered(
6✔
4122
        self,
4123
        predicate: Callable[[list[Aligned]], bool],
4124
        motif_length: int = 1,
4125
        drop_remainder: bool = True,
4126
        **kwargs: Any,
4127
    ) -> Self:
4128
        """The alignment positions where predicate(column) is true.
4129

4130
        Parameters
4131
        ----------
4132
        predicate
4133
            a callback function that takes an tuple of motifs and returns
4134
            True/False
4135
        motif_length
4136
            length of the motifs the sequences should be split  into, eg. 3 for
4137
            filtering aligned codons.
4138
        drop_remainder
4139
            If length is not modulo motif_length, allow dropping the terminal
4140
            remaining columns
4141
        """
4142
        # refactor: type hint for predicate
4143
        length = len(self)
6✔
4144
        drop = length % motif_length
6✔
4145
        if drop != 0 and not drop_remainder:
6✔
4146
            msg = f"aligned length not divisible by motif_length={motif_length}"
6✔
4147
            raise ValueError(
6✔
4148
                msg,
4149
            )
4150
        length -= drop
6✔
4151
        kept = numpy.zeros(length, dtype=bool)
6✔
4152
        for pos in range(0, length, motif_length):
6✔
4153
            seqs = [seq[pos : pos + motif_length] for seq in self.seqs]
6✔
4154
            if predicate(seqs):
6✔
4155
                kept[pos : pos + motif_length] = True
6✔
4156

4157
        indices = numpy.where(kept)[0]
6✔
4158

4159
        return self.take_positions(indices.tolist())
6✔
4160

4161
    def no_degenerates(self, motif_length: int = 1, allow_gap: bool = False) -> Self:
6✔
4162
        """returns new alignment without degenerate characters
4163

4164
        Parameters
4165
        ----------
4166
        motif_length
4167
            sequences are segmented into units of this size and the segments are
4168
            excluded if they contain degenerate characters.
4169
        allow_gap
4170
            whether gaps are allowed or whether they are treated as a degenerate
4171
            character (latter is default, as most evolutionary modelling treats
4172
            gaps as N).
4173
        """
4174
        if self.moltype.degen_alphabet is None:
6✔
4175
            msg = (
6✔
4176
                f"Invalid MolType={self.moltype.label} (no degenerate characters), "
4177
                "create the alignment using DNA, RNA or PROTEIN"
4178
            )
4179
            raise c3_moltype.MolTypeError(
6✔
4180
                msg,
4181
            )
4182

4183
        chars = len(self.moltype)
6✔
4184

4185
        array_pos = self.array_positions
6✔
4186
        # by design, char alphabets are organised such that the canonical
4187
        # characters always occur first, followed by gap, then ambiguity
4188
        # characters. so we can define a cutoff as follows:
4189
        cutoff = chars + 1 if allow_gap else chars
6✔
4190
        indices: npt.NDArray[numpy.bool] = cast(
6✔
4191
            "npt.NDArray[numpy.bool]", (array_pos < cutoff).all(axis=1)
4192
        )
4193

4194
        if motif_length > 1:
6✔
4195
            num_motif = len(self) // motif_length
6✔
4196

4197
            if remainder := len(self) % motif_length:
6✔
4198
                indices = indices[:-remainder]
6✔
4199
                array_pos = array_pos[:-remainder]
6✔
4200

4201
            motif_valid = indices.reshape(num_motif, motif_length).all(axis=1).flatten()
6✔
4202
            indices = numpy.repeat(motif_valid, motif_length)
6✔
4203

4204
        selected = array_pos[indices].T
6✔
4205

4206
        aligned_seqs_data = self._seqs_data.from_names_and_array(
6✔
4207
            names=self._name_map.values(),
4208
            data=selected,
4209
            alphabet=self.moltype.most_degen_alphabet(),
4210
        )
4211
        kwargs = self._get_init_kwargs()
6✔
4212
        kwargs["seqs_data"] = aligned_seqs_data
6✔
4213
        kwargs.pop("annotation_db", None)
6✔
4214
        kwargs.pop("slice_record", None)
6✔
4215
        return self.__class__(**kwargs)
6✔
4216

4217
    def _omit_gap_pos_single(
6✔
4218
        self,
4219
        gap_index: int,
4220
        missing_index: int | None,
4221
        allowed_num: int,
4222
    ) -> npt.NDArray[numpy.bool]:
4223
        # for motif_length == 1
4224
        indices = numpy.empty(len(self), dtype=bool)
6✔
4225
        for i, col in enumerate(self.array_seqs.T):
6✔
4226
            indices[i] = _gap_ok_vector_single(
6✔
4227
                col,
4228
                gap_index,
4229
                missing_index,
4230
                allowed_num,
4231
            )
4232
        return indices
6✔
4233

4234
    def _omit_gap_pos_multi(
6✔
4235
        self,
4236
        gap_index: int,
4237
        missing_index: int | None,
4238
        allowed_num: int,
4239
        motif_length: int,
4240
    ) -> npt.NDArray[numpy.bool]:
4241
        # for motif_length > 1
4242
        num_motifs = len(self) // motif_length
6✔
4243
        indices = numpy.empty(num_motifs * motif_length, dtype=bool)
6✔
4244
        for i in range(0, len(self), motif_length):
6✔
4245
            col = self.array_seqs.T[i : i + motif_length].T
6✔
4246
            ok = _gap_ok_vector_multi(
6✔
4247
                col,
4248
                gap_index,
4249
                missing_index,
4250
                motif_length,
4251
                allowed_num,
4252
            )
4253
            indices[i : i + motif_length] = ok
6✔
4254
        return indices
6✔
4255

4256
    def omit_gap_pos(
6✔
4257
        self,
4258
        allowed_gap_frac: float | None = None,
4259
        motif_length: int = 1,
4260
    ) -> Self:
4261
        """Returns new alignment where all cols (motifs) have <= allowed_gap_frac gaps.
4262

4263
        Parameters
4264
        ----------
4265
        allowed_gap_frac
4266
            specifies proportion of gaps is allowed in each column. Set to 0 to
4267
            exclude columns with any gaps, 1 to include all columns. Default is
4268
            None which is equivalent to (num_seqs-1)/num_seqs and leads to
4269
            elimination of columns that are only gaps.
4270
        motif_length
4271
            set's the "column" width, e.g. setting to 3 corresponds to codons.
4272
            A motif that includes a gap at any position is included in the
4273
            counting.
4274
        """
4275
        alpha = self.moltype.most_degen_alphabet()
6✔
4276
        gap_index = alpha.gap_index
6✔
4277
        missing_index = alpha.missing_index
6✔
4278
        if not gap_index and not missing_index:
6✔
4279
            return self
6✔
4280
        gap_index = cast("int", gap_index)
6✔
4281
        allowed_num = (
6✔
4282
            self.num_seqs - 1
4283
            if allowed_gap_frac is None
4284
            else int(numpy.floor(self.num_seqs * allowed_gap_frac))
4285
        )
4286

4287
        # we are assuming gap and missing data have same value on other strand
4288
        positions = self.array_positions
6✔
4289
        if motif_length == 1:
6✔
4290
            indices = self._omit_gap_pos_single(gap_index, missing_index, allowed_num)
6✔
4291
        else:
4292
            positions = positions[: motif_length * (len(positions) // motif_length)]
6✔
4293
            indices = self._omit_gap_pos_multi(
6✔
4294
                gap_index,
4295
                missing_index,
4296
                allowed_num,
4297
                motif_length,
4298
            )
4299
        selected = positions[indices, :].T
6✔
4300

4301
        aligned_seqs_data = self._seqs_data.from_names_and_array(
6✔
4302
            names=self._name_map.values(),
4303
            data=selected,
4304
            alphabet=alpha,
4305
        )
4306
        kwargs = self._get_init_kwargs()
6✔
4307
        kwargs["seqs_data"] = aligned_seqs_data
6✔
4308
        # slice record now needs to be reset since we likely have disjoint positions
4309
        kwargs.pop("slice_record", None)
6✔
4310
        return self.__class__(**kwargs)
6✔
4311

4312
    def sample(
6✔
4313
        self,
4314
        *,
4315
        n: int | None = None,
4316
        with_replacement: bool = False,
4317
        motif_length: int = 1,
4318
        randint: Callable[
4319
            [int, int | None, int | None], NumpyIntArrayType
4320
        ] = numpy.random.randint,
4321
        permutation: Callable[[int], NumpyIntArrayType] = numpy.random.permutation,
4322
    ) -> Self:
4323
        """Returns random sample of positions from self, e.g. to bootstrap.
4324

4325
        Parameters
4326
        ----------
4327
        n
4328
            number of positions to sample. If None, all positions are sampled.
4329
        with_replacement
4330
            if True, samples with replacement.
4331
        motif_length
4332
            number of positions to sample as a single motif.
4333
        randint
4334
            random number generator, default is numpy.randint
4335
        permutation
4336
            function to generate a random permutation of positions, default is
4337
            numpy.permutation
4338

4339
        Notes
4340
        -----
4341
        By default (resampling all positions without replacement), generates
4342
        a permutation of the positions of the alignment.
4343

4344
        Setting with_replacement to True and otherwise leaving parameters as
4345
        defaults generates a standard bootstrap resampling of the alignment.
4346
        """
4347
        # refactor: type hint for randint, permutation
4348
        # refactor: array
4349
        #  Given array_pos property, it will be efficient to generate random
4350
        # indices and use numpy.take() using that. In the case of motif_length
4351
        # != 1, the total number of positions is just len(self) // motif_length.
4352
        # Having produced that, those indices can be scaled back up, or the
4353
        # numpy array reshaped.
4354

4355
        population_size = len(self) // motif_length
6✔
4356
        if not with_replacement and n and n > population_size:
6✔
4357
            msg = f"cannot sample without replacement when {n=} > {population_size=}"
6✔
4358
            raise ValueError(msg)
6✔
4359

4360
        n = n or population_size
6✔
4361

4362
        if with_replacement:
6✔
4363
            locations = randint(0, population_size, n)
6✔
4364
        else:
4365
            locations = permutation(population_size)[:n]
6✔
4366

4367
        if motif_length == 1:
6✔
4368
            positions = locations
6✔
4369
        else:
4370
            positions = numpy.empty(n * motif_length, dtype=int)
6✔
4371
            for i, loc in enumerate(locations):
6✔
4372
                positions[i * motif_length : (i + 1) * motif_length] = range(
6✔
4373
                    loc * motif_length,
4374
                    (loc + 1) * motif_length,
4375
                )
4376

4377
        return self.take_positions(positions)
6✔
4378

4379
    def quick_tree(
6✔
4380
        self,
4381
        calc: str = "pdist",
4382
        drop_invalid: bool = False,
4383
        parallel: bool = False,
4384
        use_hook: str | None = None,
4385
    ) -> PhyloNode:
4386
        """Returns a phylogenetic tree.
4387

4388
        Parameters
4389
        ----------
4390
        calc
4391
            a pairwise distance calculator or name of one. For options see
4392
            cogent3.evolve.fast_distance.available_distances
4393
        drop_invalid
4394
            If True, sequences for which a pairwise distance could not be
4395
            calculated are excluded. If False, an ArithmeticError is raised if
4396
            a distance could not be computed on observed data.
4397
        parallel
4398
            parallel execution of distance calculations
4399
        use_hook
4400
            name of a third-party package that implements the quick_tree
4401
            hook. If not specified, defaults to the first available hook or
4402
            the cogent3 quick_tree() app. To force default, set
4403
            use_hook="cogent3".
4404

4405
        Returns
4406
        -------
4407
        a phylogenetic tree
4408
        """
4409
        dm = self.distance_matrix(
6✔
4410
            calc=calc,
4411
            drop_invalid=drop_invalid,
4412
            parallel=parallel,
4413
        )
4414
        return dm.quick_tree(use_hook=use_hook)
6✔
4415

4416
    def get_projected_feature(
6✔
4417
        self, *, seqid: str, feature: Feature[Alignment]
4418
    ) -> Feature[c3_sequence.Sequence]:
4419
        """returns an alignment feature projected onto the seqid sequence
4420

4421
        Parameters
4422
        ----------
4423
        seqid
4424
            name of the sequence to project the feature onto
4425
        feature
4426
            a Feature, bound to self, that will be projected
4427

4428
        Returns
4429
        -------
4430
        a new Feature bound to seqid
4431

4432
        Notes
4433
        -----
4434
        The alignment coordinates of feature are converted into the seqid
4435
        sequence coordinates and the object is bound to that sequence.
4436

4437
        The feature is added to the annotation_db.
4438
        """
4439
        target_aligned = self.seqs[seqid]
6✔
4440
        if feature.parent is not self:
6✔
4441
            msg = "Feature does not belong to this alignment"
×
4442
            raise ValueError(msg)
×
4443
        result = feature.remapped_to(target_aligned.seq, target_aligned.map)
6✔
4444
        # property ensures db is created
4445
        self.annotation_db.add_feature(**feature.to_dict())
6✔
4446
        return result
6✔
4447

4448
    def get_projected_features(
6✔
4449
        self, *, seqid: str, **kwargs: Any
4450
    ) -> list[Feature[c3_sequence.Sequence]]:
4451
        """projects all features from other sequences onto seqid"""
4452
        annots: list[Feature[Alignment]] = []
6✔
4453
        for name in self.names:
6✔
4454
            if name == seqid:
6✔
4455
                continue
6✔
4456
            annots.extend(list(self.get_features(seqid=name, **kwargs)))
6✔
4457
        return [self.get_projected_feature(seqid=seqid, feature=a) for a in annots]
6✔
4458

4459
    def get_drawables(
6✔
4460
        self,
4461
        *,
4462
        biotype: str | tuple[str, ...] | list[str] | set[str] | None = None,
4463
    ) -> dict[str, list[Shape]]:
4464
        """returns a dict of drawables, keyed by type
4465

4466
        Parameters
4467
        ----------
4468
        biotype
4469
            passed to get_features(biotype). Can be a single biotype or
4470
            series. Only features matching this will be included.
4471
        """
4472
        result: defaultdict[str, list[Shape]] = defaultdict(list)
6✔
4473
        for f in self.get_features(biotype=biotype, allow_partial=True):
6✔
4474
            result[f.biotype].append(f.get_drawable())
6✔
4475
        return result
6✔
4476

4477
    def get_drawable(
6✔
4478
        self,
4479
        *,
4480
        biotype: str | tuple[str, ...] | list[str] | set[str] | None = None,
4481
        width: float = 600,
4482
        vertical: int = False,
4483
        title: str | None = None,
4484
    ) -> Drawable | None:
4485
        """make a figure from sequence features
4486

4487
        Parameters
4488
        ----------
4489
        biotype
4490
            passed to get_features(biotype). Can be a single biotype or
4491
            series. Only features matching this will be included.
4492
        width
4493
            width in pixels
4494
        vertical
4495
            rotates the drawable
4496
        title
4497
            title for the plot
4498

4499
        Returns
4500
        -------
4501
        a Drawable instance
4502
        """
4503
        # TODO gah I think this needs to be modified to make row-blocks
4504
        # for each sequence in the alignment, or are we displaying the
4505
        # sequence name in the feature label?
4506
        from cogent3.draw.drawable import Drawable
6✔
4507

4508
        drawables = self.get_drawables(biotype=biotype)
6✔
4509
        if not drawables:
6✔
4510
            return None
6✔
4511
        # we order by tracks
4512
        top: float = 0
6✔
4513
        space = 0.25
6✔
4514
        annotes: list[Shape] = []
6✔
4515
        annott = None
6✔
4516
        for feature_type in drawables:
6✔
4517
            new_bottom = top + space
6✔
4518
            for i, annott in enumerate(drawables[feature_type]):
6✔
4519
                annott.shift(y=new_bottom - annott.bottom)
6✔
4520
                if i > 0:
6✔
4521
                    # refactor: design
4522
                    # modify the api on annott, we should not be using
4523
                    # a private attribute!
4524
                    annott._showlegend = False
6✔
4525
                annotes.append(annott)
6✔
4526

4527
            top = cast("Shape", annott).top
6✔
4528

4529
        top += space
6✔
4530
        height = max((top / len(self)) * width, 300)
6✔
4531
        xaxis: dict[str, Any] = {
6✔
4532
            "range": [0, len(self)],
4533
            "zeroline": False,
4534
            "showline": True,
4535
        }
4536
        yaxis: dict[str, Any] = {
6✔
4537
            "range": [0, top],
4538
            "visible": False,
4539
            "zeroline": True,
4540
            "showline": True,
4541
        }
4542

4543
        if vertical:
6✔
4544
            all_traces = [t.T.as_trace() for t in annotes]
6✔
4545
            width, height = height, width
6✔
4546
            xaxis, yaxis = yaxis, xaxis
6✔
4547
        else:
4548
            all_traces = [t.as_trace() for t in annotes]
6✔
4549

4550
        drawer = Drawable(title=title, traces=all_traces, width=width, height=height)
6✔
4551
        drawer.layout.update(xaxis=xaxis, yaxis=yaxis)
6✔
4552
        return drawer
6✔
4553

4554
    def seqlogo(
6✔
4555
        self,
4556
        width: float = 700,
4557
        height: float = 100,
4558
        wrap: int | None = None,
4559
        vspace: float = 0.005,
4560
        colours: dict[str, str] | None = None,
4561
    ) -> Drawable | None:
4562
        """returns Drawable sequence logo using mutual information
4563

4564
        Parameters
4565
        ----------
4566
        width, height
4567
            plot dimensions in pixels
4568
        wrap
4569
            number of alignment columns per row
4570
        vspace
4571
            vertical separation between rows, as a proportion of total plot
4572
        colours
4573
            mapping of characters to colours. If note provided, defaults to
4574
            custom for everything ecept protein, which uses protein moltype
4575
            colours.
4576

4577
        Notes
4578
        -----
4579
        Computes MI based on log2 and includes the gap state, so the maximum
4580
        possible value is -log2(1/num_states)
4581
        """
4582
        assert 0 <= vspace <= 1, "vertical space must be in range 0, 1"
6✔
4583
        freqs = self.counts_per_pos(allow_gap=True).to_freq_array()
6✔
4584
        if colours is None and "protein" in self.moltype.label:
6✔
4585
            colours = self.moltype._colors
×
4586

4587
        return freqs.logo(
6✔
4588
            width=width,
4589
            height=height,
4590
            wrap=wrap,
4591
            vspace=vspace,
4592
            colours=colours,
4593
        )
4594

4595
    def coevolution(
6✔
4596
        self,
4597
        stat: str = "nmi",
4598
        segments: list[tuple[int, int]] | None = None,
4599
        drawable: str | None = None,
4600
        show_progress: bool = False,
4601
        parallel: bool = False,
4602
        par_kw: dict[str, Any] | None = None,
4603
    ) -> DictArray:
4604
        """performs pairwise coevolution measurement
4605

4606
        Parameters
4607
        ----------
4608
        stat
4609
            coevolution metric, defaults to 'nmi' (Normalized Mutual
4610
            Information). Valid choices are 'rmi' (Resampled Mutual Information)
4611
            and 'mi', mutual information.
4612
        segments
4613
            coordinates of the form [(start, end), ...] where all possible
4614
            pairs of alignment positions within and between segments are
4615
            examined.
4616
        drawable
4617
            Result object is capable of plotting data specified type. str value
4618
            must be one of plot type 'box', 'heatmap', 'violin'.
4619
        show_progress
4620
            shows a progress bar.
4621
        parallel
4622
            run in parallel, according to arguments in par_kwargs.
4623
        par_kw
4624
            dict of values for configuring parallel execution.
4625

4626
        Returns
4627
        -------
4628
        DictArray of results with lower-triangular values. Upper triangular
4629
        elements and estimates that could not be computed for numerical reasons
4630
        are set as nan
4631
        """
4632
        from cogent3.draw.drawable import AnnotatedDrawable, Drawable
6✔
4633
        from cogent3.evolve import coevolution as coevo
6✔
4634
        from cogent3.util.union_dict import UnionDict
6✔
4635

4636
        # refactor: design
4637
        # These graphical representations of matrices should be separate functions
4638
        # in the drawing submodule somewhere
4639
        stat = stat.lower()
6✔
4640
        range_segments = [range(*segment) for segment in segments] if segments else None
6✔
4641

4642
        result = coevo.coevolution_matrix(
6✔
4643
            alignment=self,
4644
            stat=stat,
4645
            positions=range_segments,
4646
            show_progress=show_progress,
4647
            parallel=parallel,
4648
            par_kw=par_kw,
4649
        )
4650
        if drawable is None:
6✔
4651
            return result
6✔
4652

4653
        trace_name = pathlib.Path(self.source).name if self.source else None
6✔
4654
        if drawable in ("box", "violin"):
6✔
4655
            trace = UnionDict(
6✔
4656
                type=drawable,
4657
                y=result.array.flatten(),
4658
                showlegend=False,
4659
                name="",
4660
            )
4661
            draw = Drawable(
6✔
4662
                width=500,
4663
                height=500,
4664
                title=trace_name,
4665
                ytitle=stat.upper(),
4666
            )
4667
            draw.add_trace(trace)
6✔
4668
            result = draw.bound_to(result)
6✔
4669
        elif drawable:
6✔
4670
            axis_title = "Alignment Position"
6✔
4671
            axis_args = {
6✔
4672
                "showticklabels": True,
4673
                "mirror": True,
4674
                "showgrid": False,
4675
                "showline": True,
4676
                "zeroline": False,
4677
            }
4678
            height = 500
6✔
4679
            width = height
6✔
4680
            draw = Drawable(
6✔
4681
                width=width,
4682
                height=height,
4683
                xtitle=axis_title,
4684
                ytitle=axis_title,
4685
                title=trace_name,
4686
            )
4687

4688
            trace = UnionDict(
6✔
4689
                type="heatmap",
4690
                z=result.array,
4691
                colorbar={"title": {"text": stat.upper(), "font": {"size": 16}}},
4692
            )
4693
            draw.add_trace(trace)
6✔
4694
            draw.layout.xaxis.update(axis_args)
6✔
4695
            draw.layout.yaxis.update(axis_args)
6✔
4696

4697
            try:
6✔
4698
                bottom = self.get_drawable()
6✔
4699
                left = self.get_drawable(vertical=True)
6✔
4700
            except AttributeError:
×
4701
                bottom = None
×
4702
                left = None
×
4703

4704
            if bottom and drawable != "box":
6✔
4705
                xlim = 1.2
6✔
4706
                draw.layout.width = height * xlim
6✔
4707
                layout: dict[str, dict[str, float]] = {"legend": {"x": xlim, "y": 1}}
6✔
4708
                draw = AnnotatedDrawable(
6✔
4709
                    draw,
4710
                    left_track=left,
4711
                    bottom_track=bottom,
4712
                    xtitle=axis_title,
4713
                    ytitle=axis_title,
4714
                    xrange=[0, len(self)],
4715
                    yrange=[0, len(self)],
4716
                    layout=layout,
4717
                )
4718

4719
            result = draw.bound_to(result)
6✔
4720

4721
        return result
6✔
4722

4723
    def information_plot(
6✔
4724
        self,
4725
        width: int | None = None,
4726
        height: int | None = None,
4727
        window: int | None = None,
4728
        stat: str = "median",
4729
        include_gap: bool = True,
4730
    ) -> Drawable | AnnotatedDrawable:
4731
        """plot information per position
4732

4733
        Parameters
4734
        ----------
4735
        width
4736
            figure width in pixels
4737
        height
4738
            figure height in pixels
4739
        window
4740
            used for smoothing line, defaults to sqrt(length)
4741
        stat
4742
            'mean' or 'median, used as the summary statistic for each window
4743
        include_gap
4744
            whether to include gap counts, shown on right y-axis
4745
        """
4746
        from cogent3.draw.drawable import AnnotatedDrawable, Drawable
6✔
4747

4748
        # refactor: design
4749
        # These graphical representations of matrices should be separate functions
4750
        # in the drawing submodule somewhere
4751
        window = int(window or numpy.sqrt(len(self)))
6✔
4752
        y: NumpyFloatArrayType | list[numpy.floating]
4753
        y = self.entropy_per_pos()
6✔
4754
        nan_indices = numpy.isnan(y)
6✔
4755
        if nan_indices.sum() == y.shape[0]:  # assuming 1D array
6✔
4756
            y.fill(0.0)
×
4757
        max_entropy: numpy.floating = y[nan_indices == False].max()  # noqa
6✔
4758
        y = max_entropy - y  # convert to information
6✔
4759
        # now make all nan's 0
4760
        y[nan_indices] = 0
6✔
4761
        if stat not in ("mean", "median"):
6✔
4762
            msg = 'stat must be either "mean" or "median"'
×
4763
            raise ValueError(msg)
×
4764
        calc_stat = numpy.mean if stat == "mean" else numpy.median
6✔
4765
        num = len(y) - window
6✔
4766
        v = [calc_stat(y[i : i + window]) for i in range(num)]
6✔
4767
        x = numpy.arange(num)
6✔
4768
        x += window // 2  # shift x-coordinates to middle of window
6✔
4769
        trace_line = UnionDict(
6✔
4770
            type="scatter",
4771
            x=x,
4772
            y=v,
4773
            mode="lines",
4774
            name=f"smoothed {stat}",
4775
            line={"shape": "spline", "smoothing": 1.3},
4776
        )
4777
        trace_marks = UnionDict(
6✔
4778
            type="scatter",
4779
            x=numpy.arange(y.shape[0]),
4780
            y=y,
4781
            mode="markers",
4782
            opacity=0.5,
4783
            name="per position",
4784
        )
4785
        layout = UnionDict(
6✔
4786
            title="Information per position",
4787
            width=width,
4788
            height=height,
4789
            showlegend=True,
4790
            yaxis={"range": [0, max(y) * 1.2], "showgrid": False},
4791
            xaxis={
4792
                "showgrid": False,
4793
                "range": [0, len(self)],
4794
                "mirror": True,
4795
                "showline": True,
4796
            },
4797
        )
4798

4799
        traces = [trace_marks, trace_line]
6✔
4800
        if include_gap:
6✔
4801
            gap_counts = self.count_gaps_per_pos()
6✔
4802
            y = [
6✔
4803
                calc_stat(cast("NumpyFloatArrayType", gap_counts[i : i + window]))
4804
                for i in range(num)
4805
            ]
4806
            trace_g = UnionDict(
6✔
4807
                type="scatter",
4808
                x=x,
4809
                y=y,
4810
                yaxis="y2",
4811
                name="Gaps",
4812
                mode="lines",
4813
                line={"shape": "spline", "smoothing": 1.3},
4814
            )
4815
            traces += [trace_g]
6✔
4816
            layout.yaxis2 = {
6✔
4817
                "title": "Count",
4818
                "side": "right",
4819
                "overlaying": "y",
4820
                "range": [0, max(cast("Iterable[float]", gap_counts)) * 1.2],
4821
                "showgrid": False,
4822
                "showline": True,
4823
            }
4824

4825
        draw = Drawable(
6✔
4826
            title="Information per position",
4827
            xtitle="Position",
4828
            ytitle=f"Information (window={window})",
4829
        )
4830
        draw.traces.extend(traces)
6✔
4831
        draw.layout |= layout
6✔
4832
        draw.layout.legend = {"x": 1.1, "y": 1}
6✔
4833

4834
        try:
6✔
4835
            drawable = self.get_drawable()
6✔
4836
        except AttributeError:
×
4837
            drawable = None
×
4838

4839
        if drawable:
6✔
4840
            draw = AnnotatedDrawable(
×
4841
                draw,
4842
                bottom_track=drawable,
4843
                xtitle="position",
4844
                ytitle=f"Information (window={window})",
4845
                layout=layout,
4846
            )
4847

4848
        return draw
6✔
4849

4850
    def apply_scaled_gaps(
6✔
4851
        self,
4852
        other: SequenceCollection,
4853
        aa_to_codon: bool = True,
4854
    ) -> Self:
4855
        """applies gaps in self to ungapped sequences"""
4856
        assert set(other.names) == set(self.names), "Names must match"
6✔
4857
        if aa_to_codon and not all(
6✔
4858
            (not self.moltype.is_nucleic, other.moltype.is_nucleic),
4859
        ):
4860
            msg = "aa_to_codon True requires nucleic moltype destination not {self.moltype.name!r}"
6✔
4861
            raise ValueError(
6✔
4862
                msg,
4863
            )
4864
        if not aa_to_codon and not all(
6✔
4865
            (self.moltype.is_nucleic, not other.moltype.is_nucleic),
4866
        ):
4867
            msg = f"aa_to_codon False requires protein moltype destination not {other.moltype.name!r}"
6✔
4868
            raise ValueError(
6✔
4869
                msg,
4870
            )
4871

4872
        assert aa_to_codon is not None
6✔
4873
        gaps: dict[str, NumpyIntArrayType] = {}
6✔
4874
        seqs: dict[str, NumpyIntArrayType] = {}
6✔
4875
        scale = 3 if aa_to_codon else 1 / 3
6✔
4876

4877
        for seq in self.seqs:
6✔
4878
            parent_name = self._name_map[seq.name]
6✔
4879
            gaps[parent_name] = seq.map.scaled(scale).array
6✔
4880
            ungapped = numpy.array(other.seqs[seq.name])
6✔
4881
            seqs[parent_name] = ungapped
6✔
4882
            seq_len = self._seqs_data.get_seq_length(parent_name)
6✔
4883
            if len(ungapped) != int(seq_len * scale):
6✔
4884
                msg = f"Destination sequence for {seq.name!r} != {scale:.2f} x {seq_len} sequence"
6✔
4885
                raise ValueError(
6✔
4886
                    msg,
4887
                )
4888

4889
        seq_data = self._seqs_data.from_seqs_and_gaps(
6✔
4890
            seqs=seqs,
4891
            gaps=gaps,
4892
            alphabet=other.moltype.most_degen_alphabet(),
4893
        )
4894
        init_args = self._get_init_kwargs()
6✔
4895
        init_args.pop("slice_record")  # slice is realised
6✔
4896
        init_args["moltype"] = other.moltype
6✔
4897
        init_args["seqs_data"] = seq_data
6✔
4898
        init_args["annotation_db"] = other.annotation_db
6✔
4899
        return self.__class__(**init_args)
6✔
4900

4901
    def copy(self, copy_annotations: bool = False) -> Self:
6✔
4902
        """creates new instance, only mutable attributes are copied"""
4903
        kwargs = self._get_init_kwargs()
6✔
4904
        if copy_annotations:
6✔
4905
            kwargs["annotation_db"] = copy.deepcopy(self._annotation_db)
6✔
4906
        return self.__class__(**kwargs)
6✔
4907

4908
    def deepcopy(self, **kwargs: Any) -> Self:
6✔
4909
        """returns deep copy of self
4910

4911
        Notes
4912
        -----
4913
        Reduced to sliced sequences in self, kwargs are ignored.
4914
        Annotation db is not copied if the alignment has been sliced.
4915
        """
4916
        import copy
6✔
4917

4918
        kwargs = self._get_init_kwargs()
6✔
4919
        kwargs.pop("seqs_data")
6✔
4920
        kwargs["annotation_db"] = (
6✔
4921
            None
4922
            if len(self) != self._seqs_data.align_len
4923
            else copy.deepcopy(self._annotation_db)
4924
        )
4925
        new_seqs_data_dict = {
6✔
4926
            n: self._seqs_data.get_gapped_seq_array(
4927
                seqid=n,
4928
                start=self._slice_record.plus_start,
4929
                stop=self._slice_record.plus_stop,
4930
                step=self._slice_record.plus_step,
4931
            )
4932
            for n in self._name_map.values()
4933
        }
4934
        new_seqs_data = self._seqs_data.from_seqs(
6✔
4935
            data=new_seqs_data_dict,
4936
            alphabet=self.moltype.most_degen_alphabet(),
4937
        )
4938
        kwargs["seqs_data"] = new_seqs_data
6✔
4939
        kwargs["slice_record"] = (
6✔
4940
            SliceRecord(parent_len=new_seqs_data.align_len, step=-1)
4941
            if self._slice_record.is_reversed
4942
            else None
4943
        )
4944
        return self.__class__(**kwargs)
6✔
4945

4946
    def with_masked_annotations(
6✔
4947
        self,
4948
        biotypes: PySeq[str],
4949
        mask_char: str = "?",
4950
        shadow: bool = False,
4951
        seqid: str | None = None,
4952
    ) -> Self:
4953
        """returns an alignment with regions replaced by mask_char
4954

4955
        Parameters
4956
        ----------
4957
        biotypes
4958
            annotation type(s)
4959
        mask_char
4960
            must be a character valid for the moltype. The default value is
4961
            the most ambiguous character, eg. '?' for DNA
4962
        shadow
4963
            If True, masks everything but the biotypes
4964
        seqid
4965
            name of sequence to mask, defaults to all
4966
        """
4967
        # by doing numpy.array(seq), this method applies the slice_record
4968
        # and modifies the underlying sequence data. We therefore split
4969
        # gapped sequences into their gaps and seq components and discard
4970
        # the current slice_record.
4971
        gaps: dict[str, NumpyIntArrayType] = {}
6✔
4972
        ungapped: dict[str, NumpyIntArrayType] = {}
6✔
4973
        for seq in self.seqs:
6✔
4974
            parent_name = self._name_map[seq.name]
6✔
4975
            gaps[parent_name] = seq.map.array
6✔
4976
            if seqid is None or seq.name == seqid:
6✔
4977
                ungapped[parent_name] = numpy.array(
6✔
4978
                    seq.seq.with_masked_annotations(biotypes, mask_char, shadow),
4979
                )
4980
            else:
4981
                ungapped[parent_name] = numpy.array(seq.seq)
6✔
4982

4983
        seq_data = self._seqs_data.from_seqs_and_gaps(
6✔
4984
            seqs=ungapped,
4985
            gaps=gaps,
4986
            alphabet=self.moltype.most_degen_alphabet(),
4987
        )
4988
        init = self._get_init_kwargs()
6✔
4989
        init["seqs_data"] = seq_data
6✔
4990
        # we have to drop the slice record since we have changed the data
4991
        init["slice_record"] = None
6✔
4992
        return self.__class__(**init)
6✔
4993

4994

4995
def make_gap_filter(
6✔
4996
    template: Aligned, gap_fraction: float, gap_run: int
4997
) -> Callable[[Aligned], bool]:
4998
    """Returns f(seq) -> True if no gap runs and acceptable gap fraction.
4999

5000
    Calculations relative to template.
5001
    gap_run = number of consecutive gaps allowed in either the template or seq
5002
    gap_fraction = fraction of positions that either have a gap in the template
5003
        but not in the seq or in the seq but not in the template
5004
    NOTE: template and seq must both be ArraySequence objects.
5005
    """
5006
    template_gaps = numpy.array(template.gap_vector())
6✔
5007

5008
    def result(seq: Aligned) -> bool:
6✔
5009
        """Returns True if seq adhers to the gap threshold and gap fraction."""
5010
        seq_gaps = numpy.array(seq.gap_vector())
6✔
5011
        # check if gap amount bad
5012
        if sum(seq_gaps != template_gaps) / float(len(seq)) > gap_fraction:
6✔
5013
            return False
6✔
5014
        # check if gap runs bad
5015
        return not (
6✔
5016
            b"\x01" * gap_run
5017
            in numpy.logical_and(seq_gaps, numpy.logical_not(template_gaps))
5018
            .astype(numpy.uint8)
5019
            .tobytes()
5020
            or b"\x01" * gap_run
5021
            in numpy.logical_and(template_gaps, numpy.logical_not(seq_gaps))
5022
            .astype(numpy.uint8)
5023
            .tobytes()
5024
        )
5025

5026
    return result
6✔
5027

5028

5029
@numba.njit(cache=True)
5030
def _gap_ok_vector_single(
5031
    data: npt.NDArray[numpy.uint8],
5032
    gap_index: int,
5033
    missing_index: int | None,
5034
    num_allowed: int,
5035
) -> bool:  # pragma: no cover
5036
    """returns indicies for which the number of gaps & missing data is less than or equal to num_allowed"""
5037
    num = 0
5038
    for i in range(len(data)):
5039
        if data[i] == gap_index or (
5040
            missing_index is not None and data[i] == missing_index
5041
        ):
5042
            num += 1
5043

5044
        if num > num_allowed:
5045
            break
5046

5047
    return num <= num_allowed
5048

5049

5050
@numba.njit(cache=True)
5051
def _gap_ok_vector_multi(
5052
    motifs: NumpyIntArrayType,
5053
    gap_index: int,
5054
    missing_index: int | None,
5055
    motif_length: int,
5056
    num_allowed: int,
5057
) -> bool:  # pragma: no cover
5058
    """returns indicies for which the number of gaps & missing data in a vector of motifs is less than or equal to num_allowed"""
5059
    num = 0
5060
    for motif in motifs:
5061
        for j in range(motif_length):
5062
            if motif[j] == gap_index or (
5063
                missing_index is not None and motif[j] == missing_index
5064
            ):
5065
                num += 1
5066
                break
5067

5068
        if num > num_allowed:
5069
            break
5070

5071
    return num <= num_allowed
5072

5073

5074
@numba.jit(cache=True)
5075
def _var_pos_canonical_or_gap(
5076
    arr: NumpyIntArrayType,
5077
    gap_index: int,
5078
    missing_index: int,
5079
) -> npt.NDArray[numpy.bool]:  # pragma: no cover
5080
    """return boolean array indicating columns with more than one value below threshold
5081

5082
    Parameters
5083
    ----------
5084
    arr
5085
        a 2D array with rows being sequences and columns positions
5086
    gap_index
5087
        value of gap state
5088
    missing_index
5089
        value of missing state
5090

5091
    Returns
5092
    ------
5093
    a boolean array
5094
    """
5095
    # relying on consistent ordering of gap as num canonical + 1
5096
    m, n = arr.shape
5097
    result = numpy.zeros(n, dtype=numpy.bool_)
5098
    for pos in numpy.arange(n):
5099
        last = -1
5100
        for seq in numpy.arange(m):
5101
            state = arr[seq, pos]
5102
            if state <= gap_index or state == missing_index:
5103
                if last == -1:
5104
                    last = state
5105
                elif state != last:
5106
                    result[pos] = True
5107
                    break
5108

5109
    return result
5110

5111

5112
@numba.jit(cache=True)
5113
def _var_pos_canonical(
5114
    arr: NumpyIntArrayType,
5115
    gap_index: int,
5116
) -> npt.NDArray[numpy.bool]:  # pragma: no cover
5117
    """return boolean array indicating columns with more than one value below threshold
5118

5119
    Parameters
5120
    ----------
5121
    arr
5122
        a 2D array with rows being sequences and columns positions
5123
    gap_index
5124
        value of gap state
5125

5126
    Returns
5127
    ------
5128
    a boolean array
5129
    """
5130
    # relying on consistent ordering of gap as num canonical + 1
5131
    m, n = arr.shape
5132
    result = numpy.zeros(n, dtype=numpy.bool_)
5133
    for pos in numpy.arange(n):
5134
        last = -1
5135
        for seq in numpy.arange(m):
5136
            state = arr[seq, pos]
5137
            if state < gap_index:
5138
                if last == -1:
5139
                    last = state
5140
                elif state != last:
5141
                    result[pos] = True
5142
                    break
5143

5144
    return result
5145

5146

5147
@numba.jit(cache=True)
5148
def _var_pos_not_gap(
5149
    arr: NumpyIntArrayType,
5150
    gap_index: int,
5151
) -> npt.NDArray[numpy.bool]:  # pragma: no cover
5152
    """return boolean array indicating columns with more than one value below threshold
5153

5154
    Parameters
5155
    ----------
5156
    arr
5157
        a 2D array with rows being sequences and columns positions
5158
    gap_index
5159
        value of gap state
5160

5161
    Returns
5162
    ------
5163
    a boolean array
5164
    """
5165
    m, n = arr.shape
5166
    result = numpy.zeros(n, dtype=numpy.bool_)
5167
    for pos in numpy.arange(n):
5168
        last = -1
5169
        for seq in numpy.arange(m):
5170
            state = arr[seq, pos]
5171
            if state != gap_index:
5172
                if last == -1:
5173
                    last = state
5174
                elif state != last:
5175
                    result[pos] = True
5176
                    break
5177

5178
    return result
5179

5180

5181
class _IndexableSeqs(Generic[TSequenceOrAligned]):
6✔
5182
    """container that is created by SequenceCollection and Alignment instances"""
5183

5184
    def __init__(
6✔
5185
        self,
5186
        parent: SequenceCollection | Alignment,
5187
        make_seq: Callable[[str], TSequenceOrAligned],
5188
    ) -> None:
5189
        """
5190
        Parameters
5191
        ----------
5192
        parent
5193
            either a SequenceCollection or Alignment instance
5194
        make_seq
5195
            method on the parent that creates the correct object type when given a seqid
5196
        """
5197
        self.parent = parent
6✔
5198
        self._make_seq: Callable[[str], TSequenceOrAligned] = make_seq
6✔
5199

5200
    def __getitem__(
6✔
5201
        self,
5202
        key: str | int | slice,
5203
    ) -> TSequenceOrAligned:
5204
        if isinstance(key, int):
6✔
5205
            key = self.parent.names[key]
6✔
5206

5207
        if isinstance(key, str):
6✔
5208
            return self._make_seq(key)
6✔
5209

5210
        msg = f"indexing not supported for {type(key)}, try .take_seqs()"
×
5211
        raise TypeError(msg)
×
5212

5213
    def __repr__(self) -> str:
6✔
5214
        one_seq = self[self.parent.names[0]]
6✔
5215
        return f"({one_seq!r}, + {self.parent.num_seqs - 1} seqs)"
6✔
5216

5217
    def __len__(self) -> int:
6✔
5218
        return self.parent.num_seqs
6✔
5219

5220
    def __iter__(self) -> Iterator[TSequenceOrAligned]:
6✔
5221
        for name in self.parent.names:
6✔
5222
            yield self._make_seq(name)
6✔
5223

5224

5225
class _SeqNamer:
6✔
5226
    def __init__(
6✔
5227
        self,
5228
        name_func: Callable[[str], str] | None = None,
5229
        base_name: str = "seq",
5230
        start_at: int = 0,
5231
    ) -> None:
5232
        self._base_name = base_name
6✔
5233
        self._num = start_at
6✔
5234
        self._name_func = name_func
6✔
5235

5236
    def __call__(
6✔
5237
        self,
5238
        seq: str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned,
5239
        name: str | None = None,
5240
    ) -> str:
5241
        name = name or getattr(seq, "name", name)
6✔
5242

5243
        if not name:
6✔
5244
            name = f"{self._base_name}_{self._num}"
6✔
5245
            self._num += 1
6✔
5246
        elif self._name_func:
6✔
5247
            name = self._name_func(name)
6✔
5248

5249
        return name
6✔
5250

5251

5252
def _make_name_seq_mapping(
6✔
5253
    data: PySeq[str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned]
5254
    | Mapping[str, str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned]
5255
    | set[str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned]
5256
    | SequenceCollection,
5257
    seq_namer: _SeqNamer,
5258
) -> dict[str, str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned]:
5259
    """returns a dict mapping names to sequences
5260

5261
    Parameters
5262
    ----------
5263
    data
5264
        a dict of {name: seq, ...}, or python sequence of StrORBytesORArrayOrSeq
5265
    seq_namer
5266
        callback that takes the sequence and optionally a name and
5267
        returns a new name
5268
    """
5269
    if isinstance(data, dict):
6✔
5270
        return {seq_namer(seq=seq, name=name): seq for name, seq in data.items()}
6✔
5271

5272
    if isinstance(data, (tuple, set)):
6✔
5273
        data = list(data)
6✔
5274

5275
    if isinstance(data, list):
6✔
5276
        if isinstance(data[0], (list, tuple)):
6✔
5277
            # handle case where we've been given data like
5278
            # for example [[name, seq], ...]
5279
            with contextlib.suppress(ValueError):
6✔
5280
                return _make_name_seq_mapping(dict(data), seq_namer)  # type: ignore[arg-type]
6✔
5281

5282
        return {seq_namer(seq=record): record for record in data}
6✔
5283

5284
    if not hasattr(data, "seqs"):
6✔
5285
        msg = f"_make_name_seq_mapping not implemented for {type(data)}"
6✔
5286
        raise NotImplementedError(msg)
6✔
5287

5288
    return {
6✔
5289
        seq_namer(seq=record): record
5290
        for record in cast("SequenceCollection", data).seqs
5291
    }
5292

5293

5294
def _seqname_parent_name(
6✔
5295
    record: str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned,
5296
    name: str | None = None,
5297
) -> tuple[str, str]:
5298
    if hasattr(record, "parent_coordinates"):
6✔
5299
        record = cast("c3_sequence.Sequence | Aligned", record)
6✔
5300
        parent_name, *_ = record.parent_coordinates()
6✔
5301
        return name or cast("str", record.name), parent_name or cast("str", name)
6✔
5302
    if hasattr(record, "name"):
6✔
5303
        record = cast("Aligned", record)
×
5304
        return name or record.name, name or record.name
×
5305
    name = cast("str", name)
6✔
5306
    return name, name
6✔
5307

5308

5309
def make_name_map(data: dict[str, str | bytes | NumpyIntArrayType]) -> dict[str, str]:
6✔
5310
    """returns a dict mapping names to parent names
5311

5312
    Parameters
5313
    ----------
5314
    data
5315
        a dict of {name: seq, ...}
5316

5317
    Returns
5318
    -------
5319
        empty dict if names and parent names are always equal
5320
    """
5321
    name_map: dict[str, str] = {}
6✔
5322
    for name, record in data.items():
6✔
5323
        new_name, parent_name = _seqname_parent_name(record, name=name)
6✔
5324
        if new_name == parent_name:
6✔
5325
            continue
6✔
5326
        name_map[new_name] = parent_name
6✔
5327

5328
    return name_map
6✔
5329

5330

5331
def merged_db_collection(
6✔
5332
    seqs: Mapping[str, str | bytes | NumpyIntArrayType | c3_sequence.Sequence]
5333
    | Iterable[str | bytes | NumpyIntArrayType | c3_sequence.Sequence],
5334
) -> SupportsFeatures | None:
5335
    """return one AnnotationDb from a collection of sequences
5336

5337
    Parameters
5338
    ----------
5339
    seqs
5340
        iterable list of data
5341

5342
    Returns
5343
    -------
5344
    list of all annotation db's
5345

5346
    Raises
5347
    ------
5348
    TypeError if different classes of AnnotationDb
5349
    """
5350
    if isinstance(seqs, Mapping):
6✔
5351
        seqs = cast(
6✔
5352
            "Iterable[str | bytes | NumpyIntArrayType | c3_sequence.Sequence]",
5353
            seqs.values(),
5354
        )
5355

5356
    first = None
6✔
5357
    merged = None
6✔
5358
    for seq in seqs:
6✔
5359
        if not isinstance(seq, c3_sequence.Sequence):
6✔
5360
            continue
6✔
5361

5362
        db: SqliteAnnotationDbMixin | None = cast(
6✔
5363
            "SqliteAnnotationDbMixin | None", seq.annotation_db
5364
        )
5365

5366
        if first is None and db:
6✔
5367
            # TODO gah should this be a copy so immutable?
5368
            first = db
6✔
5369
            merged = first
6✔
5370
            continue
6✔
5371

5372
        if first is None or db is None or first is db:
6✔
5373
            continue
6✔
5374
        first.update(db)
6✔
5375

5376
    return cast("SupportsFeatures", merged)
6✔
5377

5378

5379
@dataclasses.dataclass
6✔
5380
class raw_seq_data:
6✔
5381
    seq: bytes | NumpyIntArrayType
6✔
5382
    name: str | None = None
6✔
5383
    parent_name: str | None = None
6✔
5384
    offset: int = 0
6✔
5385
    is_reversed: bool = False
6✔
5386

5387

5388
def coerce_to_raw_seq_data(
6✔
5389
    seq: str | bytes | NumpyIntArrayType | c3_sequence.Sequence | Aligned,
5390
    moltype: c3_moltype.MolType[Any],
5391
    name: str | None = None,
5392
) -> raw_seq_data:
5393
    """aggregates sequence data into a single object
5394

5395
    Parameters
5396
    ----------
5397
    seq
5398
        sequence data, can be a string, bytes, numpy array or Sequence
5399
        instance. The latter is converted to a numpy array.
5400
    moltype
5401
        name of a cogent3 molecular type, or a cogent3 MolType instance
5402
    name
5403
        name of the sequence
5404

5405
    Returns
5406
    -------
5407
        raw_seq_data
5408
    """
5409
    if isinstance(seq, Aligned):
6✔
5410
        # convert the Aligned instance
5411
        # into a Sequence instance that includes the gaps
5412
        seq = seq.gapped_seq
6✔
5413
    if isinstance(seq, str):
6✔
5414
        seq = seq.encode("utf8")
6✔
5415

5416
    if isinstance(seq, numpy.ndarray):
6✔
5417
        return raw_seq_data(seq=seq, name=name)
6✔
5418

5419
    if isinstance(seq, bytes):
6✔
5420
        # converts the sequence to a upper case bytes, and applies
5421
        # moltype coercion if needed (e.g. RNA to DNA replaces U with T)
5422
        seq = seq.upper()
6✔
5423
        seq = moltype.coerce_to(seq) if moltype.coerce_to else seq
6✔
5424
        return raw_seq_data(seq=seq, name=name)
6✔
5425

5426
    if isinstance(seq, c3_sequence.Sequence):
6✔
5427
        # converts the sequence to a numpy array
5428
        seq = seq.to_moltype(moltype)
6✔
5429
        parent_name, start, _, step = seq.parent_coordinates()
6✔
5430
        raw_seq = numpy.array(seq)
6✔
5431
        return raw_seq_data(
6✔
5432
            seq=raw_seq,
5433
            name=name or seq.name,
5434
            parent_name=parent_name,
5435
            offset=start,
5436
            is_reversed=step < 0,
5437
        )
5438

5439
    msg = f"coerce_to_seq_data not implemented for {type(seq)}"
×
5440
    raise TypeError(msg)
×
5441

5442

5443
def prep_for_seqs_data(
6✔
5444
    data: Mapping[str, str | bytes | NumpyIntArrayType | c3_sequence.Sequence],
5445
    moltype: c3_moltype.MolType[Any],
5446
    seq_namer: _SeqNamer,
5447
) -> tuple[dict[str, bytes | NumpyIntArrayType], dict[str, int], set[str]]:
5448
    """normalises input data for constructing a SeqsData object
5449

5450
    Parameters
5451
    ----------
5452
    data
5453
        a Mapping[str, str | bytes | NumpyIntArrayType | c3_sequence.Sequence] where the key is the sequence name and
5454
        the value the sequence, or a series of Sequences instances
5455
    moltype
5456
        name of a cogent3 molecular type, or a cogent3 MolType instance
5457
    seq_namer
5458
        callback that takes the sequence name and transforms it to a new name
5459

5460
    Returns
5461
    -------
5462
    seq data as dict[str, bytes | numpy.ndarray], offsets as dict[str, int],
5463
    reversed sequences as set[str], name_map as dict[str, str]
5464
    """
5465
    seqs: dict[str, bytes | NumpyIntArrayType] = {}  # for the (Aligned)SeqsDataABC
6✔
5466
    offsets: dict[str, int] = {}  # for the (Aligned)SeqsDataABC
6✔
5467
    rvd: set[str] = set()
6✔
5468
    for name, seq in data.items():
6✔
5469
        name = seq_namer(seq=seq, name=name)  # noqa: PLW2901
6✔
5470
        seq_data = coerce_to_raw_seq_data(seq, moltype, name=name)
6✔
5471
        if seq_data.offset:
6✔
5472
            offsets[seq_data.parent_name or name] = seq_data.offset
6✔
5473
        seqs[cast("str", seq_data.parent_name or seq_data.name)] = seq_data.seq
6✔
5474
        if seq_data.is_reversed:
6✔
5475
            rvd.add(name)
6✔
5476

5477
    if len(data) != len(seqs):
6✔
5478
        msg = "Not all derived names for storage unique"
6✔
5479
        raise ValueError(msg)
6✔
5480

5481
    return seqs, offsets, rvd
6✔
5482

5483

5484
def make_unaligned_storage(
6✔
5485
    data: dict[str, str | bytes | NumpyIntArrayType],
5486
    *,
5487
    moltype: MolTypes,
5488
    label_to_name: Callable[[str], str] | None = None,
5489
    offset: dict[str, int] | None = None,
5490
    reversed_seqs: set[str] | None = None,
5491
    storage_backend: str | None = None,
5492
    **kwargs: Any,
5493
) -> SeqsDataABC:
5494
    """makes the unaligned storage instance for a SequenceCollection
5495

5496
    Parameters
5497
    ----------
5498
    data
5499
        {name: seq}
5500
    moltype
5501
        label or instance of a cogent3 MolType
5502
    label_to_name
5503
        callable to convert the original name to a new name
5504
    offset
5505
        {name: offset} where the offset is the start position of the
5506
        sequence in the parent sequence
5507
    reversed_seqs
5508
        set of names that are on the reverse strand of the parent sequence
5509
    storage_backend
5510
        name of a third-party storage driver to provide storage functionality
5511
    kwargs
5512
        additional keyword arguments for the storage driver
5513

5514
    Notes
5515
    -----
5516
    This function is intended for use primarly by make_unaligned_seqs function.
5517
    """
5518
    moltype = c3_moltype.get_moltype(moltype)
6✔
5519
    alphabet = moltype.most_degen_alphabet()
6✔
5520
    # if we have Sequences, we need to construct the name map before we construct
5521
    # the SeqsData object - however, if a name_map is provided, we assume that it
5522
    # corrects for any naming differences in data and skip this step
5523
    assign_names = _SeqNamer(name_func=label_to_name)
6✔
5524
    seqs_data, offs, rvd = prep_for_seqs_data(data, moltype, assign_names)
6✔
5525
    offset = offset or {}
6✔
5526
    offset = {**offs, **offset}
6✔
5527
    # seqs_data keys should be the same as the value of name_map
5528
    # name_map keys correspond to names in the sequence collection
5529
    # name_map values correspond to names in seqs_data
5530
    sd_kwargs: dict[str, Any] = {
6✔
5531
        "data": seqs_data,
5532
        "alphabet": alphabet,
5533
        "offset": offset,
5534
        "reversed_seqs": reversed_seqs or rvd,
5535
        **kwargs,
5536
    }
5537
    klass = cast(
6✔
5538
        "SeqsDataABC",
5539
        c3_plugin.get_unaligned_storage_driver(storage_backend),
5540
    )
5541
    return klass.from_seqs(**sd_kwargs)
6✔
5542

5543

5544
def make_unaligned_seqs(
6✔
5545
    data: Mapping[str, str | bytes | NumpyIntArrayType]
5546
    | list[str | bytes | NumpyIntArrayType]
5547
    | SeqsDataABC,
5548
    *,
5549
    moltype: MolTypes,
5550
    label_to_name: Callable[[str], str] | None = None,
5551
    info: dict[str, Any] | None = None,
5552
    source: PathType | None = None,
5553
    annotation_db: SupportsFeatures | None = None,
5554
    offset: dict[str, int] | None = None,
5555
    name_map: dict[str, str] | None = None,
5556
    is_reversed: bool = False,
5557
    reversed_seqs: set[str] | None = None,
5558
    storage_backend: str | None = None,
5559
    **kwargs: Any,
5560
) -> SequenceCollection:
5561
    """Initialise an unaligned collection of sequences.
5562

5563
    Parameters
5564
    ----------
5565
    data
5566
        sequence data, a SeqsData, a dict {name: seq, ...}, an iterable of sequences
5567
    moltype
5568
        string representation of the moltype, e.g., 'dna', 'protein'.
5569
    label_to_name
5570
        function for converting original names into other names.
5571
    info
5572
        a dict from which to make an info object
5573
    source
5574
        origins of this data, defaults to 'unknown'. Converted to a string
5575
        and added to info["source"].
5576
    annotation_db
5577
        annotation database to attach to the collection
5578
    offset
5579
        a dict mapping names to annotation offsets
5580
    name_map
5581
        a dict mapping sequence names to "parent" sequence names. The parent
5582
        name will be used for querying a annotation_db.
5583
    is_reversed
5584
        entire collection has been reverse complemented
5585
    reversed_seqs
5586
        set of names that are on the reverse strand of the parent sequence
5587
    storage_backend
5588
        name of the storage backend to use for the SeqsData object, defaults to
5589
        cogent3 builtin.
5590
    kwargs
5591
        keyword arguments for the storage driver
5592

5593
    Notes
5594
    -----
5595
    If no annotation_db is provided, but the sequences are annotated, an
5596
    annotation_db is created by merging any annotation db's found in the sequences.
5597
    If the sequences are annotated AND an annotation_db is provided, only the
5598
    annotation_db is used.
5599
    """
5600
    # refactor: design
5601
    # rename offset to offsets as it could track potentially multiple offsets
5602

5603
    if isinstance(data, SeqsDataABC):
6✔
5604
        moltype = c3_moltype.get_moltype(moltype)
6✔
5605
        if not moltype.is_compatible_alphabet(data.alphabet):
6✔
5606
            msg = f"Provided moltype: {moltype} is not compatible with SeqsData alphabet {data.alphabet}"
6✔
5607
            raise ValueError(
6✔
5608
                msg,
5609
            )
5610

5611
        # we cannot set offset when creating from an SeqsData
5612
        if offset:
6✔
5613
            msg = f"Setting offset is not supported for {data=}"
6✔
5614
            raise ValueError(msg)
6✔
5615

5616
        info = info if isinstance(info, dict) else {}
6✔
5617
        source = str(source) if source else str(info.pop("source", "unknown"))
6✔
5618
        seqs = SequenceCollection(
6✔
5619
            seqs_data=data,
5620
            moltype=moltype,
5621
            info=info,
5622
            annotation_db=annotation_db,
5623
            source=source,
5624
            name_map=name_map,
5625
            is_reversed=is_reversed,
5626
        )
5627
        if label_to_name:
6✔
5628
            seqs = seqs.renamed_seqs(label_to_name)
6✔
5629
        return seqs
6✔
5630

5631
    if len(data) == 0:
6✔
5632
        msg = "data must be at least one sequence."
6✔
5633
        raise ValueError(msg)
6✔
5634

5635
    annotation_db = annotation_db or merged_db_collection(data)
6✔
5636
    assign_names = _SeqNamer(name_func=label_to_name)
6✔
5637
    data = cast(
6✔
5638
        "dict[str, str | bytes | NumpyIntArrayType]",
5639
        _make_name_seq_mapping(data, assign_names),
5640
    )
5641
    if name_map is None:
6✔
5642
        name_map = make_name_map(data) or None
6✔
5643

5644
    seqs_data = make_unaligned_storage(
6✔
5645
        data,
5646
        label_to_name=label_to_name,
5647
        moltype=moltype,
5648
        offset=offset,
5649
        reversed_seqs=reversed_seqs,
5650
        storage_backend=storage_backend,
5651
        **kwargs,
5652
    )
5653
    # as they were handled in this function, we do not pass on:
5654
    # offset
5655
    # label_to_name
5656
    # reversed_seqs
5657
    # storage_backend
5658

5659
    return make_unaligned_seqs(
6✔
5660
        seqs_data,
5661
        moltype=moltype,
5662
        info=info,
5663
        source=source,
5664
        annotation_db=annotation_db,
5665
        name_map=name_map,
5666
        is_reversed=is_reversed,
5667
    )
5668

5669

5670
@register_deserialiser(
6✔
5671
    get_object_provenance(SequenceCollection),
5672
    "cogent3.core.alignment.SequenceCollection",
5673
)
5674
def deserialise_sequence_collection(
6✔
5675
    data: dict[str, str | dict[str, str]],
5676
) -> SequenceCollection:
5677
    """deserialise SequenceCollection"""
5678
    if "init_args" not in data:
6✔
5679
        return deserialise_old_to_new_type_seqcoll(data)
6✔
5680

5681
    return SequenceCollection.from_rich_dict(data)
6✔
5682

5683

5684
def deserialise_old_to_new_type_seqcoll(
6✔
5685
    data: dict[str, Any],
5686
) -> SequenceCollection:
5687
    """deserialise old type SequenceCollection as a new type collection"""
5688
    moltype_name: MolTypes = data["moltype"]
6✔
5689
    moltype_name = "text" if moltype_name == "bytes" else moltype_name
6✔
5690
    info_data = data["info"]
6✔
5691
    source = info_data.pop("source", None)
6✔
5692
    seq_data = {
6✔
5693
        seqid: record["seq"]["init_args"]["seq"]
5694
        for seqid, record in data["seqs"].items()
5695
    }
5696
    return make_unaligned_seqs(
6✔
5697
        seq_data, moltype=moltype_name, info=info_data, source=source
5698
    )
5699

5700

5701
def make_aligned_storage(
6✔
5702
    data: Mapping[str, str | bytes | NumpyIntArrayType],
5703
    *,
5704
    moltype: MolTypes,
5705
    label_to_name: Callable[[str], str] | None = None,
5706
    offset: dict[str, int] | None = None,
5707
    reversed_seqs: set[str] | None = None,
5708
    storage_backend: str | None = None,
5709
    **kwargs: Any,
5710
) -> AlignedSeqsDataABC:
5711
    """makes the aligned storage instance for Alignment class
5712

5713
    Parameters
5714
    ----------
5715
    data
5716
        {seq name: sequence, ...}
5717
    moltype
5718
        label for looking up the molecular type
5719
    label_to_name
5720
        renamer function
5721
    offset
5722
        mapping of {name: annotation offset}
5723
    reversed_seqs
5724
        name of sequences that are reverse complemented relative to their
5725
        parent
5726
    storage_backend
5727
        name of a third-party storage driver to provide storage functionality
5728
    kwargs
5729
        additional keyword arguments for the storage driver
5730

5731
    Notes
5732
    -----
5733
    This function is intended for use primarly by make_aligned_seqs function.
5734
    """
5735
    assign_names = _SeqNamer(name_func=label_to_name)
6✔
5736
    moltype = c3_moltype.get_moltype(moltype)
6✔
5737
    alphabet = moltype.most_degen_alphabet()
6✔
5738
    seqs_data, offs, rvd = prep_for_seqs_data(data, moltype, assign_names)
6✔
5739
    asd_kwargs: dict[str, Any] = {
6✔
5740
        "alphabet": alphabet,
5741
        "offset": offset or offs,
5742
        "reversed_seqs": reversed_seqs or rvd,
5743
        "data": seqs_data,
5744
        **kwargs,
5745
    }
5746
    # plugin module is private only to exclude users, not developers
5747
    klass = c3_plugin.get_aligned_storage_driver(storage_backend)
6✔
5748
    return klass.from_seqs(**asd_kwargs)
6✔
5749

5750

5751
def make_aligned_seqs(
6✔
5752
    data: Mapping[str, str | bytes | NumpyIntArrayType]
5753
    | list[str | bytes | NumpyIntArrayType]
5754
    | AlignedSeqsDataABC,
5755
    *,
5756
    moltype: MolTypes,
5757
    label_to_name: Callable[[str], str] | None = None,
5758
    info: dict[str, Any] | None = None,
5759
    source: PathType | None = None,
5760
    annotation_db: SupportsFeatures | None = None,
5761
    offset: dict[str, int] | None = None,
5762
    name_map: dict[str, str] | None = None,
5763
    is_reversed: bool | None = None,
5764
    reversed_seqs: set[str] | None = None,
5765
    storage_backend: str | None = None,
5766
    **kwargs: Any,
5767
) -> Alignment:
5768
    """Initialise an aligned collection of sequences.
5769

5770
    Parameters
5771
    ----------
5772
    data
5773
        sequence data, a AlignedSeqsData, a dict {name: seq, ...}, an iterable of sequences
5774
    moltype
5775
        string representation of the moltype, e.g., 'dna', 'protein'.
5776
    label_to_name
5777
        function for converting original names into other names.
5778
    info
5779
        a dict from which to make an info object
5780
    source
5781
        origins of this data, defaults to 'unknown'. Converted to a string
5782
        and added to info["source"].
5783
    annotation_db
5784
        annotation database to attach to the collection
5785
    offset
5786
        a dict mapping names to annotation offsets
5787
    name_map
5788
        a dict mapping sequence names to "parent" sequence names. The parent
5789
        name will be used for querying a annotation_db.
5790
    is_reversed
5791
        entire collection has been reverse complemented
5792
    reversed_seqs
5793
        set of names that are on the reverse strand of the parent sequence
5794
    storage_backend
5795
        name of the storage backend to use for the SeqsData object, defaults to
5796
        cogent3 builtin.
5797
    kwargs
5798
        keyword arguments for the AlignedSeqsData constructor
5799

5800
    Notes
5801
    -----
5802
    If no annotation_db is provided, but the sequences are annotated, an
5803
    annotation_db is created by merging any annotation db's found in the sequences.
5804
    If the sequences are annotated AND an annotation_db is provided, only the
5805
    annotation_db is used.
5806
    """
5807
    if isinstance(data, AlignedSeqsDataABC):
6✔
5808
        moltype = c3_moltype.get_moltype(moltype)
6✔
5809
        if not moltype.is_compatible_alphabet(data.alphabet):
6✔
5810
            msg = f"Provided moltype: {moltype.label} is not compatible with AlignedSeqsData"
6✔
5811
            raise ValueError(
6✔
5812
                msg,
5813
                f" alphabet: {data.alphabet}",
5814
            )
5815

5816
        # we cannot set offset when creating from an AlignedSeqsData
5817
        if offset:
6✔
5818
            msg = f"Setting offset is not supported for {data=}"
6✔
5819
            raise ValueError(msg)
6✔
5820

5821
        info = info if isinstance(info, dict) else {}
6✔
5822
        source = str(source) if source else str(info.pop("source", "unknown"))
6✔
5823
        sr = SliceRecord(parent_len=data.align_len, step=-1) if is_reversed else None
6✔
5824
        aln = Alignment(
6✔
5825
            seqs_data=data,
5826
            moltype=moltype,
5827
            info=info,
5828
            source=source,
5829
            annotation_db=annotation_db,
5830
            name_map=name_map,
5831
            slice_record=sr,
5832
        )
5833
        if label_to_name:
6✔
5834
            aln = aln.renamed_seqs(label_to_name)
6✔
5835
        return aln
6✔
5836

5837
    if len(data) == 0:
6✔
5838
        msg = "data must be at least one sequence."
6✔
5839
        raise ValueError(msg)
6✔
5840

5841
    moltype = c3_moltype.get_moltype(moltype)
6✔
5842
    annotation_db = cast(
6✔
5843
        "SupportsFeatures",
5844
        kwargs.pop("annotation_db", annotation_db)
5845
        or merged_db_collection(
5846
            data,
5847
        ),
5848
    )
5849

5850
    # if we have Sequences, we need to construct the name map before we construct
5851
    # the AlignedSeqsData object - however, if a name_map is provided, we assume that it
5852
    # corrects for any naming differences in data and skip this step
5853
    assign_names = _SeqNamer(name_func=label_to_name)
6✔
5854
    data = cast(
6✔
5855
        "dict[str, str | bytes | NumpyIntArrayType]",
5856
        _make_name_seq_mapping(data, assign_names),
5857
    )
5858
    if name_map is None:
6✔
5859
        name_map = make_name_map(data) or None
6✔
5860

5861
    seqs_data = make_aligned_storage(
6✔
5862
        data,
5863
        moltype=moltype,
5864
        label_to_name=label_to_name,
5865
        offset=offset,
5866
        reversed_seqs=reversed_seqs,
5867
        storage_backend=storage_backend,
5868
        **kwargs,
5869
    )
5870
    # as they were handled in this function, we do not pass on:
5871
    # offset
5872
    # label_to_name
5873
    # reversed_seqs
5874
    # storage_backend
5875
    return make_aligned_seqs(
6✔
5876
        seqs_data,
5877
        moltype=moltype,
5878
        info=info,
5879
        annotation_db=annotation_db,
5880
        source=source,
5881
        name_map=name_map,
5882
        is_reversed=is_reversed,
5883
    )
5884

5885

5886
@register_deserialiser(
6✔
5887
    get_object_provenance(Alignment),
5888
    "cogent3.core.alignment.Alignment",
5889
    "cogent3.core.c3_alignment.Alignment",
5890
)
5891
def deserialise_alignment(data: dict[str, str | dict[str, str]]) -> Alignment:
6✔
5892
    if "init_args" not in data:
6✔
5893
        # old type Alignment class
5894
        return deserialise_alignment_to_new_type_alignment(data)
6✔
5895
    return Alignment.from_rich_dict(data)
6✔
5896

5897

5898
@register_deserialiser("cogent3.core.alignment.ArrayAlignment")
6✔
5899
def deserialise_array_align_to_new_type_alignment(
6✔
5900
    data: dict[str, Any],
5901
) -> Alignment:
5902
    """deserialise old type ArrayAlignment as a new type alignment."""
5903
    moltype_name: MolTypes = data["moltype"]
6✔
5904
    moltype_name = "text" if moltype_name == "bytes" else moltype_name
6✔
5905
    info_data: dict[str, Any] = data["info"]
6✔
5906
    source = info_data.pop("source", None) if info_data else None
6✔
5907

5908
    seq_data: dict[str, str | bytes | NumpyIntArrayType] = {}
6✔
5909
    for seqid, record in data["seqs"].items():
6✔
5910
        if isinstance(record["seq"], str):
6✔
5911
            seq = record["seq"]
6✔
5912
        else:
5913
            seq = record["seq"]["init_args"]["seq"]
6✔
5914

5915
        seq_data[seqid] = seq
6✔
5916

5917
    return make_aligned_seqs(
6✔
5918
        seq_data, moltype=moltype_name, info=info_data, source=source
5919
    )
5920

5921

5922
def deserialise_alignment_to_new_type_alignment(
6✔
5923
    data: dict[str, Any],
5924
) -> Alignment:
5925
    """deserialise old type Alignment as a new type alignment"""
5926
    from cogent3 import get_moltype
6✔
5927
    from cogent3.core.location import deserialise_indelmap
6✔
5928

5929
    moltype_name: MolTypes = data["moltype"]
6✔
5930
    mt = get_moltype(moltype_name)
6✔
5931
    alpha = mt.most_degen_alphabet()
6✔
5932
    info_data = data["info"]
6✔
5933
    source = info_data.pop("source", None)
6✔
5934

5935
    seqs: dict[str, NumpyIntArrayType] = {}
6✔
5936
    gaps: dict[str, NumpyIntArrayType] = {}
6✔
5937
    for name, record in data["seqs"].items():
6✔
5938
        seq_data = record["seq_init"]
6✔
5939
        minit = record["map_init"]
6✔
5940
        imap = deserialise_indelmap(minit)
6✔
5941
        raw_seq = seq_data["seq"]["init_args"]["seq"]
6✔
5942
        seqs[name] = raw_seq
6✔
5943
        gaps[name] = imap.array
6✔
5944

5945
    asd = AlignedSeqsData.from_seqs_and_gaps(seqs=seqs, gaps=gaps, alphabet=alpha)
6✔
5946
    return make_aligned_seqs(asd, moltype=moltype_name, info=info_data, source=source)
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc