• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

CCPBioSim / CodeEntropy / 19035988435

03 Nov 2025 01:17PM UTC coverage: 99.433% (-0.6%) from 100.0%
19035988435

Pull #170

github

web-flow
Merge 328e159a2 into a6d704774
Pull Request #170: Add `Python v3.14` Support for `CodeEntropy`

1052 of 1058 relevant lines covered (99.43%)

10.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.73
/CodeEntropy/levels.py
1
import logging
11✔
2

3
import numpy as np
11✔
4
from rich.progress import (
11✔
5
    BarColumn,
6
    Progress,
7
    SpinnerColumn,
8
    TextColumn,
9
    TimeElapsedColumn,
10
)
11

12
logger = logging.getLogger(__name__)
11✔
13

14

15
class LevelManager:
11✔
16
    """
17
    Manages the structural and dynamic levels involved in entropy calculations. This
18
    includes selecting relevant levels, computing axes for translation and rotation,
19
    and handling bead-based representations of molecular systems. Provides utility
20
    methods to extract averaged positions, convert coordinates to spherical systems,
21
    compute weighted forces and torques, and manipulate matrices used in entropy
22
    analysis.
23
    """
24

25
    def __init__(self):
11✔
26
        """
27
        Initializes the LevelManager with placeholders for level-related data,
28
        including translational and rotational axes, number of beads, and a
29
        general-purpose data container.
30
        """
31
        self.data_container = None
11✔
32
        self._levels = None
11✔
33
        self._trans_axes = None
11✔
34
        self._rot_axes = None
11✔
35
        self._number_of_beads = None
11✔
36

37
    def select_levels(self, data_container):
11✔
38
        """
39
        Function to read input system and identify the number of molecules and
40
        the levels (i.e. united atom, residue and/or polymer) that should be used.
41
        The level refers to the size of the bead (atom or collection of atoms)
42
        that will be used in the entropy calculations.
43

44
        Args:
45
            arg_DataContainer: MDAnalysis universe object containing the system of
46
            interest
47

48
        Returns:
49
             number_molecules (int): Number of molecules in the system.
50
             levels (array): Strings describing the length scales for each molecule.
51
        """
52

53
        # fragments is MDAnalysis terminology for what chemists would call molecules
54
        number_molecules = len(data_container.atoms.fragments)
11✔
55
        logger.debug(f"The number of molecules is {number_molecules}.")
11✔
56

57
        fragments = data_container.atoms.fragments
11✔
58
        levels = [[] for _ in range(number_molecules)]
11✔
59

60
        for molecule in range(number_molecules):
11✔
61
            levels[molecule].append(
11✔
62
                "united_atom"
63
            )  # every molecule has at least one atom
64

65
            atoms_in_fragment = fragments[molecule].select_atoms("prop mass > 1.1")
11✔
66
            number_residues = len(atoms_in_fragment.residues)
11✔
67

68
            if len(atoms_in_fragment) > 1:
11✔
69
                levels[molecule].append("residue")
11✔
70

71
                if number_residues > 1:
11✔
72
                    levels[molecule].append("polymer")
11✔
73

74
        logger.debug(f"levels {levels}")
11✔
75

76
        return number_molecules, levels
11✔
77

78
    def get_matrices(
11✔
79
        self,
80
        data_container,
81
        level,
82
        number_frames,
83
        highest_level,
84
        force_matrix,
85
        torque_matrix,
86
    ):
87
        """
88
        Compute and accumulate force/torque covariance matrices for a given level.
89

90
        Parameters:
91
          data_container (MDAnalysis.Universe): Data for a molecule or residue.
92
          level (str): 'polymer', 'residue', or 'united_atom'.
93
          number_frames (int): Number of frames being processed.
94
          highest_level (bool): Whether this is the top (largest bead size) level.
95
          force_matrix, torque_matrix (np.ndarray or None): Accumulated matrices to add
96
          to.
97

98
        Returns:
99
          force_matrix (np.ndarray): Accumulated force covariance matrix.
100
          torque_matrix (np.ndarray): Accumulated torque covariance matrix.
101
        """
102

103
        # Make beads
104
        list_of_beads = self.get_beads(data_container, level)
11✔
105

106
        # number of beads and frames in trajectory
107
        number_beads = len(list_of_beads)
11✔
108

109
        # initialize force and torque arrays
110
        weighted_forces = [None for _ in range(number_beads)]
11✔
111
        weighted_torques = [None for _ in range(number_beads)]
11✔
112

113
        # Calculate forces/torques for each bead
114
        for bead_index in range(number_beads):
11✔
115
            # Set up axes
116
            # translation and rotation use different axes
117
            # how the axes are defined depends on the level
118
            trans_axes, rot_axes = self.get_axes(data_container, level, bead_index)
11✔
119

120
            # Sort out coordinates, forces, and torques for each atom in the bead
121
            weighted_forces[bead_index] = self.get_weighted_forces(
11✔
122
                data_container, list_of_beads[bead_index], trans_axes, highest_level
123
            )
124
            weighted_torques[bead_index] = self.get_weighted_torques(
11✔
125
                data_container, list_of_beads[bead_index], rot_axes
126
            )
127

128
        # Create covariance submatrices
129
        force_submatrix = [
11✔
130
            [0 for _ in range(number_beads)] for _ in range(number_beads)
131
        ]
132
        torque_submatrix = [
11✔
133
            [0 for _ in range(number_beads)] for _ in range(number_beads)
134
        ]
135

136
        for i in range(number_beads):
11✔
137
            for j in range(i, number_beads):
