
read, join, and write MS #272

Open · caseyjlaw opened this issue Dec 1, 2022 · 6 comments

@caseyjlaw

  • dask-ms version: 0.2.6
  • Python version: 3.8
  • Operating System: RHEL 8.6

Description

I'd like to read multiple MS files and write out a single MS that joins them along spectral window. Each input MS represents a single integration with a different spectral window, and the data have identical structure in each MS.

What I Did

I tried to follow the suggestion in the docs, but I may have an issue with the MS.

> ds = xds_from_ms('20221129_034113_15MHz.ms:SPECTRAL_WINDOW', group_cols="__row__")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-75-b6c8fc404ff8> in <module>
----> 1 ds = xds_from_ms('20221129_034113_15MHz.ms:SPECTRAL_WINDOW', group_cols="__row__")

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_ms(ms, columns, index_cols, group_cols, **kwargs)
    397                           index_cols=index_cols,
    398                           group_cols=group_cols,
--> 399                           **kwargs)
    400 
    401 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_table(table_name, columns, index_cols, group_cols, **kwargs)
    324     dask_datasets = DatasetFactory(table_name, columns,
    325                                    group_cols, index_cols,
--> 326                                    **kwargs).datasets()
    327 
    328     # Return dask datasets if xarray is not available

~/.conda/envs/development/lib/python3.6/site-packages/daskms/reads.py in __init__(self, table, select_cols, group_cols, index_cols, **kwargs)
    279     def __init__(self, table, select_cols, group_cols, index_cols, **kwargs):
    280         if not table_exists(table):
--> 281             raise ValueError("'%s' does not appear to be a CASA Table" % table)
    282 
    283         chunks = kwargs.pop('chunks', [{'row': _DEFAULT_ROW_CHUNKS}])

ValueError: '20221129_034113_15MHz.ms:SPECTRAL_WINDOW' does not appear to be a CASA Table

However, the file can be read at some level:

> ds = xds_from_ms('20221129_034113_15MHz.ms', group_cols=["FIELD_ID", "DATA_DESC_ID"])
> ds
[<xarray.Dataset>
 Dimensions:         (chan: 192, corr: 4, flagcat: 1, row: 62128, uvw: 3)
 Coordinates:
     ROWID           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
 Dimensions without coordinates: chan, corr, flagcat, row, uvw
 Data variables:
     ANTENNA1        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     ANTENNA2        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     ARRAY_ID        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     DATA            (row, chan, corr) complex64 dask.array<chunksize=(10000, 192, 4), meta=np.ndarray>
     EXPOSURE        (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     FEED1           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     FEED2           (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     FLAG            (row, chan, corr) bool dask.array<chunksize=(10000, 192, 4), meta=np.ndarray>
     FLAG_CATEGORY   (row, flagcat, chan, corr) bool dask.array<chunksize=(10000, 1, 192, 4), meta=np.ndarray>
     FLAG_ROW        (row) bool dask.array<chunksize=(10000,), meta=np.ndarray>
     INTERVAL        (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     OBSERVATION_ID  (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     PROCESSOR_ID    (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     SCAN_NUMBER     (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     SIGMA           (row, corr) float32 dask.array<chunksize=(10000, 4), meta=np.ndarray>
     STATE_ID        (row) int32 dask.array<chunksize=(10000,), meta=np.ndarray>
     TIME            (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     TIME_CENTROID   (row) float64 dask.array<chunksize=(10000,), meta=np.ndarray>
     UVW             (row, uvw) float64 dask.array<chunksize=(10000, 3), meta=np.ndarray>
     WEIGHT          (row, corr) float32 dask.array<chunksize=(10000, 4), meta=np.ndarray>
 Attributes:
     FIELD_ID:      0
     DATA_DESC_ID:  0]
@landmanbester (Collaborator)

I think you may just need a double colon for the subtable (with a single colon, the whole string is treated as a table path, hence the error above), i.e. try:

ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")

@caseyjlaw (Author)

Thanks for the quick reply. Sorry, but I actually pasted the wrong error. Below is what I get when using the proper syntax.
Perhaps our MS is not valid somehow. Do you have a way to verify that the MS file meets the specification?

In [2]: ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __call__(cls, *args, **kwargs)
    158             try:
--> 159                 return _table_cache[key]
    160             except KeyError:

~/.conda/envs/development/lib/python3.6/weakref.py in __getitem__(self, key)
    136             self._commit_removals()
--> 137         o = self.data[key]()
    138         if o is None:

KeyError: -316058696854364970

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-2-ed052d620bc7> in <module>
----> 1 ds = xds_from_ms('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_ms(ms, columns, index_cols, group_cols, **kwargs)
    397                           index_cols=index_cols,
    398                           group_cols=group_cols,
--> 399                           **kwargs)
    400 
    401 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/dask_ms.py in xds_from_table(table_name, columns, index_cols, group_cols, **kwargs)
    324     dask_datasets = DatasetFactory(table_name, columns,
    325                                    group_cols, index_cols,
--> 326                                    **kwargs).datasets()
    327 
    328     # Return dask datasets if xarray is not available

~/.conda/envs/development/lib/python3.6/site-packages/daskms/reads.py in datasets(self)
    397         elif len(self.group_cols) == 1 and self.group_cols[0] == "__row__":
    398             order_taql = ordering_taql(table_proxy, self.index_cols,
--> 399                                        self.taql_where)
    400             sorted_rows, row_runs = row_ordering(order_taql,
    401                                                  self.index_cols,

~/.conda/envs/development/lib/python3.6/site-packages/daskms/ordering.py in ordering_taql(table_proxy, index_cols, taql_where)
     74 
     75     return TableProxy(taql_factory, query, tables=[table_proxy],
---> 76                       __executor_key__=table_proxy.executor_key)
     77 
     78 

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __call__(cls, *args, **kwargs)
    159                 return _table_cache[key]
    160             except KeyError:
--> 161                 instance = type.__call__(cls, *args, **kwargs)
    162                 _table_cache[key] = instance
    163                 return instance

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in __init__(self, factory, *args, **kwargs)
    341         # Private, should be inaccessible
    342         self._write = False
--> 343         self._writeable = ex.impl.submit(_iswriteable, table).result()
    344 
    345         should_be_writeable = not kwargs.get('readonly', True)

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.conda/envs/development/lib/python3.6/concurrent/futures/thread.py in run(self)
     54 
     55         try:
---> 56             result = self.fn(*self.args, **self.kwargs)
     57         except BaseException as exc:
     58             self.future.set_exception(exc)

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in _iswriteable(table_future)
    236 
    237 def _iswriteable(table_future):
--> 238     return table_future.result().iswritable()
    239 
    240 

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    423                 raise CancelledError()
    424             elif self._state == FINISHED:
--> 425                 return self.__get_result()
    426 
    427             self._condition.wait(timeout)

~/.conda/envs/development/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/.conda/envs/development/lib/python3.6/concurrent/futures/thread.py in run(self)
     54 
     55         try:
---> 56             result = self.fn(*self.args, **self.kwargs)
     57         except BaseException as exc:
     58             self.future.set_exception(exc)

~/.conda/envs/development/lib/python3.6/site-packages/daskms/table_proxy.py in taql_factory(query, style, tables, readonly)
    189 
    190     try:
--> 191         return pt.taql(query, style=style, tables=tables)
    192     finally:
    193         for t in tables:

~/.conda/envs/development/lib/python3.6/site-packages/casacore/tables/table.py in taql(command, style, tables, globals, locals)
    152     if style:
    153         cmd = 'using style ' + style + ' ' + cmd
--> 154     tab = table(cmd, tabs, _oper=2)
    155     result = tab._getcalcresult()
    156     # If result is empty, it was a normal TaQL command resulting in a table.

~/.conda/envs/development/lib/python3.6/site-packages/casacore/tables/table.py in __init__(self, tablename, tabledesc, nrow, readonly, lockoptions, ack, dminfo, endian, memorytable, concatsubtables, _columnnames, _datatypes, _oper, _delete)
    327         elif _oper == 2:
    328             # This is the query or calc constructor.
--> 329             Table.__init__(self, tablename, tabledesc)
    330             if len(self._getcalcresult()) != 0:
    331                 # Do not make row object for a calc result

RuntimeError: Error in TaQL command: 'using style Python SELECT
	ROWID() as __tablerow__
FROM
	$1
ORDERBY
	TIME'
  Error in select expression: TIME is an unknown column (or keyword) in table /fast/claw/20221129_034113_15MHz.ms/SPECTRAL_WINDOW

@sjperkins (Member)

Try using xds_from_table instead of xds_from_ms:

ds = xds_from_table('20221129_034113_15MHz.ms::SPECTRAL_WINDOW', group_cols="__row__")

xds_from_ms applies Measurement Set defaults (in particular, ordering rows on TIME), and the SPECTRAL_WINDOW subtable has no TIME column, which is why the TaQL query above failed. xds_from_table applies no such defaults.

@caseyjlaw (Author)

Ok, that is working for me.
I am running into other issues when I try to reproduce the tutorials from the documentation. I think my difficulty is that the docs assume a "TEST.MS" with a specific structure (especially that it has two datasets).
Each of my MS files has one integration and one spectral window. I'd like an easy way to merge them together into a file with multiple spectral windows. Is that something that dask-ms can do easily?

@JSKenyon (Collaborator) commented Dec 5, 2022

dask-ms can certainly do it as a virtual operation, i.e. by concatenating the xarray.Dataset objects in memory. If your aim is to write a new MS that contains multiple spectral windows, that is probably doable, but not necessarily simple. I think @sjperkins will be able to advise; I have limited experience with using dask-ms to author new measurement sets. The path of least resistance may be for @sjperkins or me to add this functionality to dask-ms convert, as that should be guaranteed to have all the necessary metadata/subtables for writing a new measurement set. Adding it as part of the xds_to_* functionality will likely be a bit fraught.
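
A minimal sketch of the virtual approach, assuming each input MS yields a single (FIELD_ID, DATA_DESC_ID) dataset with identical chan/corr structure (the file names below are placeholders):

import xarray
from daskms import xds_from_ms

# Hypothetical per-SPW measurement sets; substitute your actual file names
ms_names = ["spw0.ms", "spw1.ms"]

datasets = []
for ddid, name in enumerate(ms_names):
    # Each input MS yields a single (FIELD_ID, DATA_DESC_ID) dataset
    (ds,) = xds_from_ms(name, group_cols=["FIELD_ID", "DATA_DESC_ID"])
    # Tag each dataset with the DATA_DESC_ID it should carry in the combined view
    datasets.append(ds.assign_attrs(DATA_DESC_ID=ddid))

# Row-wise concatenation is a purely lazy, in-memory view; nothing is written
combined = xarray.concat(datasets, dim="row")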

@sjperkins (Member) commented Dec 5, 2022

> Ok, that is working for me. I am running into other issues when I try to reproduce the tutorials from the documentation. I think my difficulty is that the docs assume a "TEST.MS" with a specific structure (especially that it has two datasets).

There are a few sources of information here that could be useful:

  • _ms_factory_impl (reproduced below), which backs the example_ms fixture, crafts a minimal multi-SPW, multi-POL test MS using python-casacore. This was probably the TEST.MS that the docs were referring to (apologies, it's been a while).

    def _ms_factory_impl(ms_name):
        rs = np.random.RandomState(42)
        ant_name = "::".join((ms_name, "ANTENNA"))
        ddid_name = "::".join((ms_name, "DATA_DESCRIPTION"))
        field_name = "::".join((ms_name, "FIELD"))
        pol_name = "::".join((ms_name, "POLARIZATION"))
        spw_name = "::".join((ms_name, "SPECTRAL_WINDOW"))

        kw = {"ack": False, "readonly": False}

        desc = {
            "DATA": {
                "_c_order": True,
                "comment": "DATA column",
                "dataManagerGroup": "StandardStMan",
                "dataManagerType": "StandardStMan",
                "keywords": {},
                "maxlen": 0,
                "ndim": 2,
                "option": 0,
                # 'shape': ..., # Variably shaped
                "valueType": "COMPLEX",
            }
        }

        na = 64
        corr_types = [[9, 10, 11, 12], [9, 12]]
        spw_chans = [16, 32]
        ddids = ([0, 0, 4], [1, 1, 6])

        with pt.default_ms(ms_name, desc) as ms:
            # Populate ANTENNA table
            with pt.table(ant_name, **kw) as A:
                A.addrows(na)
                A.putcol("POSITION", rs.random_sample((na, 3)) * 10000)
                A.putcol("OFFSET", rs.random_sample((na, 3)))
                A.putcol("NAME", ["ANT-%d" % i for i in range(na)])

            # Populate POLARIZATION table
            with pt.table(pol_name, **kw) as P:
                for r, corr_type in enumerate(corr_types):
                    P.addrows(1)
                    P.putcol("NUM_CORR", np.array(len(corr_type))[None], startrow=r, nrow=1)
                    P.putcol("CORR_TYPE", np.array(corr_type)[None, :], startrow=r, nrow=1)

            # Populate SPECTRAL_WINDOW table
            with pt.table(spw_name, **kw) as SPW:
                freq_start = 0.856e9
                freq_end = 2 * 0.856e9

                for r, nchan in enumerate(spw_chans):
                    chan_width = (freq_end - freq_start) / nchan
                    chan_width = np.full(nchan, chan_width)
                    chan_freq = np.linspace(freq_start, freq_end, nchan)
                    ref_freq = chan_freq[chan_freq.size // 2]

                    SPW.addrows(1)
                    SPW.putcol("NUM_CHAN", np.array(nchan)[None], startrow=r, nrow=1)
                    SPW.putcol("CHAN_WIDTH", chan_width[None, :], startrow=r, nrow=1)
                    SPW.putcol("CHAN_FREQ", chan_freq[None, :], startrow=r, nrow=1)
                    SPW.putcol("REF_FREQUENCY", np.array(ref_freq)[None], startrow=r, nrow=1)

            # Populate FIELD table
            with pt.table(field_name, **kw) as F:
                fields = (["3C147", np.deg2rad([0, 60])], ["3C147", np.deg2rad([30, 45])])
                npoly = 1

                for r, (name, phase_dir) in enumerate(fields):
                    F.addrows(1)
                    F.putcol("NAME", [name], startrow=r, nrow=1)
                    F.putcol("NUM_POLY", np.array(npoly)[None], startrow=r, nrow=1)

                    # Set all these to the phase centre
                    for c in ["PHASE_DIR", "REFERENCE_DIR", "DELAY_DIR"]:
                        F.putcol(c, phase_dir[None, None, :], startrow=r, nrow=1)

            # Populate DATA_DESCRIPTION table
            with pt.table(ddid_name, **kw) as D:
                for r, (spw_id, pol_id, _) in enumerate(ddids):
                    D.addrows(1)
                    D.putcol("SPECTRAL_WINDOW_ID", np.array(spw_id)[None], startrow=r, nrow=1)
                    D.putcol("POLARIZATION_ID", np.array(pol_id)[None], startrow=r, nrow=1)

            startrow = 0

            # Add some data to the main table
            for ddid, (spw_id, pol_id, rows) in enumerate(ddids):
                ms.addrows(rows)
                ms.putcol("DATA_DESC_ID", np.full(rows, ddid), startrow=startrow, nrow=rows)

                nchan = spw_chans[spw_id]
                ncorr = len(corr_types[pol_id])

                vis = (
                    np.random.random((rows, nchan, ncorr))
                    + np.random.random((rows, nchan, ncorr)) * 1j
                )
                ms.putcol("DATA", vis, startrow=startrow, nrow=rows)
                startrow += rows
  • test_ms_create(tmp_path, chunks, num_chans, corr_types, sources) accomplishes something similar, but using the xarray and dask interface (a minimal sketch of that style follows below).
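
For flavour, here is a minimal sketch of that xarray/dask style of table creation; the names and values below are illustrative, not the actual test fixtures:

import dask
import dask.array as da
import numpy as np
from daskms import Dataset, xds_to_table

# Describe a single SPECTRAL_WINDOW-like row purely with dask arrays
nchan = 16
chan_freq = da.linspace(0.856e9, 2 * 0.856e9, nchan, chunks=nchan)

spw = Dataset(
    {
        "NUM_CHAN": (("row",), da.full((1,), nchan, dtype=np.int32)),
        "CHAN_FREQ": (("row", "chan"), chan_freq[None, :]),
        "REF_FREQUENCY": (("row",), chan_freq[nchan // 2 : nchan // 2 + 1]),
    }
)

# Writing is lazy; the standalone table is created on disk when computed
writes = xds_to_table([spw], "SKETCH_SPW.table", columns="ALL")
dask.compute(writes)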

> Each of my MS files has one integration and one spectral window. I'd like an easy way to merge them together into a file with multiple spectral windows. Is that something that dask-ms can do easily?

It's doable, but you need to do some book-keeping, particularly with respect to:

  • the DATA_DESC_ID column in the main table
  • the SPECTRAL_WINDOW_ID and POLARIZATION_ID columns in the DATA_DESCRIPTION sub-table
  • the relevant SPECTRAL_WINDOW and POLARIZATION sub-tables.

An example of how this can be accomplished in an actual application is available in our averaging application, xova, which handles the SPW case as generally as possible.

If the structure of your input MSs is well-defined, you may want to exploit that instead of going with the fully general approach above. A rough sketch of the book-keeping follows.
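
To make the book-keeping concrete, here is a rough, hedged sketch (not xova's implementation) that assumes one SPECTRAL_WINDOW row per input MS, a single shared POLARIZATION setup, and placeholder file names:

import dask
import dask.array as da
import numpy as np
import xarray
from daskms import xds_from_ms, xds_from_table, xds_to_table

ms_names = ["spw0.ms", "spw1.ms"]  # placeholder input MSs
output_ms = "combined.ms"  # placeholder output MS

main_datasets, spw_datasets = [], []

for ddid, name in enumerate(ms_names):
    (main,) = xds_from_ms(name, group_cols=["FIELD_ID", "DATA_DESC_ID"])
    nrow = main.dims["row"]
    # Drop input ROWIDs so rows are appended rather than updated, and write
    # the grouping columns back as explicit per-row columns
    main = main.drop_vars("ROWID").assign(
        DATA_DESC_ID=(("row",), da.full(nrow, ddid, dtype=np.int32)),
        FIELD_ID=(("row",), da.full(nrow, main.attrs["FIELD_ID"], dtype=np.int32)),
    )
    main_datasets.append(main)

    # One SPECTRAL_WINDOW row per input MS (assumed)
    (spw,) = xds_from_table(f"{name}::SPECTRAL_WINDOW", group_cols="__row__")
    spw_datasets.append(spw.drop_vars("ROWID"))

# DATA_DESCRIPTION maps each DATA_DESC_ID to its SPW and POL rows;
# a single shared POLARIZATION setup is assumed here
ddid_ds = xarray.Dataset(
    {
        "SPECTRAL_WINDOW_ID": (("row",), da.arange(len(ms_names), dtype=np.int32)),
        "POLARIZATION_ID": (("row",), da.zeros(len(ms_names), dtype=np.int32)),
    }
)

writes = [
    xds_to_table(main_datasets, output_ms, columns="ALL"),
    xds_to_table(spw_datasets, f"{output_ms}::SPECTRAL_WINDOW", columns="ALL"),
    xds_to_table([ddid_ds], f"{output_ms}::DATA_DESCRIPTION", columns="ALL"),
]
dask.compute(writes)

Note that a valid MS also needs the remaining sub-tables (ANTENNA, FIELD, POLARIZATION, OBSERVATION, etc.), which is part of why dask-ms convert, as suggested above, may end up being the more robust route.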
