Skip to content

Commit

Permalink
Merge pull request #1824 from devitocodes/sparse-coord-order
Browse files Browse the repository at this point in the history
mpi: Fix mask ordering for sparse gather
  • Loading branch information
mloubout committed Feb 7, 2022
2 parents 2f039b9 + 7bb6ff5 commit 543b7be
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 44 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pytest-core-mpi.yml
Expand Up @@ -41,6 +41,7 @@ jobs:
- name: Test with pytest
run: |
python3 scripts/clear_devito_cache.py
python3 -m pytest --cov --cov-config=.coveragerc --cov-report=xml -m parallel tests/
- name: Upload coverage to Codecov
Expand Down
49 changes: 6 additions & 43 deletions devito/types/sparse.py
Expand Up @@ -151,37 +151,6 @@ def _dist_scatter_mask(self, dmap=None):
ret[self._sparse_position] = mask
return tuple(ret)

def _dist_subfunc_scatter_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_scatter_mask(dmap=dmap)[self._sparse_position]

def _dist_gather_mask(self, dmap=None):
"""
A mask to index into the ``data`` received upon returning from
``self._dist_alltoall``. This mask creates a new data array in which
duplicate sparse data values have been discarded. The resulting data
array can thus be used to populate ``self.data``.
"""
ret = list(self._dist_scatter_mask(dmap=dmap))
mask = ret[self._sparse_position]
inds = np.unique(mask, return_index=True)[1]
inds.sort()
ret[self._sparse_position] = inds.tolist()

return tuple(ret)

def _dist_subfunc_gather_mask(self, dmap=None):
"""
This method is analogous to :meth:`_dist_subfunc_scatter_mask`, although
the mask is now suitable to index into self's SubFunctions, rather
than into ``self.data``.
"""
return self._dist_gather_mask(dmap=dmap)[self._sparse_position]

def _dist_count(self, dmap=None):
"""
A 2-tuple of comm-sized iterables, which tells how many sparse points
Expand Down Expand Up @@ -261,13 +230,6 @@ def _dist_scatter(self):
"""
raise NotImplementedError

def _dist_gather(self, data):
"""
A ``numpy.ndarray`` containing up-to-date data and coordinate values
suitable for insertion into ``self.data``.
"""
raise NotImplementedError

def _arg_defaults(self, alias=None):
key = alias or self
mapper = {self: key}
Expand Down Expand Up @@ -698,9 +660,10 @@ def _dist_scatter(self, data=None):

# Compute dist map only once
dmap = self._dist_datamap
mask = self._dist_scatter_mask(dmap=dmap)

# Pack sparse data values so that they can be sent out via an Alltoallv
data = data[self._dist_scatter_mask(dmap=dmap)]
data = data[mask]
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Send out the sparse point values
Expand All @@ -714,7 +677,7 @@ def _dist_scatter(self, data=None):
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
coords = self.coordinates.data._local[self._dist_subfunc_scatter_mask(dmap=dmap)]
coords = self.coordinates.data._local[mask[self._sparse_position]]

# Send out the sparse point coordinates
_, scount, sdisp, rshape, rcount, rdisp = self._dist_subfunc_alltoall(dmap=dmap)
Expand All @@ -739,6 +702,7 @@ def _dist_gather(self, data, coords):

# Compute dist map only once
dmap = self._dist_datamap
mask = self._dist_scatter_mask(dmap=dmap)

# Pack sparse data values so that they can be sent out via an Alltoallv
data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))
Expand All @@ -750,7 +714,7 @@ def _dist_gather(self, data, coords):
[gathered, scount, sdisp, mpitype])
# Unpack data values so that they follow the expected storage layout
gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
self._data[:] = gathered[self._dist_gather_mask(dmap=dmap)]
self._data[mask] = gathered[:]

if coords is not None:
# Pack (reordered) coordinates so that they can be sent out via an Alltoallv
Expand All @@ -762,8 +726,7 @@ def _dist_gather(self, data, coords):
mpitype = MPI._typedict[np.dtype(self.coordinates.dtype).char]
comm.Alltoallv([coords, rcount, rdisp, mpitype],
[gathered, scount, sdisp, mpitype])
self._coordinates.data._local[:] = \
gathered[self._dist_subfunc_gather_mask(dmap=dmap)]
self._coordinates.data._local[mask[self._sparse_position]] = gathered[:]

# Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
# in `_dist_scatter` is here received; a sparse point that is received in
Expand Down
3 changes: 2 additions & 1 deletion tests/test_gpu_common.py
Expand Up @@ -20,8 +20,9 @@ class TestGPUInfo(object):

def test_get_gpu_info(self):
info = get_gpu_info()
known = ['nvidia', 'tesla', 'geforce', 'unspecified']
try:
assert info['architecture'].lower() in ['tesla', 'geforce', 'unspecified']
assert info['architecture'].lower() in known
except KeyError:
# There might be than one GPUs, but for now we don't care
# as we're not really exploiting this info yet...
Expand Down
16 changes: 16 additions & 0 deletions tests/test_mpi.py
Expand Up @@ -478,6 +478,22 @@ def test_sparse_coords(self):
coords_loc += sf.coordinates.data[i, 0]
assert sf.data[i] == coords_loc

@pytest.mark.parallel(mode=4)
def test_sparse_coords_issue1823(self):
grid = Grid((101, 101, 101), extent=(1000, 1000, 1000))
coords = np.array([[1000., 0., 900.], [1000., 300., 700.],
[1000., 500., 500.], [1000., 700., 300.],
[1000., 900., 0.], [1000., 0., 850.]])
rec = SparseTimeFunction(name="s", grid=grid, coordinates=coords,
nt=10, npoint=6)
ref = SparseTimeFunction(name="s1", grid=grid, coordinates=coords,
nt=10, npoint=6)
u = TimeFunction(name="u", grid=grid, space_order=1)

Operator([Eq(u, u+1)]+rec.interpolate(u))()

assert np.allclose(rec.coordinates.data[:], ref.coordinates.data)


class TestOperatorSimple(object):

Expand Down

0 comments on commit 543b7be

Please sign in to comment.