11✔
138
                f_sub = self.create_submatrix(weighted_forces[i], weighted_forces[j])
11✔
139
                t_sub = self.create_submatrix(weighted_torques[i], weighted_torques[j])
11✔
140
                force_submatrix[i][j] = f_sub
11✔
141
                force_submatrix[j][i] = f_sub.T
11✔
142
                torque_submatrix[i][j] = t_sub
11✔
143
                torque_submatrix[j][i] = t_sub.T
11✔
144

145
        # Convert block matrices to full matrix
146
        force_block = np.block(
11✔
147
            [
148
                [force_submatrix[i][j] for j in range(number_beads)]
149
                for i in range(number_beads)
150
            ]
151
        )
152
        torque_block = np.block(
11✔
153
            [
154
                [torque_submatrix[i][j] for j in range(number_beads)]
155
                for i in range(number_beads)
156
            ]
157
        )
158

159
        # Enforce consistent shape before accumulation
160
        if force_matrix is None:
11✔
161
            force_matrix = np.zeros_like(force_block)
11✔
162
        elif force_matrix.shape != force_block.shape:
11✔
163
            raise ValueError(
11✔
164
                f"Inconsistent force matrix shape: existing "
165
                f"{force_matrix.shape}, new {force_block.shape}"
166
            )
167
        else:
168
            force_matrix = force_block
11✔
169

170
        if torque_matrix is None:
11✔
171
            torque_matrix = np.zeros_like(torque_block)
11✔
172
        elif torque_matrix.shape != torque_block.shape:
11✔
173
            raise ValueError(
11✔
174
                f"Inconsistent torque matrix shape: existing "
175
                f"{torque_matrix.shape}, new {torque_block.shape}"
176
            )
177
        else:
178
            torque_matrix = torque_block
11✔
179

180
        return force_matrix, torque_matrix
11✔
181

182
    def get_dihedrals(self, data_container, level):
11✔
183
        """
184
        Define the set of dihedrals for use in the conformational entropy function.
185
        If united atom level, the dihedrals are defined from the heavy atoms
186
        (4 bonded atoms for 1 dihedral).
187
        If residue level, use the bonds between residues to cast dihedrals.
188
        Note: not using improper dihedrals only ones with 4 atoms/residues
189
        in a linear arrangement.
190

191
        Args:
192
          data_container (MDAnalysis.Universe): system information
193
          level (str): level of the hierarchy (should be residue or polymer)
194

195
        Returns:
196
           dihedrals (array): set of dihedrals
197
        """
198
        # Start with empty array
199
        dihedrals = []
11✔
200

201
        # if united atom level, read dihedrals from MDAnalysis universe
202
        if level == "united_atom":
11✔
203
            dihedrals = data_container.dihedrals
11✔
204

205
        # if residue level, looking for dihedrals involving residues
206
        if level == "residue":
11✔
207
            num_residues = len(data_container.residues)
11✔
208
            logger.debug(f"Number Residues: {num_residues}")
11✔
209
            if num_residues < 4:
11✔
210
                logger.debug("no residue level dihedrals")
11✔
211

212
            else:
213
                # find bonds between residues N-3:N-2 and N-1:N
214
                for residue in range(4, num_residues + 1):
11✔
215
                    # Using MDAnalysis selection,
216
                    # assuming only one covalent bond between neighbouring residues
217
                    # TODO not written for branched polymers
218
                    atom_string = (
11✔
219
                        "resindex "
220
                        + str(residue - 4)
221
                        + " and bonded resindex "
222
                        + str(residue - 3)
223
                    )
224
                    atom1 = data_container.select_atoms(atom_string)
11✔
225

226
                    atom_string = (
11✔
227
                        "resindex "
228
                        + str(residue - 3)
229
                        + " and bonded resindex "
230
                        + str(residue - 4)
231
                    )
232
                    atom2 = data_container.select_atoms(atom_string)
11✔
233

234
                    atom_string = (
11✔
235
                        "resindex "
236
                        + str(residue - 2)
237
                        + " and bonded resindex "
238
                        + str(residue - 1)
239
                    )
240
                    atom3 = data_container.select_atoms(atom_string)
11✔
241

242
                    atom_string = (
11✔
243
                        "resindex "
244
                        + str(residue - 1)
245
                        + " and bonded resindex "
246
                        + str(residue - 2)
247
                    )
248
                    atom4 = data_container.select_atoms(atom_string)
11✔
249

250
                    atom_group = atom1 + atom2 + atom3 + atom4
11✔
251
                    dihedrals.append(atom_group.dihedral)
11✔
252

253
        logger.debug(f"Level: {level}, Dihedrals: {dihedrals}")
11✔
254

255
        return dihedrals
11✔
256

257
    def compute_dihedral_conformations(
11✔
258
        self,
259
        selector,
260
        level,
261
        number_frames,
262
        bin_width,
263
        start,
264
        end,
265
        step,
266
        ce,
267
    ):
268
        """
269
        Compute dihedral conformations for a given selector and entropy level.
270

271
        Parameters:
272
            selector (AtomGroup): Atom selection to compute dihedrals for.
273
            level (str): Entropy level ("united_atom" or "residue").
274
            number_frames (int): Number of frames to process.
275
            bin_width (float): Bin width for dihedral angle discretization.
276
            start (int): Start frame index.
277
            end (int): End frame index.
278
            step (int): Step size for frame iteration.
279
            ce : Conformational Entropy class
280

281
        Returns:
282
            states (list): List of conformation strings per frame.
283
        """
284
        # Identify the dihedral angles in the residue/molecule
