Merge pull request #70 from Deltares/small_things
Small things
DirkEilander committed Feb 18, 2022
2 parents 559c569 + be4a32f commit fced2d1
Showing 4 changed files with 108 additions and 50 deletions.
14 changes: 10 additions & 4 deletions docs/changelog.rst
@@ -15,20 +15,26 @@ Added
- workflow.river.river_bathymetry method to derive river width and depth estimates.
Note that the new river width estimates differ from the previous ones and lead to different model results.
- moved basemaps workflows (hydrography and topography) from hydromt core.
- new ID column ('fid') for the outlets staticgeoms
- new ``index_col`` attribute to setup_gauges to choose a specific column of gauges_fn as ID for wflow_gauges

Changed
^^^^^^^
- Moved the interpolate_na function so it is applied only to the output dataset (i.e. at model resolution) rather than at the original data resolution. This change generates small differences in the parameter values but greatly improves memory usage.
- setup_riverwidth method deprecated (will be removed in future versions) in favour of setup_rivers.
- setup_rivers takes an additional river_geom_fn argument with a river segment geometry file to calculate river width and depth from its attributes
- Possibility to use any datasets and not just the default ones for setup_laimaps, setup_lakes, setup_glaciers

Fixed
^^^^^
- Calculation of lake_b parameter in setup_lakes.
- Add a minimum averaged discharge to lakes to avoid division by zero when computing lake_b.
- When writing several forcing files instead of one, their time_units should be the same to get a single wflow run (time_units option in write_forcing)
- Filter gauges that could not be snapped to river (if snap_to_river is True) in setup_gauges
- Avoid duplicates in the toml csv column for gauges
- Fill missing values in landslope with zeros within the basin mask
- prevent writing a _FillValue on the time coordinate of forcing data


v0.1.3 (4 October 2021)
-------------------------
2 changes: 1 addition & 1 deletion examples/wflow_piave_subbasin/staticgeoms/gauges.geojson
@@ -2,6 +2,6 @@
"type": "FeatureCollection",
"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } },
"features": [
{ "type": "Feature", "properties": { }, "geometry": { "type": "Point", "coordinates": [ 12.2, 45.833333333333329 ] } }
{ "type": "Feature", "properties": { "fid": 1 }, "geometry": { "type": "Point", "coordinates": [ 12.2, 45.833333333333329 ] } }
]
}
@@ -2,6 +2,6 @@
"type": "FeatureCollection",
"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } },
"features": [
{ "type": "Feature", "properties": { }, "geometry": { "type": "Point", "coordinates": [ 12.2, 45.833333333333329 ] } }
{ "type": "Feature", "properties": { "fid": 1 }, "geometry": { "type": "Point", "coordinates": [ 12.2, 45.833333333333329 ] } }
]
}
140 changes: 96 additions & 44 deletions hydromt_wflow/wflow.py
@@ -227,6 +227,7 @@ def setup_basemaps(
ds_topo = workflows.topography(
ds=ds_org, ds_like=self.staticmaps, method="average", logger=self.logger
)
ds_topo["lndslp"] = np.maximum(ds_topo["lndslp"], 0.0)
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_topo.data_vars}
self.set_staticmaps(ds_topo.rename(rmdict))
# set basin geometry
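
For reference, the clipping added above in isolation — a minimal, illustrative sketch (the data is made up; only the variable name "lndslp" comes from the diff):

import numpy as np
import xarray as xr

# Illustrative dataset: negative land-slope values are clipped to zero.
ds_topo = xr.Dataset({"lndslp": ("x", np.array([0.05, -0.01, 0.2, -0.3]))})
ds_topo["lndslp"] = np.maximum(ds_topo["lndslp"], 0.0)
print(ds_topo["lndslp"].values)  # [0.05 0.   0.2  0.  ]
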
@@ -652,7 +653,7 @@ def setup_laimaps(self, lai_fn="model_lai"):
* Required variables: ['LAI']
"""
if lai_fn not in ["modis_lai"]:
if lai_fn not in self.data_catalog:
self.logger.warning(f"Invalid source '{lai_fn}', skipping setup_laimaps.")
return
# retrieve data for region
@@ -671,6 +672,7 @@ def setup_gauges(
self,
gauges_fn="grdc",
source_gdf=None,
index_col=None,
snap_to_river=True,
mask=None,
derive_subcatch=False,
@@ -707,6 +709,8 @@
Known source name or path to a gauges geometry file, by default None.
source_gdf : geopandas.GeoDataFrame, optional
Direct gauges geometry, by default None.
index_col : str, optional
Column in gauges_fn to use for ID values, by default None (use the default index column)
snap_to_river : bool, optional
Snap point locations to the closest downstream river cell, by default True
mask : np.boolean, optional
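
A hedged usage sketch of the new index_col argument; the model root, gauges file and column name are all hypothetical:

from hydromt_wflow import WflowModel

mod = WflowModel(root="wflow_piave", mode="r+")  # hypothetical existing model
mod.setup_gauges(
    gauges_fn="./gauges_obs.geojson",  # hypothetical gauges file
    index_col="station_id",            # column of gauges_fn used as wflow gauge ID
    snap_to_river=True,
)
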
@@ -731,7 +735,7 @@

if derive_outlet:
self.logger.info(f"Gauges locations set based on river outlets.")
da, idxs, ids = flw.gauge_map(self.staticmaps, idxs=self.flwdir.idxs_pit)
# Only keep river outlets for gauges
da = da.where(self.staticmaps[self._MAPS["rivmsk"]], da.raster.nodata)
ids_da = np.unique(da.values[da.values > 0])
@@ -741,6 +745,7 @@
gdf = gpd.GeoDataFrame(
index=ids_da.astype(np.int32), geometry=points, crs=self.crs
)
gdf["fid"] = ids_da.astype(np.int32)
self.set_staticgeoms(gdf, name="gauges")
self.logger.info(f"Gauges map based on catchment river outlets added.")

@@ -780,6 +785,9 @@
self.logger.info(
f"{gdf.index.size} {basename} gauge locations found within domain"
)
# Set index to index_col
if index_col is not None and index_col in gdf:
gdf = gdf.set_index(index_col)
xs, ys = np.vectorize(lambda p: (p.xy[0][0], p.xy[1][0]))(
gdf["geometry"]
)
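
For illustration, the set_index step above in isolation (the data is made up):

import geopandas as gpd
from shapely.geometry import Point

gdf = gpd.GeoDataFrame(
    {"station_id": [1001, 1002]},
    geometry=[Point(12.2, 45.8), Point(12.3, 45.9)],
    crs="EPSG:4326",
)
index_col = "station_id"
# Promote the chosen attribute column to the gauge index, as setup_gauges now does.
if index_col is not None and index_col in gdf:
    gdf = gdf.set_index(index_col)
print(list(gdf.index))  # [1001, 1002]
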
@@ -788,27 +796,47 @@

if snap_to_river and mask is None:
mask = self.staticmaps[self._MAPS["rivmsk"]].values
da, idxs, ids = flw.gauge_map(
self.staticmaps,
idxs=idxs,
ids=ids,
stream=mask,
flwdir=self.flwdir,
logger=self.logger,
)

# Filter gauges that could not be snapped to rivers
if snap_to_river:
ids_old = ids.copy()
da = da.where(
self.staticmaps[self._MAPS["rivmsk"]], da.raster.nodata
)
ids_new = np.unique(da.values[da.values > 0])
idxs = idxs[np.isin(ids_old, ids_new)]
ids = da.values.flat[idxs]
# Add to staticmaps
mapname = f'{str(self._MAPS["gauges"])}_{basename}'
self.set_staticmaps(da, name=mapname)

# geoms
points = gpd.points_from_xy(*self.staticmaps.raster.idx_to_xy(idxs))
# if csv contains additional columns, these are also written in the staticgeoms
gdf_snapped = gpd.GeoDataFrame(
index=ids.astype(np.int32), geometry=points, crs=self.crs
)
# Set the index name of gdf_snapped based on the original gdf
if gdf.index.name is not None:
gdf_snapped.index.name = gdf.index.name
else:
gdf_snapped.index.name = "fid"
# Add gdf attributes to gdf_snapped (filter on snapped index before merging)
df_attrs = pd.DataFrame(gdf.drop(columns="geometry"))
df_attrs = df_attrs[np.isin(df_attrs.index, gdf_snapped.index)]
gdf_snapped = gdf_snapped.merge(
df_attrs, how="inner", on=gdf.index.name
)
# Add gdf_snapped to staticgeoms
self.set_staticgeoms(gdf_snapped, name=mapname.replace("wflow_", ""))
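
A minimal sketch of the snap-filter above, with illustrative arrays standing in for the model data:

import numpy as np

ids_old = np.array([1, 2, 3, 4])        # gauge IDs before masking to river cells
ids_new = np.array([1, 3])              # IDs that remain after the river mask
idxs = np.array([10, 55, 73, 99])       # flattened cell index per gauge
idxs = idxs[np.isin(ids_old, ids_new)]  # keep only gauges snapped to a river
print(idxs)  # [10 73]
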

# # Add new outputcsv section in the config
if gauge_toml_param is None and update_toml:
@@ -819,13 +847,14 @@
self.set_config(f"input.gauges_{basename}", f"{mapname}")
if self.get_config("csv") is not None:
for o in range(len(gauge_toml_param)):
self.config["csv"]["column"].append(
{
"header": gauge_toml_header[o],
"map": f"gauges_{basename}",
"parameter": gauge_toml_param[o],
}
)
gauge_toml_dict = {
"header": gauge_toml_header[o],
"map": f"gauges_{basename}",
"parameter": gauge_toml_param[o],
}
# If the gauge output csv column already exists, skip writing it twice
if gauge_toml_dict not in self.config["csv"]["column"]:
self.config["csv"]["column"].append(gauge_toml_dict)
self.logger.info(f"Gauges map from {basename} added.")

# add subcatch
@@ -881,9 +910,9 @@ def setup_lakes(self, lakes_fn="hydro_lakes", min_area=10.0):
"""This component generates maps of lake areas and outlets as well as parameters
with average lake area, depth a discharge values.
The data is generated from features with ``min_area`` [km2] (default 1 km2) from a database with
lake geometry, IDs and metadata. The required variables are the lake ID 'waterbody_id', average area 'Area_avg' [m2],
average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m] and average discharge 'Dis_avg' [m3/s].
Adds model layers:
@@ -920,9 +949,6 @@ def setup_lakes(self, lakes_fn="hydro_lakes", min_area=10.0):
"input.lateral.river.lake.waterlevel": "LakeAvgLevel",
}

gdf_org, ds_lakes = self._setup_waterbodies(lakes_fn, "lake", min_area)
if ds_lakes is not None:
rmdict = {k: v for k, v in self._MAPS.items() if k in ds_lakes.data_vars}
@@ -936,6 +962,9 @@ def setup_lakes(self, lakes_fn="hydro_lakes", min_area=10.0):
"Dis_avg": "LakeAvgOut",
}
)
# Minimum value for LakeAvgOut
LakeAvgOut = gdf_org["LakeAvgOut"].copy()
gdf_org["LakeAvgOut"] = np.maximum(gdf_org["LakeAvgOut"], 0.01)
gdf_org["Lake_b"] = gdf_org["LakeAvgOut"].values / (
gdf_org["LakeAvgLevel"].values
) ** (2)
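
A worked sketch of the Lake_b fix above: the average outflow is floored at 0.01 m3/s before the division, so lakes with zero recorded discharge no longer break the computation (values are illustrative):

import numpy as np

LakeAvgOut = np.array([0.0, 5.0])    # average outflow [m3/s], from 'Dis_avg'
LakeAvgLevel = np.array([2.0, 4.0])  # average water level [m]
LakeAvgOut = np.maximum(LakeAvgOut, 0.01)
Lake_b = LakeAvgOut / LakeAvgLevel ** 2
print(Lake_b)  # [0.0025 0.3125]
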
@@ -945,6 +974,12 @@ def setup_lakes(self, lakes_fn="hydro_lakes", min_area=10.0):
gdf_org["LakeThreshold"] = 0.0
gdf_org["LinkedLakeLocs"] = 0

# Check if some LakeAvgOut values have been replaced
if not np.all(LakeAvgOut == gdf_org["LakeAvgOut"]):
self.logger.warning(
"Some values of LakeAvgOut have been replaced by a minimum value of 0.01m3/s"
)

lake_params = [
"waterbody_id",
"LakeArea",
@@ -983,14 +1018,30 @@ def setup_reservoirs(
priority_jrc=True,
**kwargs,
):
"""This component generates maps of lake areas and outlets as well as parameters
"""This component generates maps of reservoir areas and outlets as well as parameters
with average reservoir area, demand, min and max target storage capacities and
discharge capacity values.
The data is generated from features with ``min_area`` [km2] (default is 1 km2)
from a database with reservoir geometry, IDs and metadata.
Currently, "hydro_reservoirs" (based on GRAND) is the only supported ``reservoirs_fn``
data source and we use a default minimum area of 1 km2.
Data requirements for direct use (i.e. the wflow parameters are already present in reservoirs_fn)
are the reservoir ID 'waterbody_id', area 'ResSimpleArea' [m2], maximum volume 'ResMaxVolume' [m3],
the targeted minimum and maximum fraction of water volume in the reservoir 'ResTargetMinFrac'
and 'ResTargetMaxFrac' [-], the average water demand 'ResDemand' [m3/s] and the maximum release of
the reservoir before spilling 'ResMaxRelease' [m3/s].
In case the wflow parameters are not directly available, they can be computed by HydroMT using other
reservoir characteristics. If not enough characteristics are available, the hydroengine tool will be
used to download additional timeseries from the JRC database.
The required variables for computation of the parameters with hydroengine are reservoir ID 'waterbody_id',
reservoir ID in the HydroLAKES database 'Hylak_id', average volume 'Vol_avg' [m3], average depth 'Depth_avg'
[m], average discharge 'Dis_avg' [m3/s] and dam height 'Dam_height' [m].
To compute the parameters without hydroengine, the required variables in reservoirs_fn are the reservoir ID 'waterbody_id',
average area 'Area_avg' [m2], average volume 'Vol_avg' [m3], average depth 'Depth_avg' [m], average discharge 'Dis_avg'
[m3/s], dam height 'Dam_height' [m] and the minimum / normal / maximum storage capacity of the dam 'Capacity_min',
'Capacity_norm' and 'Capacity_max' [m3].
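
A hedged sketch of how such a direct-use check could look; the helper is an assumption, only the column names come from the docstring above:

DIRECT_COLS = [
    "waterbody_id", "ResSimpleArea", "ResMaxVolume",
    "ResTargetMinFrac", "ResTargetMaxFrac", "ResDemand", "ResMaxRelease",
]

def needs_computation(gdf) -> bool:
    """True if any direct-use reservoir parameter is missing or has gaps."""
    return any(c not in gdf.columns or gdf[c].isna().any() for c in DIRECT_COLS)
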
Adds model layers:
@@ -1045,12 +1096,6 @@
"input.lateral.river.reservoir.targetminfrac": "ResTargetMinFrac",
}

gdf_org, ds_res = self._setup_waterbodies(reservoirs_fn, "reservoir", min_area)
# TODO: check if there are missing values in the above parameter columns;
# if everything is present, skip reservoirattrs() and directly make the maps
@@ -1231,10 +1276,12 @@ def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
as well as tables with temperature threshold, melting factor and snow-to-ice
conversion fraction.
The data is generated from features with ``min_area`` [km2] (default is 1 km2)
from a database with glacier geometry, IDs and metadata.
Currently, "rgi" (Randolph Glacier Inventory) is the only supported ``glaciers_fn``
data source and we use a default minimum area of 1 km2.
The required variable from the glaciers_fn dataset is the glacier ID 'simple_id'.
Optionally, the glacier area 'AREA' [km2] can be present to filter the glaciers
by size. If not present, it is computed on the fly (see the sketch below).
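
A minimal sketch of that on-the-fly computation; the equal-area CRS choice (EPSG:6933) is an assumption, not necessarily the one HydroMT uses:

import geopandas as gpd

def ensure_glacier_area(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Add an 'AREA' [km2] column when the source data does not provide one."""
    if "AREA" not in gdf.columns:
        gdf = gdf.copy()
        gdf["AREA"] = gdf.geometry.to_crs(epsg=6933).area / 1e6  # m2 -> km2
    return gdf
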
Adds model layers:
@@ -1264,12 +1311,6 @@ def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
"input.vertical.g_sifrac": "G_SIfrac",
}
# retrieve data for basin
self.logger.info(f"Preparing glacier maps.")
gdf_org = self.data_catalog.get_geodataframe(
glaciers_fn, geom=self.basins, predicate="contains"
@@ -1288,7 +1329,7 @@ def setup_glaciers(self, glaciers_fn="rgi", min_area=1):
ds_glac = workflows.glaciermaps(
gdf=gdf_org,
ds_like=self.staticmaps,
id_column="simple_id",
elevtn_name=self._MAPS["elevtn"],
logger=self.logger,
)
@@ -1703,7 +1744,13 @@ def read_forcing(self):
self.set_forcing(ds[v])

def write_forcing(
self,
fn_out=None,
freq_out=None,
chunksize=1,
decimals=2,
time_units="days since 1900-01-01T00:00:00",
**kwargs,
):
"""write forcing at `fn_out` in model ready format.
@@ -1726,6 +1773,8 @@ def write_forcing(
Chunksize on time dimension when saving to disk. By default 1.
decimals: int, optional
Round the output data to the given number of decimals.
time_units: str, optional
Common time units when writing several netcdf forcing files. By default "days since 1900-01-01T00:00:00".
"""
if not self._write:
@@ -1799,6 +1848,7 @@ def write_forcing(
for v in ds.data_vars.keys()
}
# make sure no _FillValue is written to the time dimension
# For several forcing files add common units attributes to time
ds["time"].attrs.pop("_FillValue", None)
encoding["time"] = {"_FillValue": None}

@@ -1814,6 +1864,8 @@ def write_forcing(
forcing_list.append([fn_out, ds])
else:
self.logger.info(f"Writting several forcing with freq {freq_out}")
# For several forcing files add common units attributes to time
encoding["time"] = {"_FillValue": None, "units": time_units}
# Updating path forcing in config
fns_out = os.path.relpath(fn_out, self.root)
fns_out = f"{str(fns_out)[0:-3]}_*.nc"
