Parallelize select_atoms by coordinates #134

Open

cecilpert opened this issue Aug 4, 2020 · 7 comments

@cecilpert

Hello,

To simplify my problem: I want to select atoms based on coordinates across all frames. I tried to do this with pmda, but it fails with n_jobs > 1, and with n_jobs = 1 it returns results whose atom positions are not the expected ones.

Expected behaviour

The expected behaviour is this one, provided by the serial implementation.

import MDAnalysis as mda

def get_atoms_below10(atomgroup):
    return atomgroup.select_atoms("prop x >= 0 and prop x < 10")

u = mda.Universe(top, trj)  # top, trj: topology and trajectory file paths
membrane_atoms = u.select_atoms("resname DOPC")
atom_groups_through_frames = [get_atoms_below10(membrane_atoms) for ts in u.trajectory[:100]]
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[55][8].position)

This gives me:

[  4.1     172.29999 180.29999]
[  3.9999998  86.6       195.8      ]

Actual behaviour

Here is the code I tested with pmda:

import MDAnalysis as mda
from pmda.custom import AnalysisFromFunction

# u and membrane_atoms are defined as in the serial example above

def get_atoms_below10(atomgroup):
    return atomgroup.select_atoms("prop x >= 0 and prop x < 10")

parallel_groups = AnalysisFromFunction(get_atoms_below10, u, membrane_atoms).run(n_jobs = 4, start = 0, stop = 100)
atom_groups_through_frames = parallel_groups.results
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[55][8].position)

I got the following error:

RuntimeError: Couldn't find a suitable Universe to unpickle AtomGroup onto with Universe hash '1bea900d-48f2-417c-a727-e8801ff0611b'.  Available hashes: 52f98645-a531-4513-a596-072ef9b8a50a

If I try with n_jobs = 1:

parallel_groups = AnalysisFromFunction(get_atoms_below10, u, membrane_atoms).run(n_jobs = 1, start = 0, stop = 100)
atom_groups_through_frames = parallel_groups.results
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[55][8].position)

I got results, but not the expected ones:

[ 43.800003 170.8      223.4     ]
[  7.6 216.6 207.2]

I noticed that the results also change with the number of blocks. It looks as though all atoms at position i within the same block end up with the same coordinates during the _reduce step, or something along those lines.

In the end, I managed to run my parallel computation without pmda by using Python's multiprocessing.Pool, slicing the trajectory into n chunks with one copy of the universe per chunk.
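
Roughly, that workaround looks like the sketch below (not my exact code; the file names, chunk count, and helper name are placeholders):

import multiprocessing as mp
import numpy as np
import MDAnalysis as mda

TOP, TRJ = "topol.tpr", "traj.xtc"  # placeholder file names

def positions_below10(frame_range):
    start, stop = frame_range
    u = mda.Universe(TOP, TRJ)  # one Universe copy per chunk/worker
    membrane = u.select_atoms("resname DOPC")
    out = []
    for ts in u.trajectory[start:stop]:
        sel = membrane.select_atoms("prop x >= 0 and prop x < 10")
        out.append(sel.positions.copy())  # keep plain arrays, not AtomGroups
    return out

if __name__ == "__main__":
    n_frames, n_chunks = 100, 4
    edges = np.linspace(0, n_frames, n_chunks + 1, dtype=int)
    chunks = list(zip(edges[:-1], edges[1:]))
    with mp.Pool(n_chunks) as pool:
        blocks = pool.map(positions_below10, chunks)
    positions_through_frames = [p for block in blocks for p in block]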

Current versions:

mda version : 1.0.0
pmda version : 0.3.0+17.g13fa3b5
dask version : 2.21.0

@orbeckst
Member

orbeckst commented Aug 4, 2020

Would you mind doing your test with the included test files

from MDAnalysis.tests import datafiles as data
u = mda.Universe(data.GRO_MEMPROT, data.XTC_MEMPROT)

so that we can easily check locally and also write a proper test? Thank you!

@cecilpert
Author

Of course, it gives me this:

Serial implementation

u = mda.Universe(data.GRO_MEMPROT, data.XTC_MEMPROT)
atom_groups_through_frames = [get_atoms_below10(u.atoms) for ts in u.trajectory]
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[4][5].position)
>> [ 9.83     48.320004 97.73001 ]
>> [13.18 46.05 48.47]

PMDA implementation

With n_jobs > 1, I still have the RuntimeError about Universe hashes.
With n_jobs = 1, it gives different results for n_blocks = 1 and n_blocks = 2, and errors for n_blocks from 3 to 5.

parallel_groups = AnalysisFromFunction(get_atoms_below10, u, u.atoms).run(n_blocks = 1, n_jobs = 1)
atom_groups_through_frames = parallel_groups.results
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[4][5].position)
>> [16.64 49.6  88.55]
>> [ 8.56     40.610004 48.300003]
parallel_groups = AnalysisFromFunction(get_atoms_below10, u, u.atoms).run(n_blocks = 2, n_jobs = 1)
atom_groups_through_frames = parallel_groups.results
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[4][5].position)
>> [16.760002 50.43     89.3     ]
>> [ 8.56     40.610004 48.300003]

For n_blocks = 3 and n_blocks = 4, I have the following error:

ValueError: all the input arrays must have the same number of dimensions, but the array at index 0 has 1 dimension(s) and the array at index 1 has 2 dimension(s)

And for n_blocks = 5, this one:

ValueError: could not broadcast input array from shape (4169) into shape (1)

The first error with n_blocks doesn't occur if I use the NewAnalysis class below instead of AnalysisFromFunction, with _conclude redefined. I also checked the position values during the _single_frame computation:

from pmda.parallel import ParallelAnalysisBase

class NewAnalysis(ParallelAnalysisBase):
    def __init__(self, universe, atomgroup):
        self._ag = atomgroup
        super(NewAnalysis, self).__init__(universe, self._ag)

    def _single_frame(self, ts, agroups):
        print(f"= single frame {ts.frame}")
        selection = agroups[0].select_atoms("prop x >= 0 and prop x < 10")
        print(selection[0].position)
        return selection

    def _conclude(self):
        self.results = [frame for block in self._results for frame in block]

na = NewAnalysis(u, [u.atoms]).run(n_jobs=1, n_blocks = 3)
atom_groups_through_frames = na.results
for i in range(5):
    print(f"= final frame {i}")
    print(atom_groups_through_frames[i][0].position)

Returns:

= single frame 0
[ 9.83     48.320004 97.73001 ]
= single frame 1
[  8.880001  61.000004 104.83001 ]
= single frame 4
[ 9.530001 39.53     45.350002]
= single frame 2
[ 9.900001 31.280003 63.800003]
= single frame 3
[ 9.710001 30.670002 57.300003]
= final frame 0
[23.990002  9.68     60.36    ]
= final frame 1
[23.990002  9.68     60.36    ]
= final frame 2
[17.09     18.810001 58.74    ]
= final frame 3
[17.09     18.810001 58.74    ]
= final frame 4
[23.160002 18.140001 53.93    ]

This behaviour seems to appear as soon as _single_frame returns an AtomGroup.

@cecilpert
Author

For the "wrong position" error with n_jobs = 1, a solution for my specific problem is to return just atoms positions instead of all AtomGroup. I got the same kind of problem in my own pipeline with Pool, with AtomGroup saved inside objects and wrong positions when reading at the end. I guess it's something related with universe iteration and "not be at the good frame" when reading. Since I just need atoms positions I didn't check further.

@orbeckst
Member

Thank you for the detailed error report.

I agree that something is not right. I don't immediately see what the problem is, though. I currently don't have time to dig into this. Help is welcome.

@yuxuanzhuang
Contributor

yuxuanzhuang commented Aug 24, 2020

There's another very interesting question here (aside from what PMDA does wrong), which I guess we should make clearer: an AtomGroup is always attached to a Universe, and it changes state with that Universe. So for the serial implementation here:

u = mda.Universe(data.GRO_MEMPROT, data.XTC_MEMPROT)
atom_groups_through_frames = [get_atoms_below10(u.atoms) for trj in u.trajectory]  #  the u.trajectory.ts will be reset to 0 after iteration.
print(atom_groups_through_frames[0][0].position)
print(atom_groups_through_frames[4][5].position)
>> [ 9.83     48.320004 97.73001 ]
>> [13.18 46.05 48.47]  #  not correct: it prints the position of atom[5] at timestep 0.

The AtomGroups saved inside atom_groups_through_frames are all attached to u, so you can only get the "right" positions by:

for i, trj in enumerate(u.trajectory):
    print(atom_groups_through_frames[i][0].position)

which I think is confusing. Maybe there's a better way to do it? Only returning the positions is of course one option.
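
To make that coupling concrete, a quick sketch with the test files from above:

import MDAnalysis as mda
from MDAnalysis.tests import datafiles as data

u = mda.Universe(data.GRO_MEMPROT, data.XTC_MEMPROT)
ag = u.atoms[:1]        # the AtomGroup stays bound to u

u.trajectory[0]
print(ag.positions)     # coordinates at frame 0
u.trajectory[4]
print(ag.positions)     # same AtomGroup, different values: coordinates at frame 4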

EDIT:

Another interesting thing related to ValueError: could not broadcast input array from shape (4169) into shape (1) (n_blocks=5) is:

#  how `AtomGroup` behaves with `numpy` funcs:
>>> import numpy as np
>>> atomgroup_list = [u.atoms]
>>> atomgroup_array = np.asarray(atomgroup_list)  # such a conversion happens inside many numpy functions
>>> atomgroup_array
array([[<Atom 1: N of type N of resname TYR, resid 7 and segid SYSTEM>,
        <Atom 2: H1 of type H of resname TYR, resid 7 and segid SYSTEM>,
        <Atom 3: H2 of type H of resname TYR, resid 7 and segid SYSTEM>,
        ...,
        <Atom 43478: H16X of type H of resname POPG, resid 572 and segid SYSTEM>,
        <Atom 43479: H16Y of type H of resname POPG, resid 572 and segid SYSTEM>,
        <Atom 43480: H16Z of type H of resname POPG, resid 572 and segid SYSTEM>]],
      dtype=object)

I would expect it to be array([<AtomGroup with 43480 atoms>]), but it expands into the individual atoms, which is related to how AtomGroup is structured, with __getitem__ implemented.

@cecilpert
Author

Thanks, you're right, I always forget this AtomGroup/Universe link. Here print(atom_groups_through_frames[4][5].position) gives the position of the atom at timestep 0, but it is the 5th atom of the selection made at timestep 4. For my usage I only saved the coordinates; besides the need to iterate through the trajectory, the RAM usage is too high if all the AtomGroups are kept.
But that doesn't change the problem of reading coordinates after the PMDA computation. I guess that's because the universe is copied for parallelization, so even if we iterate through the universe at the end, it will not be the same universe as the one the AtomGroups are attached to?

@LeiliZ

LeiliZ commented Aug 6, 2021

As a temporary fix, I found that my similar issue was resolved by rolling back to dask=2.9.2.