285
        dihedrals = self.get_dihedrals(selector, level)
11✔
286

287
        # When there are no dihedrals, there is only one possible conformation
288
        # so the conformational states are not relevant
289
        if len(dihedrals) == 0:
11✔
290
            logger.debug("No dihedrals found; skipping conformation assignment.")
11✔
291
            states = []
11✔
292
        else:
293
            # Identify the conformational label for each dihedral at each frame
294
            num_dihedrals = len(dihedrals)
11✔
295
            conformation = np.zeros((num_dihedrals, number_frames))
11✔
296

297
            for i, dihedral in enumerate(dihedrals):
11✔
298
                conformation[i] = ce.assign_conformation(
11✔
299
                    selector, dihedral, number_frames, bin_width, start, end, step
300
                )
301

302
            # for all the dihedrals available concatenate the label of each
303
            # dihedral into the state for that frame
304
            states = [
11✔
305
                state
306
                for state in (
307
                    "".join(str(int(conformation[d][f])) for d in range(num_dihedrals))
308
                    for f in range(number_frames)
309
                )
310
                if state
311
            ]
312

313
        logger.debug(f"level: {level}, states: {states}")
11✔
314
        return states
11✔
315

316
    def get_beads(self, data_container, level):
11✔
317
        """
318
        Function to define beads depending on the level in the hierarchy.
319

320
        Args:
321
           data_container (MDAnalysis.Universe): the molecule data
322
           level (str): the heirarchy level (polymer, residue, or united atom)
323

324
        Returns:
325
           list_of_beads : the relevent beads
326
        """
327

328
        if level == "polymer":
11✔
329
            list_of_beads = []
11✔
330
            atom_group = "all"
11✔
331
            list_of_beads.append(data_container.select_atoms(atom_group))
11✔
332

333
        if level == "residue":
11✔
334
            list_of_beads = []
11✔
335
            num_residues = len(data_container.residues)
11✔
336
            for residue in range(num_residues):
11✔
337
                atom_group = "resindex " + str(residue)
11✔
338
                list_of_beads.append(data_container.select_atoms(atom_group))
11✔
339

340
        if level == "united_atom":
11✔
341
            list_of_beads = []
11✔
342
            heavy_atoms = data_container.select_atoms("prop mass > 1.1")
11✔
343
            if len(heavy_atoms) == 0:
11✔
344
                # molecule without heavy atoms would be a hydrogen molecule
345
                list_of_beads.append(data_container.select_atoms("all"))
11✔
346
            else:
347
                # Select one heavy atom and all light atoms bonded to it
348
                for atom in heavy_atoms:
11✔
349
                    atom_group = (
11✔
350
                        "index "
351
                        + str(atom.index)
352
                        + " or ((prop mass <= 1.1) and bonded index "
353
                        + str(atom.index)
354
                        + ")"
355
                    )
356
                    list_of_beads.append(data_container.select_atoms(atom_group))
11✔
357

358
        logger.debug(f"List of beads: {list_of_beads}")
11✔
359

360
        return list_of_beads
11✔
361

362
    def get_axes(self, data_container, level, index=0):
11✔
363
        """
364
        Function to set the translational and rotational axes.
365
        The translational axes are based on the principal axes of the unit
366
        one level larger than the level we are interested in (except for
367
        the polymer level where there is no larger unit). The rotational
368
        axes use the covalent links between residues or atoms where possible
369
        to define the axes, or if the unit is not bonded to others of the
370
        same level the prinicpal axes of the unit are used.
371

372
        Args:
373
          data_container (MDAnalysis.Universe): the molecule and trajectory data
374
          level (str): the level (united atom, residue, or polymer) of interest
375
          index (int): residue index
376

377
        Returns:
378
          trans_axes : translational axes
379
          rot_axes : rotational axes
380
        """
381
        index = int(index)
11✔
382

383
        if level == "polymer":
11✔
384
            # for polymer use principle axis for both translation and rotation
385
            trans_axes = data_container.atoms.principal_axes()
11✔
386
            rot_axes = data_container.atoms.principal_axes()
11✔
387

388
        elif level == "residue":
11✔
389
            # Translation
390
            # for residues use principal axes of whole molecule for translation
391
            trans_axes = data_container.atoms.principal_axes()
11✔
392

393
            # Rotation
394
            # find bonds between atoms in residue of interest and other residues
395
            # we are assuming bonds only exist between adjacent residues
396
            # (linear chains of residues)
397
            # TODO refine selection so that it will work for branched polymers
398
            index_prev = index - 1
11✔
399
            index_next = index + 1
11✔
400
            atom_set = data_container.select_atoms(
11✔
401
                f"(resindex {index_prev} or resindex {index_next}) "
402
                f"and bonded resid {index}"
403
            )
404
            residue = data_container.select_atoms(f"resindex {index}")
11✔
405

406
            if len(atom_set) == 0:
11✔
407
                # if no bonds to other residues use pricipal axes of residue
408
                rot_axes = residue.atoms.principal_axes()
11✔
409

410
            else:
411
                # set center of rotation to center of mass of the residue
412
                center = residue.atoms.center_of_mass()
11✔
413

414
                # get vector for average position of bonded atoms
415
                vector = self.get_avg_pos(atom_set, center)
11✔
416

417
                # use spherical coordinates function to get rotational axes
418
                rot_axes = self.get_sphCoord_axes(vector)
11✔
419

420
        elif level == "united_atom":
11✔
421
            # Translation
422
            # for united atoms use principal axes of residue for translation
423
            trans_axes = data_container.residues.principal_axes()
11✔
424

425
            # Rotation
426
            # for united atoms use heavy atoms bonded to the heavy atom
427
            atom_set = data_container.select_atoms(
11✔
428
                f"(prop mass > 1.1) and bonded index {index}"
429
            )
430

431
            if len(atom_set) == 0:
11✔
432
                # if no bonds to other residues use pricipal axes of residue
433
                rot_axes = data_container.residues.principal_axes()
11✔
434
            else:
435
                # center at position of heavy atom
436
                atom_group = data_container.select_atoms(f"index {index}")
11✔
437
                center = atom_group.positions[0]
11✔
438

439
                # get vector for average position of bonded atoms
440
                vector = self.get_avg_pos(atom_set, center)
11✔
441

442
                # use spherical coordinates function to get rotational axes
443
                rot_axes = self.get_sphCoord_axes(vector)
11✔
444

445
        logger.debug(f"Translational Axes: {trans_axes}")
11✔
446
        logger.debug(f"Rotational Axes: {rot_axes}")
11✔
447

448
        return trans_axes, rot_axes
11✔
449

450
    def get_avg_pos(self, atom_set, center):
11✔
451
        """
452
        Function to get the average position of a set of atoms.
453

454
        Args:
455
            atom_set : MDAnalysis atom group
456
            center : position for center of rotation
457

458
        Returns:
459
            avg_position : three dimensional vector
460
        """
461
        # start with an empty vector
462
        avg_position = np.zeros((3))
11✔
463

464
        # get number of atoms
465
        number_atoms = len(atom_set.names)
11✔
466

467
        if number_atoms != 0:
11✔
468
            # sum positions for all atoms in the given set
469
            for atom_index in range(number_atoms):
11✔
470
                atom_position = atom_set.atoms[atom_index].position
11✔
471

472
                avg_position += atom_position
11✔
473

474
            avg_position /= number_atoms  # divide by number of atoms to get average
11✔
475

476
        else:
477
            # if no atoms in set the unit has no bonds to restrict its rotational
478
            # motion, so we can use a random vector to get spherical
479
            # coordinate axes
480
            avg_position = np.random.random(3)
11✔
481

482
        # transform the average position to a coordinate system with the origin
483
        # at center
484
        avg_position = avg_position - center
11✔
485

486
        logger.debug(f"Average Position: {avg_position}")
11✔
487

488
        return avg_position
11✔
489

490
    def get_sphCoord_axes(self, arg_r):
11✔
491
        """
492
        For a given vector in space, treat it is a radial vector rooted at
493
        0,0,0 and derive a curvilinear coordinate system according to the
494
        rules of polar spherical coordinates
495

496
        Args:
497
            arg_r: 3 dimensional vector
498

499
        Returns:
500
            spherical_basis: axes set (3 vectors)
501
        """
502

503
        x2y2 = arg_r[0] ** 2 + arg_r[1] ** 2
11✔
504
        r2 = x2y2 + arg_r[2] ** 2
11✔
505

506
        # Check for division by zero
507
        if r2 == 0.0:
11✔
508
            raise ValueError("r2 is zero, cannot compute spherical coordinates.")
11✔
509

510
        if x2y2 == 0.0:
11✔
511
            raise ValueError("x2y2 is zero, cannot compute sin_phi and cos_phi.")
11✔
512

513
        # These conditions are mathematically unreachable for real-valued vectors.
514
        # Marked as no cover to avoid false negatives in coverage reports.
515

516
        # Check for non-negative values inside the square root
517
        if x2y2 / r2 < 0:  # pragma: no cover
518
            raise ValueError(
519
                f"Negative value encountered for sin_theta calculation: {x2y2 / r2}. "
520
                f"Cannot take square root."
521
            )
522

523
        if x2y2 < 0:  # pragma: no cover
524
            raise ValueError(
525
                f"Negative value encountered for sin_phi and cos_phi "
526
                f"calculation: {x2y2}. "
527
                f"Cannot take square root."
528
            )
529

530
        if x2y2 != 0.0:
11✔
531
            sin_theta = np.sqrt(x2y2 / r2)
11✔
532
            cos_theta = arg_r[2] / np.sqrt(r2)
11✔
533

534
            sin_phi = arg_r[1] / np.sqrt(x2y2)
11✔
535
            cos_phi = arg_r[0] / np.sqrt(x2y2)
11✔
536

537
        else:  # pragma: no cover
538
            sin_theta = 0.0
539
            cos_theta = 1
540

541
            sin_phi = 0.0
542
            cos_phi = 1
543

544
        # if abs(sin_theta) > 1 or abs(sin_phi) > 1:
545
        #     print('Bad sine : T {} , P {}'.format(sin_theta, sin_phi))
546

547
        # cos_theta = np.sqrt(1 - sin_theta*sin_theta)
548
        # cos_phi = np.sqrt(1 - sin_phi*sin_phi)
549

550
        # print('{} {} {}'.format(*arg_r))
551
        # print('Sin T : {}, cos T : {}'.format(sin_theta, cos_theta))
552
        # print('Sin P : {}, cos P : {}'.format(sin_phi, cos_phi))
553

554
        spherical_basis = np.zeros((3, 3))
11✔
555

556
        # r^
557
        spherical_basis[0, :] = np.asarray(
11✔
558
            [sin_theta * cos_phi, sin_theta * sin_phi, cos_theta]
559
        )
560

561
        # Theta^
562
        spherical_basis[1, :] = np.asarray(
11✔
563
            [cos_theta * cos_phi, cos_theta * sin_phi, -sin_theta]
564
        )
565

566
        # Phi^
567
        spherical_basis[2, :] = np.asarray([-sin_phi, cos_phi, 0.0])
11✔
568

569
        logger.debug(f"Spherical Basis: {spherical_basis}")
11✔
570

571
        return spherical_basis
11✔
572

573
    def get_weighted_forces(
11✔
574
        self, data_container, bead, trans_axes, highest_level, force_partitioning=0.5
575
    ):
576
        """
577
        Function to calculate the mass weighted forces for a given bead.
578

579
        Args:
580
           data_container (MDAnalysis.Universe): Contains atomic positions and forces.
581
           bead : The part of the molecule to be considered.
582
           trans_axes (np.ndarray): The axes relative to which the forces are located.
583
           highest_level (bool): Is this the largest level of the length scale hierarchy
584
           force_partitioning (float): Factor to adjust force contributions to avoid
585
           over counting correlated forces, default is 0.5.
586

587
        Returns:
588
            weighted_force (np.ndarray): The mass-weighted sum of the forces in the
589
            bead.
590
        """
591

592
        forces_trans = np.zeros((3,))
11✔
593

594
        # Sum forces from all atoms in the bead
595
        for atom in bead.atoms:
11✔
596
            # update local forces in translational axes
597
            forces_local = np.matmul(trans_axes, data_container.atoms[atom.index].force)
11✔
598
            forces_trans += forces_local
11✔
599

600
        if highest_level:
11✔
601
            # multiply by the force_partitioning parameter to avoid double counting
602
            # of the forces on weakly correlated atoms
603
            # the default value of force_partitioning is 0.5 (dividing by two)
604
            forces_trans = force_partitioning * forces_trans
11✔
605

606
        # divide the sum of forces by the mass of the bead to get the weighted forces
607
        mass = bead.total_mass()
11✔
608

609
        # Check that mass is positive to avoid division by 0 or negative values inside
610
        # sqrt
611
        if mass <= 0:
11✔
612
            raise ValueError(
11✔
613
                f"Invalid mass value: {mass}. Mass must be positive to compute the "
614
                f"square root."
615
            )
616

617
        weighted_force = forces_trans / np.sqrt(mass)
11✔
618

619
        logger.debug(f"Weighted Force: {weighted_force}")
11✔
620

621
        return weighted_force
11✔
622

623
    def get_weighted_torques(
11✔
624
        self, data_container, bead, rot_axes, force_partitioning=0.5
625
    ):
626
        """
627
        Function to calculate the moment of inertia weighted torques for a given bead.
628

629
        This function computes torques in a rotated frame and then weights them using
630
        the moment of inertia tensor. To prevent numerical instability, it treats
631
        extremely small diagonal elements of the moment of inertia tensor as zero
632
        (since values below machine precision are effectively zero). This avoids
633
        unnecessary use of extended precision (e.g., float128).
634

635
        Additionally, if the computed torque is already zero, the function skips
636
        the division step, reducing unnecessary computations and potential errors.
637

638
        Parameters
639
        ----------
640
        data_container : object
641
            Contains atomic positions and forces.
642
        bead : object
643
            The part of the molecule to be considered.
644
        rot_axes : np.ndarray
645
            The axes relative to which the forces and coordinates are located.
646
        force_partitioning : float, optional
647
            Factor to adjust force contributions, default is 0.5.
648

649
        Returns
650
        -------
651
        weighted_torque : np.ndarray
652
            The mass-weighted sum of the torques in the bead.
653
        """
654

655
        torques = np.zeros((3,))
11✔
656
        weighted_torque = np.zeros((3,))
11✔
657

658
        for atom in bead.atoms:
11✔
659

660
            # update local coordinates in rotational axes
661
            coords_rot = (
11✔
662
                data_container.atoms[atom.index].position - bead.center_of_mass()
663
            )
664
            coords_rot = np.matmul(rot_axes, coords_rot)
11✔
665
            # update local forces in rotational frame
666
            forces_rot = np.matmul(rot_axes, data_container.atoms[atom.index].force)
11✔
667

668
            # multiply by the force_partitioning parameter to avoid double counting
669
            # of the forces on weakly correlated atoms
670
            # the default value of force_partitioning is 0.5 (dividing by two)
671
            forces_rot = force_partitioning * forces_rot
11✔
672

673
            # define torques (cross product of coordinates and forces) in rotational
674
            # axes
675
            torques_local = np.cross(coords_rot, forces_rot)
11✔
676
            torques += torques_local
11✔
677

678
        # divide by moment of inertia to get weighted torques
679
        # moment of inertia is a 3x3 tensor
680
        # the weighting is done in each dimension (x,y,z) using the diagonal
681
        # elements of the moment of inertia tensor
682
        moment_of_inertia = bead.moment_of_inertia()
11✔
683

684
        for dimension in range(3):
11✔
685
            # Skip calculation if torque is already zero
686
            if np.isclose(torques[dimension], 0):
11✔
687
                weighted_torque[dimension] = 0
11✔
688
                continue
11✔
689

690
            # Check for zero moment of inertia
691
            if np.isclose(moment_of_inertia[dimension, dimension], 0):
11✔
692
                raise ZeroDivisionError(
11✔
693
                    f"Attempted to divide by zero moment of inertia in dimension "
694
                    f"{dimension}."
695
                )
696

697
            # Check for negative moment of inertia
698
            if moment_of_inertia[dimension, dimension] < 0:
11✔
699
                raise ValueError(
11✔
700
                    f"Negative value encountered for moment of inertia: "
701
                    f"{moment_of_inertia[dimension, dimension]} "
702
                    f"Cannot compute weighted torque."
703
                )
704

705
            # Compute weighted torque
706
            weighted_torque[dimension] = torques[dimension] / np.sqrt(
11✔
707
                moment_of_inertia[dimension, dimension]
708
            )
709

710
        logger.debug(f"Weighted Torque: {weighted_torque}")
11✔
711

712
        return weighted_torque
11✔
713

714
    def create_submatrix(self, data_i, data_j):
11✔
715
        """
716
        Function for making covariance matrices.
717

718
        Args
719
        -----
720
        data_i : values for bead i
721
        data_j : values for bead j
722

723
        Returns
724
        ------
725
        submatrix : 3x3 matrix for the covariance between i and j
726
        """
727

728
        # Start with 3 by 3 matrix of zeros
729
        submatrix = np.zeros((3, 3))
11✔
730

731
        # For each frame calculate the outer product (cross product) of the data from
732
        # the two beads and add the result to the submatrix
733
        outer_product_matrix = np.outer(data_i, data_j)
11✔
734
        submatrix = np.add(submatrix, outer_product_matrix)
11✔
735

736
        logger.debug(f"Submatrix: {submatrix}")
11✔
737

738
        return submatrix
11✔
739

740
    def build_covariance_matrices(
11✔
741
        self,
742
        entropy_manager,
743
        reduced_atom,
744
        levels,
745
        groups,
746
        start,
747
        end,
748
        step,
749
        number_frames,
750
    ):
751
        """
752
        Construct average force and torque covariance matrices for all molecules and
753
        entropy levels.
754

755
        Parameters
756
        ----------
757
        entropy_manager : EntropyManager
758
            Instance of the EntropyManager.
759
        reduced_atom : Universe
760
            The reduced atom selection.
761
        levels : dict
762
            Dictionary mapping molecule IDs to lists of entropy levels.
763
        groups : dict
764
            Dictionary mapping group IDs to lists of molecule IDs.
765
        start : int
766
            Start frame index.
767
        end : int
768
            End frame index.
769
        step : int
770
            Step size for frame iteration.
771
        number_frames : int
772
            Total number of frames to process.
773

774
        Returns
775
        -------
776
        tuple
777
            force_avg : dict
778
                Averaged force covariance matrices by entropy level.
779
            torque_avg : dict
780
                Averaged torque covariance matrices by entropy level.
781
        """
782
        number_groups = len(groups)
11✔
783

784
        force_avg = {
11✔
785
            "ua": {},
786
            "res": [None] * number_groups,
787
            "poly": [None] * number_groups,
788
        }
789
        torque_avg = {
11✔
790
            "ua": {},
791
            "res": [None] * number_groups,
792
            "poly": [None] * number_groups,
793
        }
794

795
        total_steps = len(reduced_atom.trajectory[start:end:step])
11✔
796
        total_items = (
11✔
797
            sum(len(levels[mol_id]) for mols in groups.values() for mol_id in mols)
798
            * total_steps
799
        )
800

801
        frame_counts = {
11✔
802
            "ua": {},
803
            "res": np.zeros(number_groups, dtype=int),
804
            "poly": np.zeros(number_groups, dtype=int),
805
        }
806

807
        with Progress(
11✔
808
            SpinnerColumn(),
809
            TextColumn("[bold blue]{task.fields[title]}", justify="right"),
810
            BarColumn(),
811
            TextColumn("[progress.percentage]{task.percentage:>3.1f}%"),
812
            TimeElapsedColumn(),
813
        ) as progress:
814

815
            task = progress.add_task(
11✔
816
                "[green]Processing...",
817
                total=total_items,
818
                title="Starting...",
819
            )
820

821
            indices = list(range(number_frames))
11✔
822
            for time_index, _ in zip(indices, reduced_atom.trajectory[start:end:step]):
11✔
823
                for group_id, molecules in groups.items():
11✔
824
                    for mol_id in molecules:
11✔
825
                        mol = entropy_manager._get_molecule_container(
11✔
826
                            reduced_atom, mol_id
827
                        )
828
                        for level in levels[mol_id]:
11✔
829
                            mol = entropy_manager._get_molecule_container(
11✔
830
                                reduced_atom, mol_id
831
                            )
832

833
                            resname = mol.atoms[0].resname
11✔
834
                            resid = mol.atoms[0].resid
11✔
835
                            segid = mol.atoms[0].segid
11✔
836

837
                            mol_label = f"{resname}_{resid} (segid {segid})"
11✔
838

839
                            progress.update(
11✔
840
                                task,
841
                                title=f"Building covariance matrices | "
842
                                f"Timestep {time_index} | "
843
                                f"Molecule: {mol_label} | "
844
                                f"Level: {level}",
845
                            )
846

847
                            self.update_force_torque_matrices(
11✔
848
                                entropy_manager,
849
                                mol,
850
                                group_id,
851
                                level,
852
                                levels[mol_id],
853
                                time_index,
854
                                number_frames,
855
                                force_avg,
856
                                torque_avg,
857
                                frame_counts,
858
                            )
859

860
                            progress.advance(task)
11✔
861

862
        return force_avg, torque_avg, frame_counts
11✔
863

864
    def update_force_torque_matrices(
11✔
865
        self,
866
        entropy_manager,
867
        mol,
868
        group_id,
869
        level,
870
        level_list,
871
        time_index,
872
        num_frames,
873
        force_avg,
874
        torque_avg,
875
        frame_counts,
876
    ):
877
        """
878
        Update the running averages of force and torque covariance matrices
879
        for a given molecule and entropy level.
880

881
        This function computes the force and torque covariance matrices for the
882
        current frame and updates the existing averages in-place using the incremental
883
        mean formula:
884

885
            new_avg = old_avg + (value - old_avg) / n
886

887
        where n is the number of frames processed so far for that molecule/level
888
        combination. This ensures that the averages are maintained without storing
889
        all previous frame data.
890

891
        Parameters
892
        ----------
893
        entropy_manager : EntropyManager
894
            Instance of the EntropyManager.
895
        mol : AtomGroup
896
            The molecule to process.
897
        group_id : int
898
            Index of the group to which the molecule belongs.
899
        level : str
900
            Current entropy level ("united_atom", "residue", or "polymer").
901
        level_list : list
902
            List of entropy levels for the molecule.
903
        time_index : int
904
            Index of the current frame relative to the start of the trajectory slice.
905
        num_frames : int
906
            Total number of frames to process.
907
        force_avg : dict
908
            Dictionary holding the running average force matrices, keyed by entropy
909
            level.
910
        torque_avg : dict
911
            Dictionary holding the running average torque matrices, keyed by entropy
912
            level.
913
        frame_counts : dict
914
            Dictionary holding the count of frames processed for each molecule/level
915
            combination.
916

917
        Returns
918
        -------
919
        None
920
            Updates are performed in-place on `force_avg`, `torque_avg`, and
921
            `frame_counts`.
922
        """
923
        highest = level == level_list[-1]
11✔
924

925
        # United atom level calculations are done separately for each residue
926
        # This allows information per residue to be output and keeps the
927
        # matrices from becoming too large
928
        if level == "united_atom":
11✔
929
            for res_id, residue in enumerate(mol.residues):
11✔
930
                key = (group_id, res_id)
11✔
931
                res = entropy_manager._run_manager.new_U_select_atom(
11✔
932
                    mol, f"index {residue.atoms.indices[0]}:{residue.atoms.indices[-1]}"
933
                )
934

935
                # This is to get MDAnalysis to get the information from the
936
                # correct frame of the trajectory
937
                res.trajectory[time_index]
11✔
938

939
                # Build the matrices, adding data from each timestep
940
                # Being careful for the first timestep when data has not yet
941
                # been added to the matrices
942
                f_mat, t_mat = self.get_matrices(
11✔
943
                    res,
944
                    level,
945
                    num_frames,
946
                    highest,
947
                    None if key not in force_avg["ua"] else force_avg["ua"][key],
948
                    None if key not in torque_avg["ua"] else torque_avg["ua"][key],
949
                )
950

951
                if key not in force_avg["ua"]:
11✔
952
                    force_avg["ua"][key] = f_mat.copy()
11✔
953
                    torque_avg["ua"][key] = t_mat.copy()
11✔
954
                    frame_counts["ua"][key] = 1
11✔
955
                else:
956
                    frame_counts["ua"][key] += 1
×
957
                    n = frame_counts["ua"][key]
×
958
                    force_avg["ua"][key] += (f_mat - force_avg["ua"][key]) / n
×
959
                    torque_avg["ua"][key] += (t_mat - torque_avg["ua"][key]) / n
×
960

961
        elif level in ["residue", "polymer"]:
11✔
962
            # This is to get MDAnalysis to get the information from the
963
            # correct frame of the trajectory
964
            mol.trajectory[time_index]
11✔
965

966
            key = "res" if level == "residue" else "poly"
11✔
967

968
            # Build the matrices, adding data from each timestep
969
            # Being careful for the first timestep when data has not yet
970
            # been added to the matrices
971
            f_mat, t_mat = self.get_matrices(
11✔
972
                mol,
973
                level,
974
                num_frames,
975
                highest,
976
                None if force_avg[key][group_id] is None else force_avg[key][group_id],
977
                (
978
                    None
979
                    if torque_avg[key][group_id] is None
980
                    else torque_avg[key][group_id]
981
                ),
982
            )
983

984
            if force_avg[key][group_id] is None:
11✔
985
                force_avg[key][group_id] = f_mat.copy()
11✔
986
                torque_avg[key][group_id] = t_mat.copy()
11✔
987
                frame_counts[key][group_id] = 1
11✔
988
            else:
989
                frame_counts[key][group_id] += 1
11✔
990
                n = frame_counts[key][group_id]
11✔
991
                force_avg[key][group_id] += (f_mat - force_avg[key][group_id]) / n
11✔
992
                torque_avg[key][group_id] += (t_mat - torque_avg[key][group_id]) / n
11✔
993

994
        return frame_counts
11✔
995

996
    def filter_zero_rows_columns(self, arg_matrix):
11✔
997
        """
998
        function for removing rows and columns that contain only zeros from a matrix
999

1000
        Args:
1001
            arg_matrix : matrix
1002

1003
        Returns:
1004
            arg_matrix : the reduced size matrix
1005
        """
1006

1007
        # record the initial size
1008
        init_shape = np.shape(arg_matrix)
11✔
1009

1010
        zero_indices = list(
11✔
1011
            filter(
1012
                lambda row: np.all(np.isclose(arg_matrix[row, :], 0.0)),
1013
                np.arange(np.shape(arg_matrix)[0]),
1014
            )
1015
        )
1016
        all_indices = np.ones((np.shape(arg_matrix)[0]), dtype=bool)
11✔
1017
        all_indices[zero_indices] = False
11✔
1018
        arg_matrix = arg_matrix[all_indices, :]
11✔
1019

1020
        all_indices = np.ones((np.shape(arg_matrix)[1]), dtype=bool)
11✔
1021
        zero_indices = list(
11✔
1022
            filter(
1023
                lambda col: np.all(np.isclose(arg_matrix[:, col], 0.0)),
1024
                np.arange(np.shape(arg_matrix)[1]),
1025
            )
1026
        )
1027
        all_indices[zero_indices] = False
11✔
1028
        arg_matrix = arg_matrix[:, all_indices]
11✔
1029

1030
        # get the final shape
1031
        final_shape = np.shape(arg_matrix)
11✔
1032

1033
        if init_shape != final_shape:
11✔
1034
            logger.debug(
11✔
1035
                "A shape change has occurred ({},{}) -> ({}, {})".format(
1036
                    *init_shape, *final_shape
1037
                )
1038
            )
1039

1040
        logger.debug(f"arg_matrix: {arg_matrix}")
11✔
1041

1042
        return arg_matrix
11✔
1043

1044
    def build_conformational_states(
11✔
1045
        self,
1046
        entropy_manager,
1047
        reduced_atom,
1048
        levels,
1049
        groups,
1050
        start,
1051
        end,
1052
        step,
1053
        number_frames,
1054
        bin_width,
1055
        ce,
1056
    ):
1057
        """
1058
        Construct the conformational states for each molecule at
1059
        relevant levels.
1060

1061
        Parameters:
1062
            entropy_manager (EntropyManager): Instance of the EntropyManager
1063
            reduced_atom (Universe): The reduced atom selection.
1064
            levels (list): List of entropy levels per molecule.
1065
            groups (dict): Groups for averaging over molecules.
1066
            start (int): Start frame index.
1067
            end (int): End frame index.
1068
            step (int): Step size for frame iteration.
1069
            number_frames (int): Total number of frames to process.
1070
            bin_width (int): Width of histogram bins.
1071
            ce: Conformational Entropy object
1072

1073
        Returns:
1074
            tuple: A tuple containing:
1075
                - states_ua (dict): Conformational states at the united-atom level.
1076
                - states_res (list): Conformational states at the residue level.
1077
        """
1078
        number_groups = len(groups)
11✔
1079
        states_ua = {}
11✔
1080
        states_res = [None] * number_groups
11✔
1081

1082
        total_items = sum(
11✔
1083
            len(levels[mol_id]) for mols in groups.values() for mol_id in mols
1084
        )
1085

1086
        with Progress(
11✔
1087
            SpinnerColumn(),
1088
            TextColumn("[bold blue]{task.fields[title]}", justify="right"),
1089
            BarColumn(),
1090
            TextColumn("[progress.percentage]{task.percentage:>3.1f}%"),
1091
            TimeElapsedColumn(),
1092
        ) as progress:
1093

1094
            task = progress.add_task(
11✔
1095
                "[green]Building Conformational States...",
1096
                total=total_items,
1097
                title="Starting...",
1098
            )
1099

1100
            for group_id in groups.keys():
11✔
1101
                molecules = groups[group_id]
11✔
1102
                for mol_id in molecules:
11✔
1103
                    mol = entropy_manager._get_molecule_container(reduced_atom, mol_id)
11✔
1104

1105
                    resname = mol.atoms[0].resname
11✔
1106
                    resid = mol.atoms[0].resid
11✔
1107
                    segid = mol.atoms[0].segid
11✔
1108

1109
                    mol_label = f"{resname}_{resid} (segid {segid})"
11✔
1110

1111
                    for level in levels[mol_id]:
11✔
1112
                        progress.update(
11✔
1113
                            task,
1114
                            title=f"Building conformational states | "
1115
                            f"Molecule: {mol_label} | "
1116
                            f"Level: {level}",
1117
                        )
1118

1119
                        if level == "united_atom":
11✔
1120
                            for res_id, residue in enumerate(mol.residues):
11✔
1121
                                key = (group_id, res_id)
11✔
1122

1123
                                res_container = (
11✔
1124
                                    entropy_manager._run_manager.new_U_select_atom(
1125
                                        mol,
1126
                                        f"index {residue.atoms.indices[0]}:"
1127
                                        f"{residue.atoms.indices[-1]}",
1128
                                    )
1129
                                )
1130
                                heavy_res = (
11✔
1131
                                    entropy_manager._run_manager.new_U_select_atom(
1132
                                        res_container, "prop mass > 1.1"
1133
                                    )
1134
                                )
1135
                                states = self.compute_dihedral_conformations(
11✔
1136
                                    heavy_res,
1137
                                    level,
1138
                                    number_frames,
1139
                                    bin_width,
1140
                                    start,
1141
                                    end,
1142
                                    step,
1143
                                    ce,
1144
                                )
1145

1146
                                if key in states_ua:
11✔
1147
                                    states_ua[key].extend(states)
11✔
1148
                                else:
1149
                                    states_ua[key] = states
11✔
1150

1151
                        elif level == "residue":
11✔
1152
                            states = self.compute_dihedral_conformations(
11✔
1153
                                mol,
1154
                                level,
1155
                                number_frames,
1156
                                bin_width,
1157
                                start,
1158
                                end,
1159
                                step,
1160
                                ce,
1161
                            )
1162

1163
                            if states_res[group_id] is None:
11✔
1164
                                states_res[group_id] = states
11✔
1165
                            else:
1166
                                states_res[group_id].extend(states)
11✔
1167

1168
                        progress.advance(task)
11✔
1169

1170
        logger.debug(f"states_ua {states_ua}")
11✔
1171
        logger.debug(f"states_res {states_res}")
11✔
1172

1173
        return states_ua, states_res
11✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc